Retour aux Articles
Oct 07, 2024
5 min de lecture

Interroger des tables Delta : Guide complet

Les tables Delta offrent un moyen robuste et performant de gérer et d’interroger vos données dans un data lake. Ce guide couvre les différentes méthodes d’interrogation, ainsi que les bonnes pratiques pour exploiter pleinement vos données.

Méthodes d’interrogation des tables Delta

Il existe plusieurs façons d’interroger une table Delta, selon vos préférences et cas d’usage.

1. Utiliser des requêtes SQL SELECT

Pour les utilisateurs familiers du SQL, interroger une table Delta avec des requêtes SELECT est simple. Delta Lake s’intègre à Spark SQL et prend en charge la syntaxe SQL standard.

// Exemple Scala avec Spark SQL
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DeltaTableQueryExample")
  .getOrCreate()

val df = spark.sql("SELECT * FROM my_delta_table WHERE column_a > 100")
df.show()
# Exemple PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaTableQueryExample").getOrCreate()

df = spark.sql("SELECT * FROM my_delta_table WHERE column_a > 100")
df.show()

2. Utiliser l’API Delta

L’API Delta permet d’interagir de manière programmatique avec les tables Delta, avec plus de contrôle.

// Exemple Scala
import io.delta.tables._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DeltaTableQueryExample").getOrCreate()

val deltaTable = DeltaTable.forPath(spark, "/chemin/vers/table_delta")

val df = deltaTable.toDF
val filteredDF = deltaTable.toDF.where("column_a > 100")
filteredDF.show()
# Exemple PySpark
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaTableQueryExample").getOrCreate()

deltaTable = DeltaTable.forPath(spark, "/chemin/vers/table_delta")

df = deltaTable.toDF()
filtered_df = deltaTable.toDF().where("column_a > 100")
filtered_df.show()

3. Interroger des partitions spécifiques

Si votre table est partitionnée, ciblez des partitions précises pour optimiser les performances.

// Exemple Scala
val df = spark.read.format("delta")
  .option("basePath", "/chemin/vers/table_delta") // Optionnel
  .load()
  .where("partition_column = 'valeur'")
df.show()
# Exemple PySpark
df = spark.read.format("delta") \
    .option("basePath", "/chemin/vers/table_delta") \
    .load() \
    .where("partition_column = 'valeur'")
df.show()

4. Utiliser le Time Travel (version historique)

Delta Lake permet d’interroger des versions précédentes d’une table, ce qui est utile pour l’audit ou l’analyse historique.

Interroger par version

val df = spark.read.format("delta")
  .option("versionAsOf", 2)
  .load("/chemin/vers/table_delta")
df.show()
df = spark.read.format("delta") \
    .option("versionAsOf", 2) \
    .load("/chemin/vers/table_delta")
df.show()

Interroger par horodatage

val df = spark.read.format("delta")
  .option("timestampAsOf", "2023-10-27T10:00:00.000Z")
  .load("/chemin/vers/table_delta")
df.show()
df = spark.read.format("delta") \
    .option("timestampAsOf", "2023-10-27T10:00:00.000Z") \
    .load("/chemin/vers/table_delta")
df.show()

Options et configurations pour les requêtes

1. Filtrage des données

Utilisez WHERE pour limiter le volume de données lues :

SELECT * FROM my_delta_table WHERE column_b = 'valeur' AND column_c > 10

2. Agrégation

Utilisez GROUP BY avec des fonctions comme SUM, AVG, COUNT :

SELECT column_d, COUNT(*) FROM my_delta_table GROUP BY column_d

3. Tri des résultats

Tri par colonne :

SELECT * FROM my_delta_table ORDER BY column_e DESC

4. Jointures

Jointures avec d’autres tables ou sources :

SELECT t1.*, t2.other_column
FROM my_delta_table t1
JOIN another_table t2 ON t1.join_column = t2.join_column

5. Optimisation des performances

Profitez des fonctionnalités comme le data skipping, le partitionnement et le Z-ordering pour accélérer vos requêtes (voir articles dédiés).

6. Lecture par version ou horodatage

Comme vu plus haut, versionAsOf et timestampAsOf permettent de faire des requêtes historiques.

Bonnes pratiques pour interroger des tables Delta

  • Filtrage efficace : placez les filtres tôt dans les requêtes.
  • Partitionnement : ciblez les partitions pour éviter des scans globaux.
  • Data Skipping : assurez-vous que les statistiques sont bien générées.
  • Z-Ordering : améliore l’accès rapide aux colonnes souvent filtrées.
  • Choix du format : Delta utilise Parquet, mais adaptez si un autre format est plus performant pour votre cas.
  • Mise en cache stratégique : utilisez df.cache() ou spark.catalog.cacheTable(...) pour les requêtes répétées.
  • Analyse des performances : consultez Spark UI pour identifier les lenteurs (scans complets, jointures inefficaces, etc.).
  • Compréhension des coûts : certaines requêtes (agrégats, jointures complexes) sont plus gourmandes — planifiez-les en conséquence.

Gestion de l’évolution de schéma dans le Time Travel

Lors de requêtes sur des versions anciennes, vous pouvez rencontrer des problèmes de schéma. Utilisez spark.read.option("readChangeFeed", "true") pour lire les changements incrémentaux, ou ajustez vos requêtes selon les colonnes disponibles.

Tables gérées vs. externes

Le processus d’interrogation est identique. La seule différence est que pour une table externe, les fichiers sont stockés en dehors du répertoire géré par Delta Lake. Assurez-vous que le chemin est toujours valide.

Conseils de dépannage

  • Erreur “Table not found” : vérifiez le nom et le chemin de la table.
  • Problèmes de performance : consultez Spark UI pour analyser les plans d’exécution.
  • Erreur de schéma : lors du Time Travel, certaines colonnes peuvent être absentes.
  • Résultats incorrects : relisez bien vos filtres, jointures, et horodatages utilisés.

En appliquant ces bonnes pratiques, vous serez en mesure d’interroger efficacement vos tables Delta et d’en extraire toute la valeur.