AWS Glue での Hudi フレームワークの使用 - AWS Glue

AWS Glue での Hudi フレームワークの使用

AWS Glue 3.0 以降では、データレイク向けに Apache Hudi フレームワークが利用できます。Hudi は、増分データ処理とデータパイプラインの開発を簡素化する、オープンソースのデータレイク用ストレージフレームワークです。このトピックでは、AWS Glue 内でデータを Hudi テーブルに転送または保存する際に利用可能な、各機能について説明します。Hudi の詳細については、公式のApache Hudi ドキュメントを参照してください。

AWS Glue により、Amazon S3 内にある Hudi テーブルの読み取りおよび書き込み操作を実行できます。あるいは、AWS Glue データカタログを使用して、Hudi テーブルを操作することも可能です。挿入、更新、および、すべての Apache Spark オペレーションを含む操作も、追加でサポートされています。

注記

Apache Hudi 0.10.1 for AWS Glue 3.0 では、Hudi Merge on Read (MoR) テーブルはサポートされません。

次の表に、AWS Glue の各バージョンに含まれている、Hudi のバージョンを一覧で示します。

AWS Glue のバージョン サポートされる Hudi バージョン
4.0 0.12.1
3.0 0.10.1

AWS Glue がサポートするデータレイクフレームワークの詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。

Hudi の有効化

AWS Glue で Hudi を有効化するには、以下のタスクを実行します。

  • hudi--datalake-formats のジョブパラメータの値として指定します。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

  • AWS Glue ジョブ用に、--conf という名前でキーを作成し、それに次の値を設定します。または、スクリプトで SparkConf を使用して、次の構成を設定することもできます。これらの設定は、Apache Spark が Hudi テーブルを適切に処理するために役立ちます。

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • Hudi に関する Lake Formation の許可のサポートは、AWS Glue 4.0 のためにデフォルトで有効になっています。Lake Formation に登録された Hudi テーブルの読み取り/書き込みに追加の設定は必要ありません。登録された Hudi テーブルを読み取るには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要です。登録された Hudi テーブルに書き込むには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要です。Lake Formation の許可の管理の詳細については、「Data Catalog リソースに対する許可の付与と取り消し」を参照してください。

別の Hudi バージョンの使用

AWS Glue でサポートされないバージョンの Hudi を使用するには、--extra-jars job パラメーターにより独自の Hudi JAR ファイルを指定します。--datalake-formats ジョブパラメータの値として、hudi は含めないでください。

例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する

このスクリプト例では、Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する方法を示します。この例では、テーブルを登録するために、Hudi Hive Sync ツールを使用します。

注記

この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

例: AWS Glue データカタログを使用した Amazon S3 からの Hudi テーブルの読み取り

この例では、「例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する」で作成した Hudi テーブルを Amazon S3 から読み取ります。

注記

この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

Python

この例では、GlueContext.create_data_frame.from_catalog() メソッドを使用します。

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

この例では getCatalogSource メソッドを使用します。

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

例: DataFrame を更新して、Amazon S3 内にある Hudi テーブルに挿入する

この例では、「例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する」で作成した Hudi テーブルに対し、AWS Glue データカタログを使用して DataFrame を挿入します。

注記

この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

Python

この例では、GlueContext.write_data_frame.from_catalog() メソッドを使用します。

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

この例では getCatalogSink メソッドを使用します。

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

例: Spark を使用した Amazon S3 からの Hudi テーブルの読み取り

この例では、Spark DataFrame API を使用して Amazon S3 から Hudi テーブルを読み取ります。

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

例: スパークを使用した Amazon S3 への Hudi テーブルの書き込み

この例では、Spark を使用して Amazon S3 に Hudi テーブルを書き込みます。

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

例: Lake Formation の許可のコントロールを使用した Hudi テーブルの読み取りおよび書き込み

この例では、Lake Formation の許可のコントロールを使用して Hudi テーブルの読み取りと書き込みを行います。

  1. Hudi テーブルを作成して Lake Formation に登録します。

    1. Lake Formation の許可のコントロールを有効にするには、まずテーブルの Amazon S3 パスを Lake Formation に登録する必要があります。詳細については、「Amazon S3 ロケーションの登録」を参照してください。Lake Formation コンソールから、または AWS CLI を使用して登録できます。

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Amazon S3 の場所が登録されると、その場所 (またはその子である場所) をポイントするすべての AWS Glue テーブルが、GetTable 呼び出しで IsRegisteredWithLakeFormation パラメータの値を true として返します。

    2. Spark データフレーム API を介して登録された Amazon S3 パスをポイントする Hudi テーブルを作成します。

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. AWS Glue ジョブ IAM ロールに Lake Formation の許可を付与します。Lake Formation コンソールから、または AWS CLI を使用して許可を付与できます。詳細については、「Lake Formation コンソールと名前付きリソース方式を使用したテーブル許可の付与」を参照してください。

  3. Lake Formation に登録されている Hudi テーブルを読み取ります。このコードは、未登録の Hudi テーブルを読み取る場合と同じです。読み取りを成功させるには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要であることに留意してください。

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Lake Formation に登録されている Hudi テーブルに書き込みます。このコードは、未登録の Hudi テーブルに書き込む場合と同じです。書き込みを成功させるには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要であることに留意してください。

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)