Hudi 데이터 세트 작업
Hudi는 Spark를 통해 Hudi 데이터 세트에서의 데이터 삽입, 업데이트 및 삭제를 지원합니다. 자세한 내용은 Apache Hudi 설명서에서 Writing Hudi tables
다음 예제에서는 대화식 Spark 쉘을 시작하거나 Spark 제출을 사용하거나 Amazon EMR에서 Hudi를 작업하기 위해 Amazon EMR Notebooks를 사용하는 방법을 보여줍니다. 또한 Hudi DeltaStreamer 유틸리티 또는 기타 도구를 사용하여 데이터 세트에 쓸 수도 있습니다. 이 섹션에 소개된 예에서는 SSH를 기본 hadoop
사용자로 사용하여 마스터 노드에 연결되어 있는 동안 Spark 셸을 사용하여 데이터 세트로 작업하는 방법을 보여줍니다.
Amazon EMR 6.7.0 이상을 사용하여 spark-shell
, spark-submit
또는 spark-sql
을 실행하는 경우 다음 명령을 전달합니다.
참고
Amazon EMR 6.7.0은 Apache Hudi
프라이머리 노드에서 Spark 쉘을 여는 방법
-
SSH를 사용하여 프라이머리 노드에 연결합니다. 자세한 내용은 Amazon EMR 관리 안내서에서 SSH를 사용하여 프라이머리 노드에 연결을 참조하세요.
-
Spark 셸을 시작하려면 다음 명령을 입력합니다. PySpark 쉘을 사용하려면
spark-shell
을pyspark
로 바꿉니다.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Amazon EMR 6.6.x 이하를 사용하여 spark-shell
, spark-submit
또는 spark-sql
을 실행하는 경우 다음 명령을 전달합니다.
참고
-
Amazon EMR 6.2 및 5.31 이상(Hudi 0.6.x 이상)에서는 구성에서
spark-avro.jar
을 생략할 수 있습니다. -
Amazon EMR 6.5 및 5.35 이상(Hudi 0.9.x 이상)에서는 구성에서
spark.sql.hive.convertMetastoreParquet=false
를 생략할 수 있습니다. -
Amazon EMR 6.6 및 5.36 이상(Hudi 0.10.x 이상)에서는 Version: 0.10.0 Spark Guide
에 설명된 대로 HoodieSparkSessionExtension
구성을 포함해야 합니다.--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
프라이머리 노드에서 Spark 쉘을 여는 방법
-
SSH를 사용하여 프라이머리 노드에 연결합니다. 자세한 내용은 Amazon EMR 관리 안내서에서 SSH를 사용하여 프라이머리 노드에 연결을 참조하세요.
-
Spark 셸을 시작하려면 다음 명령을 입력합니다. PySpark 쉘을 사용하려면
spark-shell
을pyspark
로 바꿉니다.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR Notebooks에서 Hudi를 사용하려면 먼저 로컬 파일 시스템의 Hudi jar 파일을 노트북 클러스터의 프라이머리 노드에 있는 HDFS로 복사해야 합니다. 그런 다음, 노트북 편집기를 이용해 Hudi를 사용하도록 EMR 노트북을 구성합니다.
Amazon EMR Notebooks에서 Hudi를 사용하는 방법
-
Amazon EMR Notebooks에 대한 클러스터를 생성하고 시작합니다. 자세한 내용은 Amazon EMR 관리 안내서에서 노트북에 대한 Amazon EMR 클러스터 생성을 참조하세요.
-
SSH를 사용하여 클러스터의 마스터 노드에 연결하고 다음 예와 같이 로컬 파일 시스템의 jar 파일을 HDFS로 복사합니다. 이 예에서는 파일 관리의 명확성을 위해 HDFS에 디렉터리를 만듭니다. 원할 경우 HDFS에서 자체 대상을 선택할 수 있습니다.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
노트북 편집기를 열고 다음 예의 코드를 입력하고 실행합니다.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Amazon EMR Notebooks에서 Hudi를 사용하려면 먼저 로컬 파일 시스템의 Hudi jar 파일을 노트북 클러스터의 프라이머리 노드에 있는 HDFS로 복사해야 합니다. 그런 다음, 노트북 편집기를 이용해 Hudi를 사용하도록 EMR 노트북을 구성합니다.
Amazon EMR Notebooks에서 Hudi를 사용하는 방법
-
Amazon EMR Notebooks에 대한 클러스터를 생성하고 시작합니다. 자세한 내용은 Amazon EMR 관리 안내서에서 노트북에 대한 Amazon EMR 클러스터 생성을 참조하세요.
-
SSH를 사용하여 클러스터의 마스터 노드에 연결하고 다음 예와 같이 로컬 파일 시스템의 jar 파일을 HDFS로 복사합니다. 이 예에서는 파일 관리의 명확성을 위해 HDFS에 디렉터리를 만듭니다. 원할 경우 HDFS에서 자체 대상을 선택할 수 있습니다.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
노트북 편집기를 열고 다음 예의 코드를 입력하고 실행합니다.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Hudi용 Spark 세션 초기화
Scala를 사용할 때는 Spark 세션에서 다음 클래스를 가져와야 합니다. Spark 세션당 한 번 이 작업을 수행하면 됩니다.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Hudi 데이터 세트에 쓰기
다음 예제에서는 DataFrame을 생성하고 Hudi 데이터 세트로 쓰는 방법을 보여줍니다.
참고
코드 샘플을 Spark 셸에 붙여넣으려면 프롬프트에 :paste
를 입력하고 예제를 붙여넣은 다음 CTRL
+ D
를 누릅니다.
Hudi 데이터 세트에 DataFrame을 쓸 때마다 DataSourceWriteOptions
을 지정해야 합니다. 이러한 옵션 대부분이 쓰기 작업 간에 동일할 수 있습니다. 다음 예에서는
변수를 사용하여 일반 옵션을 지정합니다. 이 변수는 그 다음 예제에서도 사용합니다.hudiOptions
참고
Amazon EMR 6.7.0은 Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
참고
코드 예제 및 알림에서 Hudi 대신 'hoodie'가 표시될 수 있습니다. Hudi 코드베이스는 예전의 'hoodie' 철자를 널리 사용합니다.
옵션 | 설명 |
---|---|
TABLE_NAME |
데이터 세트를 등록할 테이블 이름입니다. |
TABLE_TYPE_OPT_KEY |
선택 사항입니다. 데이터 세트를 |
RECORDKEY_FIELD_OPT_KEY |
값의 레코드 키 필드는 |
PARTITIONPATH_FIELD_OPT_KEY |
값의 파티션 경로 필드는 |
PRECOMBINE_FIELD_OPT_KEY |
실제로 쓰기 전에 미리 결합하는 데 사용되는 필드입니다. 두 레코드에 동일한 키 값이 있는 경우 Hudi는 |
다음 옵션은 메타스토어에 Hudi 데이터 세트 테이블을 등록하는 데에만 필요합니다. Hudi 데이터 세트를 Hive 메타스토어의 테이블로 등록하지 않은 경우 이러한 옵션이 필요하지 않습니다.
옵션 | 설명 |
---|---|
HIVE_DATABASE_OPT_KEY |
동기화할 Hive 데이터베이스입니다. 기본값은 |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
파티션 필드 값을 Hive 파티션 열로 추출하는 데 사용되는 클래스입니다. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Hive 파티션 열을 결정하는 데 사용할 데이터 세트의 필드입니다. |
HIVE_SYNC_ENABLED_OPT_KEY |
|
HIVE_TABLE_OPT_KEY |
필수 사항입니다. 동기화할 Hive의 테이블의 이름입니다. 예: |
HIVE_USER_OPT_KEY |
선택 사항입니다. 동기화할 때 사용할 Hive 사용자 이름입니다. 예: |
HIVE_PASS_OPT_KEY |
선택 사항입니다. |
HIVE_URL_OPT_KEY |
Hive 메타스토어 URL입니다. |
데이터 업서트
다음 예제에서는 DataFrame를 작성하여 데이터를 업서트하는 방법을 보여줍니다. 이전의 삽입 예제와 달리 OPERATION_OPT_KEY
값은 UPSERT_OPERATION_OPT_VAL
로 설정됩니다. 또한 레코드가 추가되어야 함을 나타내기 위해 .mode(SaveMode.Append)
가 지정됩니다.
참고
Amazon EMR 6.7.0은 Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
레코드 삭제
레코드를 하드 삭제하려면 빈 페이로드를 업서트할 수 있습니다. 이 경우 PAYLOAD_CLASS_OPT_KEY
옵션은 EmptyHoodieRecordPayload
클래스를 지정합니다. 이 예제에서는 동일한 레코드를 지정하도록 업서트 예제에서 사용된 동일한 DataFrame(updateDF
)을 사용합니다.
참고
Amazon EMR 6.7.0은 Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
제출한 데이터 세트의 모든 레코드를 제거하도록 OPERATION_OPT_KEY
를 DELETE_OPERATION_OPT_VAL
로 설정하여 데이터를 하드 삭제할 수도 있습니다. 소프트 삭제 수행 지침 및 Hudi 테이블에 저장된 데이터 삭제에 대한 자세한 내용은 Apache Hudi 설명서에서 Deletes
Hudi 데이터 세트에서 읽기
Hudi는 현재 특정 시점에서 데이터를 검색하기 위해 기본적으로 스냅샷 쿼리를 수행합니다. 다음은 Hudi 데이터 세트에 쓰기에서 S3에 기록된 데이터 세트를 쿼리하는 예제입니다. s3://amzn-s3-demo-bucket/myhudidataset
를 테이블 경로로 바꾸고 각 파티션 수준에 대한 와일드카드 별표와 하나의 추가 별표를 추가합니다. 이 예제에서는 파티션 수준이 하나이므로 와일드카드 기호를 두 개 추가했습니다.
참고
Amazon EMR 6.7.0은 Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
증분 쿼리
Hudi에서 증분 쿼리를 수행하여 지정된 커밋 타임스탬프 이후 변경된 레코드 스트림을 가져올 수도 있습니다. 이렇게 하려면 QUERY_TYPE_OPT_KEY
필드를 QUERY_TYPE_INCREMENTAL_OPT_VAL
로 설정합니다. 그런 다음 BEGIN_INSTANTTIME_OPT_KEY
에 대한 값을 추가하여 지정된 시간 이후에 기록된 모든 레코드를 가져옵니다. 증분 쿼리는 변경된 레코드만 처리하므로 배치 쿼리보다 보통 10배 더 효율적입니다.
증분 쿼리를 수행할 때는 스냅샷 쿼리에 와일드카드 별표를 사용하지 않고 루트(기본) 테이블 경로를 사용합니다.
참고
Presto는 증분 쿼리를 지원하지 않습니다.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Hudi 데이터 세트에서 읽는 방법에 대한 자세한 내용은 Apache Hudi 설명서에서 Querying Hudi tables