Hudi 데이터 세트 작업 - 아마존 EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Hudi 데이터 세트 작업

Hudi는 Spark를 통해 Hudi 데이터 세트에서의 데이터 삽입, 업데이트 및 삭제를 지원합니다. 자세한 내용은 Apache Hudi 설명서에서 Writing Hudi tables를 참조하세요.

다음 예는 대화형 Spark 셸을 실행하거나, Spark 제출을 사용하거나, Amazon EMR 노트북을 사용하여 Amazon에서 Hudi를 사용하는 방법을 보여줍니다. EMR Hudi DeltaStreamer 유틸리티 또는 기타 도구를 사용하여 데이터세트에 쓸 수도 있습니다. 이 섹션 전체의 예제는 기본 SSH 사용자로 마스터 노드에 연결된 상태에서 Spark 셸을 사용하여 데이터세트를 작업하는 방법을 보여줍니다. hadoop

Amazon EMR 6.7.0 이상을 spark-shell 실행하거나 spark-sql 사용할 때는 다음 명령을 전달하십시오. spark-submit

참고

Amazon EMR 6.7.0은 아파치 Hudi 0.11.0-amzn-0을 사용하며, 이는 이전 후디 버전에 비해 크게 개선되었습니다. 자세한 내용은 Apache Hudi 0.11.0 Migration Guide를 참조하세요. 이 탭의 예제에는 이러한 변경 사항이 반영되어 있습니다.

프라이머리 노드에서 Spark 쉘을 여는 방법
  1. 를 사용하여 기본 노드에 연결합니다SSH. 자세한 내용은 Amazon EMR 관리 안내서를 사용하여 SSH 기본 노드에 연결을 참조하십시오.

  2. Spark 셸을 시작하려면 다음 명령을 입력합니다. PySpark 셸을 사용하려면 다음을 대체하십시오.spark-shell with pyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Amazon EMR 6.6.x 또는 이전 버전을 spark-shell 실행하거나 spark-sql 사용하는 경우 다음 명령을 전달하십시오. spark-submit

참고
프라이머리 노드에서 Spark 쉘을 여는 방법
  1. 를 사용하여 기본 노드에 연결합니다SSH. 자세한 내용은 Amazon EMR 관리 안내서를 사용하여 SSH 기본 노드에 연결을 참조하십시오.

  2. Spark 셸을 시작하려면 다음 명령을 입력합니다. PySpark 셸을 사용하려면 다음을 대체하십시오.spark-shell with pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Amazon EMR Notebooks에서 Hudi를 사용하려면 먼저 로컬 파일 시스템에서 노트북 클러스터의 마스터 노드로 Hudi jar 파일을 복사해야 합니다. HDFS 그런 다음 노트북 편집기를 사용하여 노트북이 Hudi를 EMR 사용하도록 구성합니다.

Amazon EMR 노트북과 함께 Hudi를 사용하려면
  1. Amazon EMR 노트북용 클러스터를 생성하고 실행합니다. 자세한 내용은 Amazon EMR관리 안내서의 노트북용 Amazon EMR 클러스터 생성을 참조하십시오.

  2. 를 사용하여 SSH 클러스터의 마스터 노드에 연결한 다음 로컬 파일 시스템의 jar 파일을 다음 HDFS 예와 같이 복사합니다. 이 예제에서는 파일 관리를 명확히 하기 HDFS 위해 디렉터리를 생성합니다. 원하는 경우 에서 HDFS 직접 목적지를 선택할 수 있습니다.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. 노트북 편집기를 열고 다음 예의 코드를 입력하고 실행합니다.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Amazon EMR Notebooks에서 Hudi를 사용하려면 먼저 로컬 파일 시스템에서 노트북 클러스터의 마스터 노드로 Hudi jar 파일을 복사해야 합니다. HDFS 그런 다음 노트북 편집기를 사용하여 노트북이 Hudi를 EMR 사용하도록 구성합니다.

Amazon EMR 노트북과 함께 Hudi를 사용하려면
  1. Amazon EMR 노트북용 클러스터를 생성하고 실행합니다. 자세한 내용은 Amazon EMR관리 안내서의 노트북용 Amazon EMR 클러스터 생성을 참조하십시오.

  2. 를 사용하여 SSH 클러스터의 마스터 노드에 연결한 다음 로컬 파일 시스템의 jar 파일을 다음 HDFS 예와 같이 복사합니다. 이 예제에서는 파일 관리를 명확히 하기 HDFS 위해 디렉터리를 생성합니다. 원하는 경우 에서 HDFS 직접 목적지를 선택할 수 있습니다.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. 노트북 편집기를 열고 다음 예의 코드를 입력하고 실행합니다.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Hudi용 Spark 세션 초기화

Scala를 사용할 때는 Spark 세션에서 다음 클래스를 가져와야 합니다. Spark 세션당 한 번 이 작업을 수행하면 됩니다.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Hudi 데이터 세트에 쓰기

다음 예시는 a를 만들고 이를 Hudi 데이터셋으로 작성하는 방법을 보여줍니다. DataFrame

참고

코드 샘플을 Spark 셸에 붙여넣으려면 프롬프트에 :paste를 입력하고 예제를 붙여넣은 다음 CTRL + D를 누릅니다.

Hudi 데이터세트에 DataFrame a를 쓸 때마다 다음을 지정해야 합니다. DataSourceWriteOptions 이러한 옵션 대부분이 쓰기 작업 간에 동일할 수 있습니다. 다음 예에서는 hudiOptions 변수를 사용하여 일반 옵션을 지정합니다. 이 변수는 그 다음 예제에서도 사용합니다.

참고

Amazon EMR 6.7.0은 아파치 Hudi 0.11.0-amzn-0을 사용하며, 이는 이전 후디 버전에 비해 크게 개선되었습니다. 자세한 내용은 Apache Hudi 0.11.0 Migration Guide를 참조하세요. 이 탭의 예제에는 이러한 변경 사항이 반영되어 있습니다.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')
참고

코드 예제 및 알림에서 Hudi 대신 'hoodie'가 표시될 수 있습니다. Hudi 코드베이스는 예전의 'hoodie' 철자를 널리 사용합니다.

DataSourceWriteOptions 후디를 위한 레퍼런스
옵션 설명

TABLE_NAME

데이터 세트를 등록할 테이블 이름입니다.

TABLE_TYPE_OPT_KEY

선택 사항입니다. 데이터 세트를 "COPY_ON_WRITE" 또는 "MERGE_ON_READ"로 생성할지 여부를 지정합니다. 기본값은 "COPY_ON_WRITE"입니다.

RECORDKEY_FIELD_OPT_KEY

값의 레코드 키 필드는 recordKeyHoodieKey 구성 요소로 사용됩니다. 실제 값은 필드 값의 .toString()을 호출하여 얻을 수 있습니다. 중첩 필드는 점 표기법을 사용하여 지정할 수 있습니다(예: a.b.c).

PARTITIONPATH_FIELD_OPT_KEY

값의 파티션 경로 필드는 HoodieKeypartitionPath 구성 요소로 사용됩니다. 실제 값은 필드 값의 .toString()을 호출하여 얻을 수 있습니다.

PRECOMBINE_FIELD_OPT_KEY

실제로 쓰기 전에 미리 결합하는 데 사용되는 필드입니다. 두 레코드에 동일한 키 값이 있는 경우 Hudi는 Object.compareTo(..)에 의해 결정된 사전 결합 필드에 대해 가장 큰 값을 가진 레코드를 선택합니다.

다음 옵션은 메타스토어에 Hudi 데이터 세트 테이블을 등록하는 데에만 필요합니다. Hudi 데이터 세트를 Hive 메타스토어의 테이블로 등록하지 않은 경우 이러한 옵션이 필요하지 않습니다.

DataSourceWriteOptions 하이브에 대한 레퍼런스
옵션 설명

HIVE_DATABASE_OPT_KEY

동기화할 Hive 데이터베이스입니다. 기본값은 "default"입니다.

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

파티션 필드 값을 Hive 파티션 열로 추출하는 데 사용되는 클래스입니다.

HIVE_PARTITION_FIELDS_OPT_KEY

Hive 파티션 열을 결정하는 데 사용할 데이터 세트의 필드입니다.

HIVE_SYNC_ENABLED_OPT_KEY

"true"로 설정하면 데이터 세트를 Apache Hive 메타스토어에 등록합니다. 기본값은 "false"입니다.

HIVE_TABLE_OPT_KEY

필수 사항입니다. 동기화할 Hive의 테이블의 이름입니다. 예: "my_hudi_table_cow".

HIVE_USER_OPT_KEY

선택 사항입니다. 동기화할 때 사용할 Hive 사용자 이름입니다. 예: "hadoop".

HIVE_PASS_OPT_KEY

선택 사항입니다. HIVE_USER_OPT_KEY에 의해 지정된 사용자의 Hive 암호입니다.

HIVE_URL_OPT_KEY

더 하이브 메타스토어. URL

데이터 업서트

다음 예제는 a를 작성하여 데이터를 업서트하는 방법을 보여줍니다. DataFrame 이전의 삽입 예제와 달리 OPERATION_OPT_KEY 값은 UPSERT_OPERATION_OPT_VAL로 설정됩니다. 또한 레코드가 추가되어야 함을 나타내기 위해 .mode(SaveMode.Append)가 지정됩니다.

참고

Amazon EMR 6.7.0은 아파치 Hudi 0.11.0-amzn-0을 사용하며, 이는 이전 후디 버전에 비해 크게 개선되었습니다. 자세한 내용은 Apache Hudi 0.11.0 Migration Guide를 참조하세요. 이 탭의 예제에는 이러한 변경 사항이 반영되어 있습니다.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

레코드 삭제

레코드를 하드 삭제하려면 빈 페이로드를 업서트할 수 있습니다. 이 경우 PAYLOAD_CLASS_OPT_KEY 옵션은 EmptyHoodieRecordPayload 클래스를 지정합니다. 이 예제에서는 upsert 예제에서 사용한 것과 동일한 DataFrame 레코드를 사용하여 동일한 레코드를 지정합니다. updateDF

참고

Amazon EMR 6.7.0은 아파치 Hudi 0.11.0-amzn-0을 사용하며, 이는 이전 후디 버전에 비해 크게 개선되었습니다. 자세한 내용은 Apache Hudi 0.11.0 Migration Guide를 참조하세요. 이 탭의 예제에는 이러한 변경 사항이 반영되어 있습니다.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

제출한 데이터 세트의 모든 레코드를 제거하도록 OPERATION_OPT_KEY DELETE_OPERATION_OPT_VAL로 설정하여 데이터를 하드 삭제할 수도 있습니다. 소프트 삭제 수행 지침 및 Hudi 테이블에 저장된 데이터 삭제에 대한 자세한 내용은 Apache Hudi 설명서에서 Deletes를 참조하세요.

Hudi 데이터 세트에서 읽기

Hudi는 현재 특정 시점에서 데이터를 검색하기 위해 기본적으로 스냅샷 쿼리를 수행합니다. 다음은 Hudi 데이터 세트에 쓰기에서 S3에 기록된 데이터 세트를 쿼리하는 예제입니다. Replace s3://DOC-EXAMPLE-BUCKET/myhudidataset 테이블 경로를 사용하여 각 파티션 수준에 와일드카드 별표를 추가하고 별표 1개를 추가합니다. 이 예제에서는 파티션 수준이 하나이므로 와일드카드 기호를 두 개 추가했습니다.

참고

Amazon EMR 6.7.0은 아파치 Hudi 0.11.0-amzn-0을 사용하며, 이는 이전 후디 버전에 비해 크게 개선되었습니다. 자세한 내용은 Apache Hudi 0.11.0 Migration Guide를 참조하세요. 이 탭의 예제에는 이러한 변경 사항이 반영되어 있습니다.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset' + '/*/*') snapshotQueryDF.show()

증분 쿼리

Hudi에서 증분 쿼리를 수행하여 지정된 커밋 타임스탬프 이후 변경된 레코드 스트림을 가져올 수도 있습니다. 이렇게 하려면 QUERY_TYPE_OPT_KEY 필드를 QUERY_TYPE_INCREMENTAL_OPT_VAL로 설정합니다. 그런 다음 BEGIN_INSTANTTIME_OPT_KEY에 대한 값을 추가하여 지정된 시간 이후에 기록된 모든 레코드를 가져옵니다. 증분 쿼리는 변경된 레코드만 처리하므로 배치 쿼리보다 보통 10배 더 효율적입니다.

증분 쿼리를 수행할 때는 스냅샷 쿼리에 와일드카드 별표를 사용하지 않고 루트(기본) 테이블 경로를 사용합니다.

참고

Presto는 증분 쿼리를 지원하지 않습니다.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset') incQueryDF.show()

Hudi 데이터 세트에서 읽는 방법에 대한 자세한 내용은 Apache Hudi 설명서에서 Querying Hudi tables를 참조하세요.