AWS Glue 3.0 이상에서는 데이터 레이크에 대한 Apache Hudi 프레임워크를 지원합니다. Hudi는 증분 데이터 처리 및 데이터 파이프라인 개발을 간소화하는 오픈 소스 데이터 레이크 스토리지 프레임워크입니다. 이 항목에서는 Hudi 테이블에서 데이터를 전송하거나 저장할 때 AWS Glue에서 데이터를 사용하는 데 사용할 수 있는 기능에 대해 설명합니다. Hudi에 대한 자세한 내용은 공식 Apache Hudi 설명서를 참조하세요.
AWS Glue를 사용하여 Amazon S3의 Hudi 테이블에서 읽기 및 쓰기 작업을 수행하거나 AWS Glue 데이터 카탈로그를 사용하여 Hudi 테이블로 작업할 수 있습니다. 삽입, 업데이트, 모든 Apache Spark 작업을 포함한 추가 작업도 지원됩니다.
Apache Hudi 0.10.1 for AWS Glue 3.0은 Hudi 읽을 때 병합(MoR) 테이블을 지원하지 않습니다.
다음 표에는 각 AWS Glue 버전에 포함된 Hudi 버전이 나와 있습니다.
AWS Glue 버전 |
지원되는 Hudi 버전 |
4.0 |
0.12.1 |
3.0 |
0.10.1 |
AWS Glue가 지원하는 데이터 레이크 프레임워크에 대한 자세한 내용은 AWS Glue ETL 작업에서 데이터 레이크 프레임워크 사용 섹션을 참조하세요.
Hudi for AWS Glue를 활성화하려면 다음 작업을 완료합니다.
-
hudi
를 --datalake-formats
작업 파라미터의 값으로 지정합니다. 자세한 내용은 AWS Glue 작업 파라미터 섹션을 참조하세요.
-
AWS Glue 작업에 대한 --conf
키를 생성하고 다음 값으로 설정합니다. 또는 스크립트에서 SparkConf
를 사용하여 다음 구성을 설정할 수 있습니다. 이러한 설정은 Apache Spark에서 Hudi 테이블을 올바르게 처리하는 데 도움이 됩니다.
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
다른 Hudi 버전 사용
AWS Glue에서 지원하지 않는 Hudi 버전을 사용하려면 --extra-jars
작업 파라미터를 사용하여 고유한 Hudi JAR 파일을 지정하세요. --datalake-formats
작업 파라미터의 값으로 hudi
를 포함하지 마세요.
이 예제 스크립트는 Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 테이블을 등록하는 방법을 보여줍니다. 이 예제에서는 Hudi Hive Sync 도구를 사용하여 테이블을 등록합니다.
이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog
작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하세요.
- Python
-
# Example: Create a Hudi table from a DataFrame
# and register the table to Glue Data Catalog
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"path": "s3://<s3Path/>
"
}
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save()
- Scala
-
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog
val additionalOptions = Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms",
"path" -> "s3://<s3Path/>
")
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("append")
.save()
이 예에서는 Amazon S3의 예: Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 등록에서 생성한 Hudi 테이블을 읽습니다.
이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog
작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하세요.
- Python
-
이 예에서는 GlueContext.create_data_frame.from_catalog()
메서드를 사용합니다.
# Example: Read a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
dataFrame = glueContext.create_data_frame.from_catalog(
database = "<your_database_name>
",
table_name = "<your_table_name>
"
)
- Scala
-
이 예에서는 getCatalogSource 메서드를 사용합니다.
// Example: Read a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val dataFrame = glueContext.getCatalogSource(
database = "<your_database_name>
",
tableName = "<your_table_name>
"
).getDataFrame()
}
}
이 예제에서는 AWS Glue 데이터 카탈로그를 사용하여 예: Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 등록에서 생성한 Hudi 테이블에 DataFrame을 삽입합니다.
이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog
작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하세요.
- Python
-
이 예에서는 GlueContext.write_data_frame.from_catalog()
메서드를 사용합니다.
# Example: Upsert a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.write_data_frame.from_catalog(
frame = dataFrame,
database = "<your_database_name>
",
table_name = "<your_table_name>
",
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms"
}
)
- Scala
-
이 예에서는 getCatalogSink 메서드를 사용합니다.
// Example: Upsert a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.getCatalogSink("<your_database_name>
", "<your_table_name>
",
additionalOptions = JsonOptions(Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms"
)))
.writeDataFrame(dataFrame, glueContext)
}
}
이 예에서는 Spark DataFrame API를 사용하여 Amazon S3에서 Hudi 테이블을 읽습니다.
- Python
-
# Example: Read a Hudi table from S3 using a Spark DataFrame
dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
- Scala
-
// Example: Read a Hudi table from S3 using a Spark DataFrame
val dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
이 예에서는 Spark를 사용하여 Amazon S3에 Hudi 테이블을 씁니다.
- Python
-
# Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save("s3://<s3Path/>
)
- Scala
-
// Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("overwrite")
.save("s3://<s3path/>
")