AWS Glue 3.0 以降では、データレイク向けに Apache Hudi フレームワークが利用できます。Hudi は、増分データ処理とデータパイプラインの開発を簡素化する、オープンソースのデータレイク用ストレージフレームワークです。このトピックでは、AWS Glue 内でデータを Hudi テーブルに転送または保存する際に利用可能な、各機能について説明します。Hudi の詳細については、公式のApache Hudi ドキュメントを参照してください。
AWS Glue により、Amazon S3 内にある Hudi テーブルの読み取りおよび書き込み操作を実行できます。あるいは、AWS Glue データカタログを使用して、Hudi テーブルを操作することも可能です。挿入、更新、および、すべての Apache Spark オペレーションを含む操作も、追加でサポートされています。
Apache Hudi 0.10.1 for AWS Glue 3.0 では、Hudi Merge on Read (MoR) テーブルはサポートされません。
次の表に、AWS Glue の各バージョンに含まれている、Hudi のバージョンを一覧で示します。
AWS Glue のバージョン |
サポートされる Hudi バージョン |
4.0 |
0.12.1 |
3.0 |
0.10.1 |
AWS Glue がサポートするデータレイクフレームワークの詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。
AWS Glue で Hudi を有効化するには、以下のタスクを実行します。
-
hudi
を --datalake-formats
のジョブパラメータの値として指定します。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。
-
AWS Glue ジョブ用に、--conf
という名前でキーを作成し、それに次の値を設定します。または、スクリプトで SparkConf
を使用して、次の構成を設定することもできます。これらの設定は、Apache Spark が Hudi テーブルを適切に処理するために役立ちます。
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
Hudi に関する Lake Formation の許可のサポートは、AWS Glue 4.0 のためにデフォルトで有効になっています。Lake Formation に登録された Hudi テーブルの読み取り/書き込みに追加の設定は必要ありません。登録された Hudi テーブルを読み取るには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要です。登録された Hudi テーブルに書き込むには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要です。Lake Formation の許可の管理の詳細については、「Data Catalog リソースに対する許可の付与と取り消し」を参照してください。
別の Hudi バージョンの使用
AWS Glue でサポートされないバージョンの Hudi を使用するには、--extra-jars
job パラメーターにより独自の Hudi JAR ファイルを指定します。--datalake-formats
ジョブパラメータの値として、hudi
は含めないでください。
このスクリプト例では、Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する方法を示します。この例では、テーブルを登録するために、Hudi Hive Sync ツールを使用します。
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog
ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。
- Python
-
# Example: Create a Hudi table from a DataFrame
# and register the table to Glue Data Catalog
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"path": "s3://<s3Path/>
"
}
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save()
- Scala
-
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog
val additionalOptions = Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms",
"path" -> "s3://<s3Path/>
")
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("append")
.save()
この例では、「例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する」で作成した Hudi テーブルを Amazon S3 から読み取ります。
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog
ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。
- Python
-
この例では、GlueContext.create_data_frame.from_catalog()
メソッドを使用します。
# Example: Read a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
dataFrame = glueContext.create_data_frame.from_catalog(
database = "<your_database_name>
",
table_name = "<your_table_name>
"
)
- Scala
-
この例では getCatalogSource メソッドを使用します。
// Example: Read a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val dataFrame = glueContext.getCatalogSource(
database = "<your_database_name>
",
tableName = "<your_table_name>
"
).getDataFrame()
}
}
この例では、「例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する」で作成した Hudi テーブルに対し、AWS Glue データカタログを使用して DataFrame を挿入します。
この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog
ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。
- Python
-
この例では、GlueContext.write_data_frame.from_catalog()
メソッドを使用します。
# Example: Upsert a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.write_data_frame.from_catalog(
frame = dataFrame,
database = "<your_database_name>
",
table_name = "<your_table_name>
",
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms"
}
)
- Scala
-
この例では getCatalogSink メソッドを使用します。
// Example: Upsert a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.getCatalogSink("<your_database_name>
", "<your_table_name>
",
additionalOptions = JsonOptions(Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms"
)))
.writeDataFrame(dataFrame, glueContext)
}
}
この例では、Spark DataFrame API を使用して Amazon S3 から Hudi テーブルを読み取ります。
- Python
-
# Example: Read a Hudi table from S3 using a Spark DataFrame
dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
- Scala
-
// Example: Read a Hudi table from S3 using a Spark DataFrame
val dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
この例では、Spark を使用して Amazon S3 に Hudi テーブルを書き込みます。
- Python
-
# Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save("s3://<s3Path/>
)
- Scala
-
// Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("overwrite")
.save("s3://<s3path/>
")
この例では、Lake Formation の許可のコントロールを使用して Hudi テーブルの読み取りと書き込みを行います。
Hudi テーブルを作成して Lake Formation に登録します。
Lake Formation の許可のコントロールを有効にするには、まずテーブルの Amazon S3 パスを Lake Formation に登録する必要があります。詳細については、「Amazon S3 ロケーションの登録」を参照してください。Lake Formation コンソールから、または AWS CLI を使用して登録できます。
aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>
Amazon S3 の場所が登録されると、その場所 (またはその子である場所) をポイントするすべての AWS Glue テーブルが、GetTable
呼び出しで IsRegisteredWithLakeFormation
パラメータの値を true として返します。
Spark データフレーム API を介して登録された Amazon S3 パスをポイントする Hudi テーブルを作成します。
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'product_id',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
'path': <S3_TABLE_LOCATION>,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': database_name,
'hoodie.datasource.hive_sync.table': table_name,
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms'
}
df_products.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save()
AWS Glue ジョブ IAM ロールに Lake Formation の許可を付与します。Lake Formation コンソールから、または AWS CLI を使用して許可を付与できます。詳細については、「Lake Formation コンソールと名前付きリソース方式を使用したテーブル許可の付与」を参照してください。
Lake Formation に登録されている Hudi テーブルを読み取ります。このコードは、未登録の Hudi テーブルを読み取る場合と同じです。読み取りを成功させるには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要であることに留意してください。
val dataFrame = glueContext.getCatalogSource(
database = "<your_database_name>",
tableName = "<your_table_name>"
).getDataFrame()
Lake Formation に登録されている Hudi テーブルに書き込みます。このコードは、未登録の Hudi テーブルに書き込む場合と同じです。書き込みを成功させるには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要であることに留意してください。
glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
additionalOptions = JsonOptions(Map(
"hoodie.table.name" -> "<your_table_name>",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "<write_operation>",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms"
)))
.writeDataFrame(dataFrame, glueContext)