翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Hudi データセットを操作する
Hudi は、Spark を介した Hudi データセットのデータの挿入、更新、削除をサポートしています。詳細については、Apache Hudi ドキュメントの「Writing Hudi Tables
以下の例では、インタラクティブな Spark シェルを起動し、Spark submit を使用するか、Amazon EMR Notebooks を使用して、Amazon EMR で Hudi を操作する方法を示しています。Hudi DeltaStreamer ユーティリティまたは他のツールを使用して、データセットに書き込むこともできます。このセクション全体を通して、例では、デフォルトの hadoop
ユーザーとして、SSH を使用してマスターノードに接続しながら、Spark シェルを使用してデータセットを操作する方法を示しています。
Amazon EMR 6.7.0 以降を使用して、spark-shell
、spark-submit
、または spark-sql
を実行している場合は、次のコマンドを渡します。
注記
Amazon EMR 6.7.0 は Apache Hudi
プライマリノードで Spark シェルを開くには
-
SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。
-
以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、
spark-shell
をpyspark
に置き換えてください。spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Amazon EMR 6.6.x 以前を使用して、spark-shell
、spark-submit
、または spark-sql
を実行している場合は、次のコマンドを渡します。
注記
-
Amazon EMR 6.2 および 5.31 以降 (Hudi 0.6.x 以降) では、
spark-avro.jar
を設定から省略できます。 -
Amazon EMR 6.5 および 5.35 以降 (Hudi 0.9.x 以降) では、
spark.sql.hive.convertMetastoreParquet=false
を設定から省略できます。 -
Amazon EMR 6.6 および 5.36 以降 (Hudi 0.10.x 以降) には、「Version: 0.10.0 Spark Guide
」に記載されている HoodieSparkSessionExtension
設定が含まれている必要があります。--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
プライマリノードで Spark シェルを開くには
-
SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。
-
以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、
spark-shell
をpyspark
に置き換えてください。spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。
Amazon EMR Notebooks で Hudi を使用するには
-
Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。
-
SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
ノートブックエディタを開き、以下の例のコードを入力して実行します。
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。
Amazon EMR Notebooks で Hudi を使用するには
-
Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。
-
SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
ノートブックエディタを開き、以下の例のコードを入力して実行します。
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Hudi の Spark セッションを初期化する
Scala を使用する場合、Spark セッションで次のクラスをインポートする必要があります。この操作は、Spark セッションごとに 1 回行う必要があります。
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
Hudi データセットに書き込む
以下の例では、DataFrame を作成し、それを Hudi データセットとして書き込む方法を示しています。
注記
コードサンプルを Spark シェルに貼り付けるには、プロンプトで「:paste
」と入力し、例を貼り付けて、CTRL
+ D
を押します。
DataFrame を Hudi データセットに書き込むたびに、DataSourceWriteOptions
を指定する必要があります。これらのオプションの多くは、書き込みオペレーション間で共通することがよくあります。この例では、
変数を使用して、後続の例でも使用する共通のオプションを指定しています。hudiOptions
注記
Amazon EMR 6.7.0 は Apache Hudi
// Create a DataFrame
val inputDF = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
//Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TBL_NAME.key -> "tableName
",
DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
",
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)
// Write the DataFrame as a Hudi dataset
(inputDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
.mode(SaveMode.Overwrite)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame
val inputDF = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
//Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TABLE_NAME -> "tableName
",
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
// Write the DataFrame as a Hudi dataset
(inputDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.options(hudiOptions)
.mode(SaveMode.Overwrite)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame
inputDF = spark.createDataFrame(
[
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
],
["id", "creation_date", "last_update_time"]
)
# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'tableName
',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'creation_date',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'tableName
',
'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
注記
コードの例や通知には、Hudi ではなく「hoodie」と表示されている場合があります。Hudi コードベースは、古い「hoodie」の綴りが広く使用されています。
オプション | 説明 |
---|---|
TABLE_NAME |
データセットを登録するテーブルの名前。 |
TABLE_TYPE_OPT_KEY |
オプション。データセットが |
RECORDKEY_FIELD_OPT_KEY |
値が |
PARTITIONPATH_FIELD_OPT_KEY |
値が |
PRECOMBINE_FIELD_OPT_KEY |
実際の書き込みの前に事前結合で使用されるフィールド。2 つのレコードのキー値が同じ場合、Hudi は、 |
以下のオプションは、Hudi データセットテーブルをメタストアに登録するためにのみ必要です。Hudi データセットを Hive メタストアのテーブルとして登録しない場合、これらのオプションは必要ありません。
オプション | 説明 |
---|---|
HIVE_DATABASE_OPT_KEY |
同期先の Hive データベース。デフォルト: |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
パーティションフィールド値を Hive パーティション列に抽出するために使用されるクラス。 |
HIVE_PARTITION_FIELDS_OPT_KEY |
Hive パーティション列を決定するために使用するデータセット内のフィールド。 |
HIVE_SYNC_ENABLED_OPT_KEY |
|
HIVE_TABLE_OPT_KEY |
必須。同期先の Hive テーブルの名前。例えば、 |
HIVE_USER_OPT_KEY |
オプション。同期時に使用する Hive ユーザー名。例えば、 |
HIVE_PASS_OPT_KEY |
オプション。 |
HIVE_URL_OPT_KEY |
Hive メタストアの URL。 |
データのアップサート
以下の例では、DataFrame を書き込むことでデータをアップサートする方法を示しています。以前の挿入の例とは異なり、OPERATION_OPT_KEY
値は UPSERT_OPERATION_OPT_VAL
に設定されています。また、.mode(SaveMode.Append)
は、レコードを付加する必要があることを指示するために指定されています。
注記
Amazon EMR 6.7.0 は Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))
(updateDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value
"))
(updateDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.options(hudiOptions)
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit
# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value
'))
updateDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.options(**hudiOptions) \
.mode('append') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
レコードを削除する
レコードをハード削除するには、空のペイロードをアップサートします。この場合、PAYLOAD_CLASS_OPT_KEY
オプションで EmptyHoodieRecordPayload
クラスを指定しています。この例では、upsert の例で使用したのと同じ DataFrame updateDF
を使用して、同じレコードが指定されるようにしています。
注記
Amazon EMR 6.7.0 は Apache Hudi
(updateDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
.options(**hudiOptions) \
.mode('append') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
OPERATION_OPT_KEY
を DELETE_OPERATION_OPT_VAL
に設定して、送信するデータセット内のすべてのレコードを削除し、データをハード削除することもできます。ソフト削除の実行方法、および Hudi テーブルに保存されたデータの削除の詳細については、Apache Hudi ドキュメントの「Deletes
Hudi データセットから読み込む
現時点でのデータを取得するために、Hudi はデフォルトでスナップショットクエリを実行します。以下に、Hudi データセットに書き込む で S3 に書き込まれたデータセットをクエリする例を示します。s3://amzn-s3-demo-bucket/myhudidataset
をテーブルパスに置き換え、パーティションレベルごとにワイルドカードのアスタリスクを追加し、さらにアスタリスクを 1 つ追加します。この例では、パーティションレベルが 1 つあるため、2 つのワイルドカードシンボルを追加しました。
注記
Amazon EMR 6.7.0 は Apache Hudi
val snapshotQueryDF = spark.read
.format("hudi")
.load("s3://amzn-s3-demo-bucket/myhudidataset"
)
.show()
(val snapshotQueryDF = spark.read
.format("org.apache.hudi")
.load("s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*"))
snapshotQueryDF.show()
snapshotQueryDF = spark.read \
.format('org.apache.hudi') \
.load('s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*')
snapshotQueryDF.show()
増分クエリ
Hudi で増分クエリを実行して、特定のコミットタイムスタンプ以降に変更されたレコードのストリームを取得することもできます。これを行うには、QUERY_TYPE_OPT_KEY
フィールドを QUERY_TYPE_INCREMENTAL_OPT_VAL
に設定します。次に、BEGIN_INSTANTTIME_OPT_KEY
の値を追加して、指定された時刻以降に書き込まれたすべてのレコードを取得します。増分クエリでは、変更されたレコードのみが処理されるため、通常、バッチクエリよりも 10 倍効率が高くなります。
増分クエリを実行するときは、スナップショットクエリに使用されるワイルドカードのアスタリスクを付けずに、ルート (ベース) テーブルパスを使用します。
注記
Presto は増分クエリをサポートしていません。
(val incQueryDF = spark.read
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>
)
.load("s3://amzn-s3-demo-bucket/myhudidataset
" ))
incQueryDF.show()
readOptions = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': <beginInstantTime>
,
}
incQueryDF = spark.read \
.format('org.apache.hudi') \
.options(**readOptions) \
.load('s3://amzn-s3-demo-bucket/myhudidataset
')
incQueryDF.show()
Hudi データセットからの読み取りの詳細については、Apache Hudi ドキュメントの「Querying Hudi tables