AWS Glue 3.0 and later supports the Apache Hudi framework for data lakes. Hudi is an open-source
data lake storage framework that simplifies incremental data processing and data pipeline
development. This topic covers the features available for using your data in AWS Glue when you
transport or store your data in a Hudi table. To learn more about Hudi, see the official
Apache Hudi documentation.
You can use AWS Glue to perform read and write operations on Hudi tables in Amazon S3, or work
with Hudi tables using the AWS Glue Data Catalog. Additional operations, including insert, update, and
all of the Apache Spark operations, are also supported.
Apache Hudi 0.10.1 for AWS Glue 3.0 doesn't support Hudi Merge on Read (MoR)
tables.
The following table lists the Hudi version that is included in each AWS Glue version.

AWS Glue version | Supported Hudi version
4.0              | 0.12.1
3.0              | 0.10.1
To learn more about the data lake frameworks that AWS Glue supports, see Using data lake
frameworks with AWS Glue ETL jobs.
To enable Hudi for AWS Glue, complete the following tasks:
- Specify hudi as a value for the --datalake-formats job parameter. For more information, see AWS Glue job parameters.
- Create a key named --conf for your AWS Glue job, and set it to the following value. Alternatively, you can set the following configuration using SparkConf in your script, as shown in the sketch after this list. These settings help Apache Spark correctly handle Hudi tables.

  spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
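If you would rather configure these properties in your script than through the --conf job parameter, a minimal Python sketch using SparkConf might look like the following. Only the two property names and values come from the guidance above; the surrounding session setup is illustrative.

# Sketch: apply the Hudi-related Spark settings through SparkConf
# before the GlueContext is created. Session setup is illustrative.
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.hive.convertMetastoreParquet", "false")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session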
Using a different Hudi version
To use a version of Hudi that AWS Glue doesn't support, specify your own Hudi JAR files using the --extra-jars job parameter. Do not include hudi as a value for the --datalake-formats job parameter.
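As a rough sketch of how these parameters fit together, the following boto3 call creates a job that points --extra-jars at a self-managed Hudi bundle and omits --datalake-formats. The job name, role, bucket, script location, and bundle file name are placeholders for illustration, not values from this guide.

# Sketch: create a Glue job that supplies its own Hudi bundle JAR.
# All names and S3 paths below are hypothetical placeholders.
import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="hudi-custom-version-job",
    Role="<your_glue_service_role>",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<your_bucket>/scripts/hudi_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    DefaultArguments={
        # Point --extra-jars at your own Hudi bundle; do not set --datalake-formats.
        "--extra-jars": "s3://<your_bucket>/jars/<your_hudi_bundle>.jar",
        # The Spark settings described earlier still apply when using Hudi.
        "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer "
                  "--conf spark.sql.hive.convertMetastoreParquet=false",
    },
)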
This example script demonstrates how to write a Hudi table to Amazon S3 and
register the table in the AWS Glue Data Catalog. The example uses the Hudi Hive Sync tool to
register the table.
This example requires you to set the --enable-glue-datacatalog
job parameter in order to use the AWS Glue Data Catalog as an Apache Spark Hive metastore.
To learn more, see AWS Glue job
parameters.
- Python
# Example: Create a Hudi table from a DataFrame
# and register the table to Glue Data Catalog
additional_options={
    "hoodie.table.name": "<your_table_name>",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>",
    "hoodie.datasource.write.precombine.field": "<your_precombine_field>",
    "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "<your_database_name>",
    "hoodie.datasource.hive_sync.table": "<your_table_name>",
    "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "path": "s3://<s3Path/>"
}

dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save()
- Scala
// Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog
val additionalOptions = Map(
  "hoodie.table.name" -> "<your_table_name>",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
  "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
  "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
  "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
  "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "path" -> "s3://<s3Path/>"
)

dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("append")
  .save()
This example reads from Amazon S3 the Hudi table that you created in Example: Write a Hudi table
to Amazon S3 and register it in the AWS Glue Data Catalog.
This example requires you to set the --enable-glue-datacatalog
job parameter in order to use the AWS Glue Data Catalog as an Apache Spark Hive metastore.
To learn more, see AWS Glue job
parameters.
- Python
For this example, use the GlueContext.create_data_frame.from_catalog() method.

# Example: Read a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

dataFrame = glueContext.create_data_frame.from_catalog(
    database = "<your_database_name>",
    table_name = "<your_table_name>"
)
- Scala
For this example, use the getCatalogSource method.

// Example: Read a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val dataFrame = glueContext.getCatalogSource(
      database = "<your_database_name>",
      tableName = "<your_table_name>"
    ).getDataFrame()
  }
}
This example uses the AWS Glue Data Catalog to insert a DataFrame into the Hudi table that you
created in Example: Write a Hudi table
to Amazon S3 and register it in the AWS Glue Data Catalog.
This example requires you to set the --enable-glue-datacatalog
job parameter in order to use the AWS Glue Data Catalog as an Apache Spark Hive metastore.
To learn more, see AWS Glue job
parameters.
- Python
For this example, use the GlueContext.write_data_frame.from_catalog() method.

# Example: Upsert a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

glueContext.write_data_frame.from_catalog(
    frame = dataFrame,
    database = "<your_database_name>",
    table_name = "<your_table_name>",
    additional_options={
        "hoodie.table.name": "<your_table_name>",
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>",
        "hoodie.datasource.write.precombine.field": "<your_precombine_field>",
        "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "<your_database_name>",
        "hoodie.datasource.hive_sync.table": "<your_table_name>",
        "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms"
    }
)
- Scala
For this example, use the getCatalogSink method.

// Example: Upsert a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>",
      additionalOptions = JsonOptions(Map(
        "hoodie.table.name" -> "<your_table_name>",
        "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
        "hoodie.datasource.write.operation" -> "upsert",
        "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>",
        "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>",
        "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>",
        "hoodie.datasource.write.hive_style_partitioning" -> "true",
        "hoodie.datasource.hive_sync.enable" -> "true",
        "hoodie.datasource.hive_sync.database" -> "<your_database_name>",
        "hoodie.datasource.hive_sync.table" -> "<your_table_name>",
        "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>",
        "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc" -> "false",
        "hoodie.datasource.hive_sync.mode" -> "hms"
      )))
      .writeDataFrame(dataFrame, glueContext)
  }
}
This example reads a Hudi table from Amazon S3 using the Spark DataFrame
API.
- Python
# Example: Read a Hudi table from S3 using a Spark DataFrame
dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
- Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame
val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
This example writes a Hudi table to Amazon S3 using Spark.
- Python
# Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi") \
    .options(**additional_options) \
    .mode("overwrite") \
    .save("s3://<s3Path/>")
- Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi")
  .options(additionalOptions)
  .mode("overwrite")
  .save("s3://<s3path/>")