Retour aux Articles
Jan 09, 2025
5 min de lecture

Delta Tables : Cas d’usage en traitement batch et streaming

Grâce à leurs propriétés ACID et leur support natif du batch comme du streaming, les Delta Tables offrent une solution polyvalente pour de nombreux scénarios de traitement de données. Cet article explore des cas d’usage pratiques dans les deux contextes, illustrés par du code et des exemples concrets.

Cas d’usage en traitement batch

Les Delta Tables brillent dans les pipelines batch pour le data warehousing, l’ETL, l’analyse, et la préparation des données.

Data Warehousing et pipelines ETL

Delta Tables constituent une base robuste pour un data warehouse, avec gestion de schéma évolutif et transactions ACID intégrées.

Exemple (PySpark) :

from pyspark.sql.functions import *
from pyspark.sql.types import *

input_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("payload", StringType(), True)
])

raw_data = spark.read.schema(input_schema).parquet("/path/to/raw/events")

transformed_data = raw_data.withColumn("processed_time", current_timestamp()) \
    .filter(col("event_type").isin("login", "logout")) \
    .select("user_id", "event_time", "event_type", "processed_time")

transformed_data.write.format("delta").mode("append").save("/path/to/delta/events")

delta_events = spark.read.format("delta").load("/path/to/delta/events")
delta_events.groupBy("event_type").count().show()

Data Lake Management et qualité des données

Avec les Delta Tables, vous pouvez appliquer des règles strictes sur les schémas et garantir l’intégrité des données.

Exemple (Scala) :

import io.delta.tables._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("product_id", IntegerType, false),
  StructField("name", StringType, true),
  StructField("price", DoubleType, true)
))

spark.emptyDataFrame(schema)
  .write
  .format("delta")
  .mode("overwrite")
  .option("dataChange", "false")
  .save("/path/to/delta/products")

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta/products")

deltaTable.update()
  .withColumn("price", expr("IF(price >= 0, price, NULL)"))
  .execute()

Feature Engineering et préparation des données pour ML

Les Delta Tables permettent de versionner et de tracer les datasets utilisés pour l’entraînement des modèles ML.

Exemple (PySpark) :

from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

data = spark.read.format("delta").load("/path/to/delta/training_data")

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed_data = indexer.fit(data).transform(data)

feature_cols = ["feature1", "feature2", "categoryIndex"]
feature_vector = udf(lambda features: Vectors.dense(features), VectorUDT())
feature_data = indexed_data.withColumn("features", feature_vector(array(*feature_cols))) \
    .select("features", "label")

(training_data, test_data) = feature_data.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(training_data)
predictions = model.transform(test_data)

feature_data.write.format("delta").mode("overwrite").save("/path/to/delta/prepared_training_data")

Analyse historique et reporting

Grâce au Time Travel, les Delta Tables permettent d’interroger l’état passé des données.

Exemple (SQL) :

SELECT * FROM delta.`/path/to/delta/table` VERSION AS OF 5;
SELECT * FROM delta.`/path/to/delta/table` TIMESTAMP AS OF '2023-10-27 10:00:00';

CREATE OR REPLACE TEMP VIEW historical_data AS
SELECT * FROM delta.`/path/to/delta/sales_data` VERSION AS OF 10;

SELECT date, SUM(amount) as total_sales
FROM historical_data
GROUP BY date
ORDER BY date;

Cas d’usage en traitement streaming

Delta Tables s’intègrent naturellement aux frameworks de streaming comme Spark Structured Streaming.

Pipelines temps réel et applications de stream processing

Delta peut être utilisé comme sink ou source fiable dans des applications de streaming.

Exemple (PySpark Structured Streaming) :

stream_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("action", StringType(), True),
    StructField("resource", StringType(), True)
])

stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "your_kafka_brokers") \
    .option("subscribe", "your_topic") \
    .load() \
    .select(from_json(col("value").cast("string"), stream_schema).alias("data")) \
    .select("data.*")

processed_stream = stream_df.groupBy("user_id") \
    .agg(count("*").alias("event_count")) \
    .select("user_id", "event_count", current_timestamp().alias("processed_time"))

query = processed_stream.writeStream.format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/delta/user_activity")

CDC (Change Data Capture) et synchronisation

Delta facilite l’implémentation de CDC avec des opérations MERGE.

Exemple (PySpark) :

source_df = spark.read.format("delta").load("/path/to/source_table")
destination_df = spark.read.format("delta").load("/path/to/destination_table")

deltaTable = DeltaTable.forPath(spark, "/path/to/destination_table")

deltaTable.alias("destination") \
    .merge(
        source_df.alias("source"),
        "destination.id = source.id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

Agrégations continues et vues matérialisées

Delta peut stocker les résultats d’agrégations temps réel pour les dashboards.

Exemple :

aggregated_data = input_stream.groupBy("window", "category") \
    .agg(sum("value").alias("total_value")) \
    .select("window", "category", "total_value")

query = aggregated_data.writeStream.format("delta") \
    .outputMode("complete") \
    .partitionBy("category") \
    .option("checkpointLocation", "/path/to/checkpoint_agg") \
    .start("/path/to/delta/aggregated_data")

Monitoring et analytics en temps réel

Exemple de boucle pour dashboard :

while True:
    df = spark.read.format("delta").load("/path/to/delta/processed_events") \
        .groupBy("action").count().toPandas()

    plt.figure(figsize=(8, 6))
    plt.bar(df["action"], df["count"])
    plt.xlabel("Action")
    plt.ylabel("Count")
    plt.title("Real-Time Event Counts")
    plt.pause(1)
    plt.clf()

Avantages des Delta Tables pour batch et streaming

  • Traitement unifié : même couche de stockage pour batch et streaming
  • Fiabilité et cohérence : grâce aux propriétés ACID
  • Qualité et gouvernance des données : schéma strict, contraintes, versioning
  • Support des pipelines complexes : transformations avancées, CDC, etc.
  • Analytique temps réel : lecture efficace en streaming

Comparaison batch vs streaming

FonctionnalitéBatch ProcessingStreaming Processing
SourceFichiers, bases de donnéesKafka, Kinesis, etc.
Mode de traitementAppend / OverwriteAppend / Update
ÉcritureDonnées écrites en une foisIncrémental en continu
LectureVersion entière ou historiqueDonnées en cours ou vues matérialisées
LatenceÉlevéeFaible / temps réel

Bonnes pratiques

Pour le batch :

  • Partitionnement adapté
  • Z-Ordering sur colonnes filtrées fréquemment
  • Compaction régulière
  • Caching des résultats intermédiaires

Pour le streaming :

  • Checkpointing activé pour reprise après échec
  • Output Mode bien choisi (append/update/complete)
  • Watermarking pour données en retard
  • Backpressure surveillé et géré

Conclusion

Les Delta Tables offrent une approche unifiée, fiable et performante pour le traitement batch et streaming. En comprenant les cas d’usage spécifiques à chaque mode et en appliquant les bonnes pratiques, les data engineers peuvent construire des pipelines robustes, scalables et optimisés. Les exemples partagés ici constituent une base solide pour tirer parti de Delta Lake dans tous vos traitements de données.