Utilizzo del framework Hudi in AWS Glue - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo del framework Hudi in AWS Glue

AWS Glue 3.0 e versioni successive supportano il framework Apache Hudi per i data lake. Hudi è un framework di archiviazione di data lake open source che semplifica l'elaborazione incrementale dei dati e lo sviluppo di pipeline di dati. Questo argomento descrive le funzionalità disponibili per l'utilizzo dei dati in AWS Glue quando si trasportano o si archiviano i dati in una tabella Hudi. Per ulteriori informazioni su Hudi, consulta la documentazione ufficiale di Apache Hudi.

Puoi usare AWS Glue per eseguire operazioni di lettura e scrittura sulle tabelle Hudi in Amazon S3 o lavorare con le tabelle Hudi utilizzando il AWS Glue Data Catalog. Sono supportate anche operazioni aggiuntive, tra cui inserimento, aggiornamento e tutte le operazioni di Apache Spark.

Nota

Apache Hudi 0.10.1 per AWS Glue 3.0 non supporta le tabelle Hudi Merge on Read (MoR).

La tabella seguente elenca la versione Hudi inclusa in ogni versione di AWS Glue.

AWS Versione Glue Versione Hudi supportata
5.0 0.15.0
4.0 0.12.1
3.0 0,10,1

Per ulteriori informazioni sui framework di data lake supportati da AWS Glue, consulta. Utilizzo di framework di data lake con AWS Glue processi ETL

Abilitazione di Hudi

Per abilitare Hudi for AWS Glue, completa le seguenti attività:

  • Specifica hudi come valore per i parametri del processo --datalake-formats. Per ulteriori informazioni, consulta Utilizzo dei parametri del lavoro nei lavori AWS Glue.

  • Crea una chiave denominata --conf per il tuo lavoro AWS Glue e impostala sul seguente valore. In alternativa, puoi impostare la seguente configurazione usando SparkConf nel tuo script. Queste impostazioni consentono ad Apache Spark di gestire correttamente le tabelle Hudi.

    spark.serializer=org.apache.spark.serializer.KryoSerializer
  • Il supporto delle autorizzazioni Lake Formation per Hudi è abilitato di default per AWS Glue 4.0. Non è necessaria alcuna configurazione aggiuntiva per la lettura/scrittura su tabelle Hudi registrate da Lake Formation. Per leggere una tabella Hudi registrata, il ruolo IAM di AWS Glue job deve disporre dell'autorizzazione SELECT. Per scrivere su una tabella Hudi registrata, il ruolo IAM di AWS Glue job deve avere l'autorizzazione SUPER. Per ulteriori informazioni sulla gestione delle autorizzazioni di Lake Formation, consulta Concessione e revoca delle autorizzazioni del catalogo dati.

Utilizzo di una versione differente di Hudi

Per utilizzare una versione di Hudi non supportata da AWS Glue, specificate i vostri file JAR Hudi utilizzando il parametro --extra-jars job. Non includere hudi come valore per il parametro del processo --datalake-formats.

Esempio: scrivere una tabella Hudi su Amazon S3 e registrarla nel AWS Glue Data Catalog

Questo script di esempio dimostra come scrivere una tabella Hudi su Amazon S3 e registrarla nel AWS Glue Data Catalog. Per registrare la tabella, viene utilizzato lo strumento Hive Sync di Hudi.

Nota

Questo esempio richiede di impostare il parametro --enable-glue-datacatalog job per utilizzare AWS Glue Data Catalog come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Utilizzo dei parametri del lavoro nei lavori AWS Glue.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.database.name": "<your_database_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.database.name" -> "<your_database_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Esempio: leggere una tabella Hudi da Amazon S3 utilizzando il AWS Glue Data Catalog

In questo esempio viene letta la tabella Hudi che hai creato in Esempio: scrivere una tabella Hudi su Amazon S3 e registrarla nel AWS Glue Data Catalog da Amazon S3.

Nota

Questo esempio richiede di impostare il parametro --enable-glue-datacatalog job per utilizzare AWS Glue Data Catalog come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Utilizzo dei parametri del lavoro nei lavori AWS Glue.

Python

Per questo esempio, usa il metodo GlueContext.create_data_frame.from_catalog().

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Per questo esempio, usa il metodo getCatalogSource.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Esempio: aggiornamento e inserimento di un DataFrame in una tabella Hudi in Amazon S3

Questo esempio utilizza il AWS Glue Data Catalog per inserire un DataFrame nella tabella Hudi in Esempio: scrivere una tabella Hudi su Amazon S3 e registrarla nel AWS Glue Data Catalog cui è stato creato.

Nota

Questo esempio richiede di impostare il parametro --enable-glue-datacatalog job per utilizzare AWS Glue Data Catalog come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Utilizzo dei parametri del lavoro nei lavori AWS Glue.

Python

Per questo esempio, usa il metodo GlueContext.write_data_frame.from_catalog().

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.database.name": "<your_database_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Per questo esempio, usa il metodo getCatalogSink.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.database.name" -> "<your_database_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Esempio: lettura di una tabella Hudi da Amazon S3 tramite Spark

Questo esempio legge una tabella Hudi da Amazon S3 utilizzando l'API Spark. DataFrame

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Esempio: scrittura di una tabella Hudi su Amazon S3 tramite Spark

In questo esempio viene scritta una tabella Hudi su Amazon S3 tramite Spark.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Esempio: lettura e scrittura della tabella Hudi con il controllo delle autorizzazioni di Lake Formation

Questo esempio legge da e scrive su una tabella Hudi con il controllo delle autorizzazioni di Lake Formation.

  1. Crea una tabella Hudi e registrala in Lake Formation.

    1. Per abilitare il controllo delle autorizzazioni di Lake Formation, devi prima registrare il percorso della tabella Amazon S3 su Lake Formation. Per ulteriori informazioni, consulta la pagina Registrazione di una posizione Amazon S3. Puoi registrarlo dalla console di Lake Formation o utilizzando la AWS CLI:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Una volta registrata una posizione Amazon S3, qualsiasi tabella AWS Glue che punta alla posizione (o a una delle sue sedi secondarie) restituirà il valore del IsRegisteredWithLakeFormation parametro come true nella chiamata. GetTable

    2. Crea una tabella Hudi che punti al percorso registrato di Amazon S3 tramite l'API Spark DataFrame:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.database.name': database_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Concedi a Lake Formation l'autorizzazione per il ruolo IAM di AWS Glue job. Puoi concedere le autorizzazioni dalla console di Lake Formation o utilizzando la AWS CLI. Per ulteriori informazioni, consulta la pagina Concessione delle autorizzazioni alla tabella tramite la console di Lake Formation e il metodo delle risorse denominate

  3. Leggi la tabella Hudi registrata in Lake Formation. Il codice equivale a leggere una tabella Hudi non registrata. Tieni presente che il ruolo IAM di AWS Glue job deve disporre dell'autorizzazione SELECT affinché la lettura abbia esito positivo.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Scrivi sulla tabella Hudi registrata in Lake Formation. Il codice equivale a scrivere su una tabella Hudi non registrata. Tieni presente che il ruolo IAM di AWS Glue job deve disporre dell'autorizzazione SUPER affinché la scrittura abbia esito positivo.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.database.name" -> "<your_database_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)