Les tables Delta offrent un moyen robuste et performant de gérer et d’interroger vos données dans un data lake. Ce guide couvre les différentes méthodes d’interrogation, ainsi que les bonnes pratiques pour exploiter pleinement vos données.
Méthodes d’interrogation des tables Delta
Il existe plusieurs façons d’interroger une table Delta, selon vos préférences et cas d’usage.
1. Utiliser des requêtes SQL SELECT
Pour les utilisateurs familiers du SQL, interroger une table Delta avec des requêtes SELECT
est simple. Delta Lake s’intègre à Spark SQL et prend en charge la syntaxe SQL standard.
// Exemple Scala avec Spark SQL
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("DeltaTableQueryExample")
.getOrCreate()
val df = spark.sql("SELECT * FROM my_delta_table WHERE column_a > 100")
df.show()
# Exemple PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DeltaTableQueryExample").getOrCreate()
df = spark.sql("SELECT * FROM my_delta_table WHERE column_a > 100")
df.show()
2. Utiliser l’API Delta
L’API Delta permet d’interagir de manière programmatique avec les tables Delta, avec plus de contrôle.
// Exemple Scala
import io.delta.tables._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("DeltaTableQueryExample").getOrCreate()
val deltaTable = DeltaTable.forPath(spark, "/chemin/vers/table_delta")
val df = deltaTable.toDF
val filteredDF = deltaTable.toDF.where("column_a > 100")
filteredDF.show()
# Exemple PySpark
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DeltaTableQueryExample").getOrCreate()
deltaTable = DeltaTable.forPath(spark, "/chemin/vers/table_delta")
df = deltaTable.toDF()
filtered_df = deltaTable.toDF().where("column_a > 100")
filtered_df.show()
3. Interroger des partitions spécifiques
Si votre table est partitionnée, ciblez des partitions précises pour optimiser les performances.
// Exemple Scala
val df = spark.read.format("delta")
.option("basePath", "/chemin/vers/table_delta") // Optionnel
.load()
.where("partition_column = 'valeur'")
df.show()
# Exemple PySpark
df = spark.read.format("delta") \
.option("basePath", "/chemin/vers/table_delta") \
.load() \
.where("partition_column = 'valeur'")
df.show()
4. Utiliser le Time Travel (version historique)
Delta Lake permet d’interroger des versions précédentes d’une table, ce qui est utile pour l’audit ou l’analyse historique.
Interroger par version
val df = spark.read.format("delta")
.option("versionAsOf", 2)
.load("/chemin/vers/table_delta")
df.show()
df = spark.read.format("delta") \
.option("versionAsOf", 2) \
.load("/chemin/vers/table_delta")
df.show()
Interroger par horodatage
val df = spark.read.format("delta")
.option("timestampAsOf", "2023-10-27T10:00:00.000Z")
.load("/chemin/vers/table_delta")
df.show()
df = spark.read.format("delta") \
.option("timestampAsOf", "2023-10-27T10:00:00.000Z") \
.load("/chemin/vers/table_delta")
df.show()
Options et configurations pour les requêtes
1. Filtrage des données
Utilisez WHERE
pour limiter le volume de données lues :
SELECT * FROM my_delta_table WHERE column_b = 'valeur' AND column_c > 10
2. Agrégation
Utilisez GROUP BY
avec des fonctions comme SUM
, AVG
, COUNT
:
SELECT column_d, COUNT(*) FROM my_delta_table GROUP BY column_d
3. Tri des résultats
Tri par colonne :
SELECT * FROM my_delta_table ORDER BY column_e DESC
4. Jointures
Jointures avec d’autres tables ou sources :
SELECT t1.*, t2.other_column
FROM my_delta_table t1
JOIN another_table t2 ON t1.join_column = t2.join_column
5. Optimisation des performances
Profitez des fonctionnalités comme le data skipping, le partitionnement et le Z-ordering pour accélérer vos requêtes (voir articles dédiés).
6. Lecture par version ou horodatage
Comme vu plus haut, versionAsOf
et timestampAsOf
permettent de faire des requêtes historiques.
Bonnes pratiques pour interroger des tables Delta
- Filtrage efficace : placez les filtres tôt dans les requêtes.
- Partitionnement : ciblez les partitions pour éviter des scans globaux.
- Data Skipping : assurez-vous que les statistiques sont bien générées.
- Z-Ordering : améliore l’accès rapide aux colonnes souvent filtrées.
- Choix du format : Delta utilise Parquet, mais adaptez si un autre format est plus performant pour votre cas.
- Mise en cache stratégique : utilisez
df.cache()
ouspark.catalog.cacheTable(...)
pour les requêtes répétées. - Analyse des performances : consultez Spark UI pour identifier les lenteurs (scans complets, jointures inefficaces, etc.).
- Compréhension des coûts : certaines requêtes (agrégats, jointures complexes) sont plus gourmandes — planifiez-les en conséquence.
Gestion de l’évolution de schéma dans le Time Travel
Lors de requêtes sur des versions anciennes, vous pouvez rencontrer des problèmes de schéma. Utilisez spark.read.option("readChangeFeed", "true")
pour lire les changements incrémentaux, ou ajustez vos requêtes selon les colonnes disponibles.
Tables gérées vs. externes
Le processus d’interrogation est identique. La seule différence est que pour une table externe, les fichiers sont stockés en dehors du répertoire géré par Delta Lake. Assurez-vous que le chemin est toujours valide.
Conseils de dépannage
- Erreur “Table not found” : vérifiez le nom et le chemin de la table.
- Problèmes de performance : consultez Spark UI pour analyser les plans d’exécution.
- Erreur de schéma : lors du Time Travel, certaines colonnes peuvent être absentes.
- Résultats incorrects : relisez bien vos filtres, jointures, et horodatages utilisés.
En appliquant ces bonnes pratiques, vous serez en mesure d’interroger efficacement vos tables Delta et d’en extraire toute la valeur.