Ilum Tables
Ilum Tables est un format Spark qui sert d’enveloppe pour les formats de données Delta, Iceberg et Hudi. Il vous permet d’accéder et de créer des ensembles de données dans ces formats à l’aide d’une interface unifiée, ce qui se traduit par une conception de code plus flexible.
Importer des tables Ilum
Pour utiliser Ilum Tables Vous devez inclure ilum-spark-format dans votre Emplois Ilum . Vous pouvez le faire en ajoutant cette configuration :
spark.jars.packages=cloud.ilum :ilum-spark-format :6.1.0
ou en ajoutant le package en tant que jar séparé dans vos ressources Ilum Job
Comment inclure un forfait dans votre Scala Application
- Utilisation de sbt :
libraryDependencies += "cloud.ilum" % "ilum-spark-format" % "6.1.0"
- Utilisation de maven :
< dépendance >
< groupId > cloud.ilum </ groupId >
< artefactId > ilum-spark-format </ artefactId >
< Version > 6.1.0 </ Version >
</ dépendance >
- Utilisation de gradle :
Groupe d’implémentation : 'cloud.ilum', Nom : 'ilum-spark-format', version : '6.1.0'
Comment l’utiliser ?
- Lire et écrire les données en spécifiant ' Format de l’ilum '
- Lire et écrire des données à l’aide de Méthode ilum
Pour ce faire, vous devez les importer comme ceci :
importation nuage . ilum . implicits. {
IlumDataFrameReader,
IlumDataFrameWriter,
IlumDataFrameWriterV2,
IlumDataStreamWriter,
IlumDataStreamReader
}
- Lire et écrire des données en préconfigurant le catalogue ( writeTo, read.table)
Lecture
val filepath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = Some( "delta")
sans méthode ilum
val mydf = étincelle . lire . format( "ilum") . option ( "tableFormat", tableFormat) . load( filepath)
avec la méthode ilum
val mydf2 = sparkSession. lire . ilum ( filePath, tableFormat)
Écriture
val filepath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"
valdonnées = Seq(
( 1 , "Alice") ,
( 2 , "Bob") ,
( 3 , "Cathy")
)
valDf = étincelle . createDataFrame ( données ) . toDF( « Pièce d’identité » , « nom » )
à l’aide de DataframeWriterV1
Vous pouvez utiliser une syntaxe comme celle-ci
Df . écrire . format( "ilum") . option ( "tableFormat", tableFormat) . save( filepath)
ou vous pouvez utiliser la fonction ilum
Df . écrire . ilum ( filepath + "/1", format)
utilisation de DataframeWriterV2 avec un catalogue Delta préconfiguré
valcatalogue = "catalog"
valtable = "tablename"
Df . writeTo( s " ${catalogue } . ${table } " ) . ilum ( format, None ) . createOrReplace( )
Streaming
Sans méthodes Ilum
val filepath = "s3a://ilum-files/ilum-tables/streaming"
val tableFormat = "delta"
val input = étincelle . readStream
. format( "ilum")
. option ( "tableFormat", tableFormat)
. load( filepath)
valrequête = input. writeStream
. outputMode( "append")
. format( "ilum")
. option ( "tableFormat", tableFormat)
. option ( "path", filepath + "_copy")
. option ( "checkpointLocation", filepath + "_checkpoint")
. start( )
requête . awaitTermination( )
Avec les méthodes Ilum :
val filePath = s "s3a://ilum-files/ilum-tables/smth"
val tableFormat = Some( "delta")
valDf = sparkSession. readStream. ilum ( filePath, tableFormat)
valrequête = Df . writeStream
. option ( "checkpointLocation", filePath + "_checkpoint")
. ilum ( filePath+ "_copy", tableFormat)
requête . awaitTermination( )
Configuration des formats de données
Delta
Pour utiliser Delta, vous devez utiliser les configurations d’étincelle suivantes :
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.warehouse.dir=s3a ://ilum-files/ilum-warehouse
et vous devez inclure le package Delta dans votre environnement. Pour ce faire, vous pouvez utiliser l’image Spark Kubernetes avec des extensions delta Préinstallé :
spark.kubernetes.container.image=ilum/spark :3.5.2-delta
ou installez vous-même le package d’extension requis
Iceberg
Pour utiliser Iceberg, vous devez ajouter ces configurations :
spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.iceberg_catalog.type=ruche
spark.sql.catalog.iceberg_catalog, org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type=Hadoop
spark.sql.catalog.iceberg_catalog.warehouse=s3a ://ilum-files/ilum-tables/iceberg/warehouse
et vous devez inclure org.apache.iceberg :iceberg-spark-runtime-3.5_2.13:1.6.1 package dans votre environnement en ajoutant son jar aux ressources ou en l’ajoutant à des configurations Spark telles que
ceci:
spark.jars.packages=org.apache.iceberg :iceberg-spark-runtime-3.5_2.13:1.6.1
Hudi
Pour utiliser Hudi, vous devez ajouter les configurations suivantes :
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.Sweat à capucheSpearkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.Sweat à capucheCatalogue
et importer org.apache.hudi :hudi-spark3.5-bundle_2.12:0.15.0 packager dans votre environnement en ajoutant son jar aux ressources ou en l’ajoutant à des configurations Spark comme ceci :
spark.jars.packages=org.apache.hudi :hudi-spark3.5-bundle_2.12:0.15.0