Retour aux Articles
Nov 29, 2024
5 min de lecture

Maîtriser le Schema Evolution dans les Delta Tables

Dans le monde dynamique du Data Engineering, les formats et besoins en données évoluent constamment. Historiquement, gérer les changements de schéma dans les data lakes était un processus complexe et sujet aux erreurs, nécessitant souvent une migration manuelle des données et impactant les applications en aval. La fonctionnalité de schema evolution de Delta Lake simplifie grandement cela, en vous permettant d’adapter vos tables aux nouveaux besoins sans intervention manuelle. Cet article vous guide à travers toutes les notions clés du schema evolution dans les Delta Tables, pour vous permettre de gérer efficacement les changements de schéma dans vos data pipelines.

Comprendre le Schema Evolution

Le schema evolution désigne la capacité à modifier la structure d’une table au fil du temps : ajout de colonnes, changement de type de données, ou renommage. Dans un data lake traditionnel, ces modifications impliquent souvent la création d’une nouvelle table, la migration des données et l’adaptation de tous les traitements downstream — une démarche chronophage et risquée.

Delta Lake automatise de nombreuses modifications de schéma courantes, réduisant l’effort manuel et les risques d’erreurs.

Avantages clés :

  • Réduction de l’effort manuel : Mise à jour automatique du schéma, sans scripts de migration.
  • Résilience accrue des pipelines : Les pipelines s’adaptent aux changements de schéma sans modification majeure de code.
  • Moins d’interruptions : Les changements peuvent être appliqués sans mettre la table hors ligne.
  • Meilleure traçabilité : L’historique des changements de schéma est conservé, ce qui améliore la gouvernance des données.

Modes de Schema Evolution dans Delta Lake

Delta Lake prend en charge plusieurs modes de schema evolution, adaptés à différents scénarios.

1. Merge Schema automatique

C’est le mode le plus courant. Lors de l’écriture de nouvelles données, Delta Lake détecte automatiquement les changements et adapte le schéma : ajout de colonnes, par exemple. Cela simplifie beaucoup les ETL dont les sources peuvent évoluer.

Exemple (PySpark) :

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
from pyspark.sql.functions import col

new_data = spark.createDataFrame(
    [(1, "Alice", 30), (2, "Bob", 25), (3, "Charlie", 35)],
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
)

(new_data.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/path/to/delta/users"))

delta_table = DeltaTable.forPath(spark, "/path/to/delta/users")
print(delta_table.toDF().schema)

Exemple (Scala) :

import org.apache.spark.sql.types._
import io.delta.tables._
import org.apache.spark.sql.functions._

val newData = spark.createDataFrame(Seq(
  (1, "Alice", 30), (2, "Bob", 25), (3, "Charlie", 35)
)).toDF("id", "name", "age")

newData.write.format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .save("/path/to/delta/users")

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta/users")
println(deltaTable.toDF.schema)

2. Ajout explicite de colonnes

On peut aussi ajouter une colonne via SQL ou l’API Delta sans passer par un write.

Exemple (SQL) :

ALTER TABLE delta.`/path/to/delta/users` ADD COLUMNS (age INT);

Exemple (PySpark) :

from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/path/to/delta/users")
delta_table.addColumn("age", "INT")  # Delta 3.0+ requis

Exemple (Scala) :

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta/users")
deltaTable.addColumn("age", "INT")  // Delta 3.0+ requis

3. Changement de type de données

Changer un type de données existant est plus délicat. Delta Lake autorise certains changements (ex : int → long), mais pas tous (ex : string → int).

À retenir :

  • Compatibilité des données
  • Perte de précision potentielle
  • Impacts downstream

Exemple (PySpark) :

from delta.tables import DeltaTable
from pyspark.sql.functions import col

delta_table = DeltaTable.forPath(spark, "/path/to/delta/users")
df = delta_table.toDF().withColumn("age", col("age").cast("LONG"))

(df.write
  .format("delta")
  .mode("overwrite")
  .option("mergeSchema", "true")
  .option("overwriteSchema", "true")
  .save("/path/to/delta/users"))

Exemple (Scala) :

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta/users")
val df = deltaTable.toDF.withColumn("age", col("age").cast("LONG"))

df.write
  .format("delta")
  .mode("overwrite")
  .option("mergeSchema", "true")
  .option("overwriteSchema", "true")
  .save("/path/to/delta/users")

Configuration du Schema Evolution

L’évolution du schéma repose sur certaines options à configurer à l’écriture :

  • mergeSchema : fusion automatique des schémas (par défaut : false)
  • overwriteSchema : écrasement total du schéma existant, à utiliser avec overwrite()
  • Changement de partition : attention, cela peut nécessiter une recréation de la table

Exemple (PySpark) :

df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/path/to/delta/my_table")

Bonnes pratiques

  • Anticipez le changement : Préparez votre modèle de données avec une certaine souplesse
  • Communiquez : Tenez informées les équipes downstream
  • Surveillez : Utilisez le transaction log pour tracer les changements
  • Gérez les cas complexes manuellement : Ex : suppression de colonnes, types non compatibles
  • Testez vos pipelines : Toujours après modification de schéma
  • Versionnez les définitions de schéma : Pour faciliter les rollbacks

Cas d’usage concrets

  • Sources de données évolutives : APIs ou bases tierces avec schéma changeant
  • Ajout de fonctionnalités : nouveaux attributs métier
  • Refinement des types : améliorer performance ou précision
  • Intégration progressive de sources multiples

Débogage et erreurs fréquentes

  • UnsupportedOperationException : opération de schéma non supportée (type incompatible, suppression)
  • AnalysisException : conflit de types ou colonnes incohérentes
  • Qualité des données : vérifiez les valeurs après changement

Exemple (gestion d’erreur PySpark) :

try:
    df.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta/my_table")
except Exception as e:
    print(f"Erreur lors du schema evolution : {e}")

Conclusion

Le schema evolution de Delta Lake facilite grandement la gestion des schémas évolutifs dans les pipelines de Data Engineering. En maîtrisant les modes d’évolution, les bonnes pratiques et la configuration, vous pouvez construire des systèmes robustes, adaptables et conformes aux exigences changeantes des données modernes.