AWS Glue에서 Hudi 프레임워크 사용 - AWS Glue

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS Glue에서 Hudi 프레임워크 사용

AWS Glue 3.0 이상에서는 데이터 레이크에 대한 Apache Hudi 프레임워크를 지원합니다. Hudi는 증분 데이터 처리 및 데이터 파이프라인 개발을 간소화하는 오픈 소스 데이터 레이크 스토리지 프레임워크입니다. 이 항목에서는 Hudi 테이블에서 데이터를 전송하거나 저장할 때 AWS Glue에서 데이터를 사용하는 데 사용할 수 있는 기능에 대해 설명합니다. Hudi에 대한 자세한 내용은 공식 Apache Hudi 설명서를 참조하세요.

AWS Glue를 사용하여 Amazon S3의 Hudi 테이블에서 읽기 및 쓰기 작업을 수행하거나 AWS Glue 데이터 카탈로그를 사용하여 Hudi 테이블로 작업할 수 있습니다. 삽입, 업데이트, 모든 Apache Spark 작업을 포함한 추가 작업도 지원됩니다.

참고

Apache Hudi 0.10.1 for AWS Glue 3.0은 Hudi 읽을 때 병합(MoR) 테이블을 지원하지 않습니다.

다음 표에는 각 AWS Glue 버전에 포함된 Hudi 버전이 나와 있습니다.

AWS Glue 버전 지원되는 Hudi 버전
4.0 0.12.1
3.0 0.10.1

AWS Glue가 지원하는 데이터 레이크 프레임워크에 대한 자세한 내용은 AWS Glue ETL 작업에서 데이터 레이크 프레임워크 사용 섹션을 참조하세요.

Hudi 활성화

Hudi for AWS Glue를 활성화하려면 다음 작업을 완료합니다.

  • hudi--datalake-formats 작업 파라미터의 값으로 지정합니다. 자세한 내용은 AWS Glue 작업 파라미터 섹션을 참조하세요.

  • AWS Glue 작업에 대한 --conf 키를 생성하고 다음 값으로 설정합니다. 또는 스크립트에서 SparkConf를 사용하여 다음 구성을 설정할 수 있습니다. 이러한 설정은 Apache Spark에서 Hudi 테이블을 올바르게 처리하는 데 도움이 됩니다.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • Hudi에 대한 Lake Formation 권한 지원은 AWS Glue 4.0에서 기본으로 활성화되어 있습니다. Lake Formation에 등록된 Hudi 테이블을 읽고 쓰는 데 추가 구성이 필요하지 않습니다. 등록된 Hudi 테이블을 읽으려면 AWS Glue 작업 IAM 역할에 SELECT 권한이 있어야 합니다. 등록된 Hudi 테이블에 글을 쓰려면 AWS Glue 작업 IAM 역할에 SUPER 권한이 있어야 합니다. Lake Formation 권한 관리에 대해 자세히 알아보려면 Data Catalog 리소스에 대한 권한 부여 및 취소를 참조하십시오.

다른 Hudi 버전 사용

AWS Glue에서 지원하지 않는 Hudi 버전을 사용하려면 --extra-jars 작업 파라미터를 사용하여 고유한 Hudi JAR 파일을 지정하세요. --datalake-formats 작업 파라미터의 값으로 hudi를 포함하지 마세요.

예: Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 등록

이 예제 스크립트는 Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 테이블을 등록하는 방법을 보여줍니다. 이 예제에서는 Hudi Hive Sync 도구를 사용하여 테이블을 등록합니다.

참고

이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog 작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하십시오.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

예: AWS Glue 데이터 카탈로그를 사용하여 Amazon S3에서 Hudi 테이블 읽기

이 예에서는 Amazon S3의 예: Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 등록에서 생성한 Hudi 테이블을 읽습니다.

참고

이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog 작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하십시오.

Python

이 예에서는 GlueContext.create_data_frame.from_catalog() 메서드를 사용합니다.

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

이 예에서는 getCatalogSource 메서드를 사용합니다.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

예: Amazon S3의 Hudi 테이블에서 DataFrame 업데이트 및 삽입

이 예제에서는 AWS Glue 데이터 카탈로그를 사용하여 예: Amazon S3에 Hudi 테이블을 작성하고 AWS Glue 데이터 카탈로그에 등록에서 생성한 Hudi 테이블에 DataFrame을 삽입합니다.

참고

이 예에서는 AWS Glue 데이터 카탈로그를 Apache Spark Hive 메타스토어로 사용하기 위해 --enable-glue-datacatalog 작업 파라미터를 설정해야 합니다. 자세한 내용은 AWS Glue 작업 파라미터 단원을 참조하십시오.

Python

이 예에서는 GlueContext.write_data_frame.from_catalog() 메서드를 사용합니다.

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

이 예에서는 getCatalogSink 메서드를 사용합니다.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

예: Spark를 사용하여 Amazon S3에서 Hudi 테이블 읽기

이 예에서는 Spark DataFrame API를 사용하여 Amazon S3에서 Hudi 테이블을 읽습니다.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

예: Spark를 사용하여 Amazon S3에 Hudi 테이블 쓰기

이 예에서는 Spark를 사용하여 Amazon S3에 Hudi 테이블을 씁니다.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

예: Lake Formation 권한 제어를 사용하여 Hudi 테이블 읽기 및 쓰기

이 예제에서는 Lake Formation 권한 제어가 있는 Hudi 테이블을 읽고 씁니다.

  1. Hudi 테이블을 생성하여 Lake Formation에 등록합니다.

    1. Lake Formation 권한 제어를 활성화하려면 먼저 Lake Formation에 Amazon S3 경로를 등록해야 합니다. 자세한 내용을 알아보려면 Registering an Amazon S3 location(Amazon S3 위치 등록)을 참조하세요. Lake Formation 콘솔에서 등록하거나 AWS CLI를 사용하여 등록할 수 있습니다.

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Amazon S3 위치를 등록하면 해당 위치(또는 하위 위치)를 가리키는 AWS Glue 테이블이 GetTable 호출 시 IsRegisteredWithLakeFormation 파라미터 값을 true로 반환합니다.

    2. Spark 데이터프레임 API를 통해 등록된 Amazon S3 경로를 가리키는 Hudi 테이블을 생성합니다.

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. AWSGlue 작업 IAM 역할에 Lake Formation 권한을 부여하십시오. Lake Formation 콘솔에서 권한을 부여하거나 AWS CLI를 사용하여 권한을 부여할 수 있습니다. 자세한 내용을 알아보려면 Lake Formation 콘솔 및 명명된 리소스 방법을 사용하여 데이터베이스 권한 부여를 참조하십시오

  3. Lake Formation에 등록된 Hudi 테이블을 읽어보십시오. 코드는 등록되지 않은 Hudi 테이블을 읽는 것과 같습니다. 읽기에 성공하려면 AWS Glue 작업 IAM 역할에 SELECT 권한이 있어야 한다는 점에 유의하십시오.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Lake Formation에 등록된 Hudi 테이블에 글을 작성하십시오. 코드는 등록되지 않은 Hudi 테이블에 쓰는 것과 같습니다. 참고로 AWS Glue 작업 IAM 역할에 SUPER 권한이 있어야 쓰기가 성공합니다.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)