Aller au contenu principal

Ilum Tables

Ilum Tables est un format Spark qui sert d’enveloppe pour les formats de données Delta, Iceberg et Hudi. Il vous permet d’accéder et de créer des ensembles de données dans ces formats à l’aide d’une interface unifiée, ce qui se traduit par une conception de code plus flexible.

Importer des tables Ilum

Pour utiliser Ilum Tables Vous devez inclure ilum-spark-format dans votre Emplois Ilum . Vous pouvez le faire en ajoutant cette configuration :

spark.jars.packages=cloud.ilum :ilum-spark-format :6.1.0 

ou en ajoutant le package en tant que jar séparé dans vos ressources Ilum Job

Comment inclure un forfait dans votre Scala Application

  • Utilisation de sbt :
libraryDependencies += "cloud.ilum" %  "ilum-spark-format" %  "6.1.0"
  • Utilisation de maven :
< dépendance > 
< groupId > cloud.ilum </ groupId >
< artefactId > ilum-spark-format </ artefactId >
< Version > 6.1.0 </ Version >
</ dépendance >
  • Utilisation de gradle :
Groupe d’implémentation : 'cloud.ilum', Nom : 'ilum-spark-format', version : '6.1.0' 

Comment l’utiliser ?

  • Lire et écrire les données en spécifiant ' Format de l’ilum '
  • Lire et écrire des données à l’aide de Méthode ilum

Pour ce faire, vous devez les importer comme ceci :

importation  nuage . ilum . implicits. { 
IlumDataFrameReader,
IlumDataFrameWriter,
IlumDataFrameWriterV2,
IlumDataStreamWriter,
IlumDataStreamReader
}
  • Lire et écrire des données en préconfigurant le catalogue ( writeTo, read.table)

Lecture

val filepath =  "s3a://ilum-files/ilum-tables/table"
val tableFormat = Some( "delta")

sans méthode ilum
val mydf = étincelle . lire . format( "ilum") . option ( "tableFormat", tableFormat) . load( filepath)

avec la méthode ilum
val mydf2 = sparkSession. lire . ilum ( filePath, tableFormat)


Écriture

val filepath =  "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"

valdonnées = Seq(
( 1 , "Alice") ,
( 2 , "Bob") ,
( 3 , "Cathy")
)

valDf = étincelle . createDataFrame ( données ) . toDF( « Pièce d’identité » , « nom » )

à l’aide de DataframeWriterV1

Vous pouvez utiliser une syntaxe comme celle-ci
Df . écrire . format( "ilum") . option ( "tableFormat", tableFormat) . save( filepath)

ou vous pouvez utiliser la fonction ilum

Df . écrire . ilum ( filepath + "/1", format)

utilisation de DataframeWriterV2 avec un catalogue Delta préconfiguré
valcatalogue = "catalog"
valtable = "tablename"
Df . writeTo( s " ${catalogue } . ${table } " ) . ilum ( format, None ) . createOrReplace( )

Streaming

Sans méthodes Ilum

val filepath =  "s3a://ilum-files/ilum-tables/streaming"
val tableFormat = "delta"

val input = étincelle . readStream
. format( "ilum")
. option ( "tableFormat", tableFormat)
. load( filepath)

valrequête = input. writeStream
. outputMode( "append")
. format( "ilum")
. option ( "tableFormat", tableFormat)
. option ( "path", filepath + "_copy")
. option ( "checkpointLocation", filepath + "_checkpoint")
. start( )

requête . awaitTermination( )

Avec les méthodes Ilum :


val filePath = s "s3a://ilum-files/ilum-tables/smth"
val tableFormat = Some( "delta")

valDf = sparkSession. readStream. ilum ( filePath, tableFormat)

valrequête = Df . writeStream
. option ( "checkpointLocation", filePath + "_checkpoint")
. ilum ( filePath+ "_copy", tableFormat)

requête . awaitTermination( )

Configuration des formats de données

Delta

Pour utiliser Delta, vous devez utiliser les configurations d’étincelle suivantes :

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension 
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.warehouse.dir=s3a ://ilum-files/ilum-warehouse

et vous devez inclure le package Delta dans votre environnement. Pour ce faire, vous pouvez utiliser l’image Spark Kubernetes avec des extensions delta Préinstallé :

spark.kubernetes.container.image=ilum/spark :3.5.2-delta 

ou installez vous-même le package d’extension requis

Iceberg

Pour utiliser Iceberg, vous devez ajouter ces configurations :

spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog 
spark.sql.catalog.iceberg_catalog.type=ruche
spark.sql.catalog.iceberg_catalog, org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type=Hadoop
spark.sql.catalog.iceberg_catalog.warehouse=s3a ://ilum-files/ilum-tables/iceberg/warehouse

et vous devez inclure org.apache.iceberg :iceberg-spark-runtime-3.5_2.13:1.6.1 package dans votre environnement en ajoutant son jar aux ressources ou en l’ajoutant à des configurations Spark telles que ceci:

spark.jars.packages=org.apache.iceberg :iceberg-spark-runtime-3.5_2.13:1.6.1 

Hudi

Pour utiliser Hudi, vous devez ajouter les configurations suivantes :

spark.serializer=org.apache.spark.serializer.KryoSerializer 
spark.sql.extensions=org.apache.spark.sql.hudi.Sweat à capucheSpearkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.Sweat à capucheCatalogue

et importer org.apache.hudi :hudi-spark3.5-bundle_2.12:0.15.0 packager dans votre environnement en ajoutant son jar aux ressources ou en l’ajoutant à des configurations Spark comme ceci :

spark.jars.packages=org.apache.hudi :hudi-spark3.5-bundle_2.12:0.15.0