Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Hudi データセットを操作する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Hudi データセットを操作する

Hudi は、Spark を介した Hudi データセットのデータの挿入、更新、削除をサポートしています。詳細については、Apache Hudi ドキュメントの「Writing Hudi Tables」を参照してください。

以下の例では、インタラクティブな Spark シェルを起動し、Spark submit を使用するか、Amazon EMR Notebooks を使用して、Amazon EMR で Hudi を操作する方法を示しています。Hudi DeltaStreamer ユーティリティまたは他のツールを使用して、データセットに書き込むこともできます。このセクション全体を通して、例では、デフォルトの hadoop ユーザーとして、SSH を使用してマスターノードに接続しながら、Spark シェルを使用してデータセットを操作する方法を示しています。

Amazon EMR 6.7.0 以降を使用して、spark-shellspark-submit、または spark-sql を実行している場合は、次のコマンドを渡します。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

プライマリノードで Spark シェルを開くには
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、spark-shellpyspark に置き換えてください。

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Amazon EMR 6.7.0 以降を使用して、spark-shellspark-submit、または spark-sql を実行している場合は、次のコマンドを渡します。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

プライマリノードで Spark シェルを開くには
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、spark-shellpyspark に置き換えてください。

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Amazon EMR 6.6.x 以前を使用して、spark-shellspark-submit、または spark-sql を実行している場合は、次のコマンドを渡します。

注記
  • Amazon EMR 6.2 および 5.31 以降 (Hudi 0.6.x 以降) では、spark-avro.jar を設定から省略できます。

  • Amazon EMR 6.5 および 5.35 以降 (Hudi 0.9.x 以降) では、spark.sql.hive.convertMetastoreParquet=false を設定から省略できます。

  • Amazon EMR 6.6 および 5.36 以降 (Hudi 0.10.x 以降) には、「Version: 0.10.0 Spark Guide」に記載されている HoodieSparkSessionExtension 設定が含まれている必要があります。

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
プライマリノードで Spark シェルを開くには
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、spark-shellpyspark に置き換えてください。

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Amazon EMR 6.6.x 以前を使用して、spark-shellspark-submit、または spark-sql を実行している場合は、次のコマンドを渡します。

注記
  • Amazon EMR 6.2 および 5.31 以降 (Hudi 0.6.x 以降) では、spark-avro.jar を設定から省略できます。

  • Amazon EMR 6.5 および 5.35 以降 (Hudi 0.9.x 以降) では、spark.sql.hive.convertMetastoreParquet=false を設定から省略できます。

  • Amazon EMR 6.6 および 5.36 以降 (Hudi 0.10.x 以降) には、「Version: 0.10.0 Spark Guide」に記載されている HoodieSparkSessionExtension 設定が含まれている必要があります。

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
プライマリノードで Spark シェルを開くには
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark shell を使用するには、spark-shellpyspark に置き換えてください。

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。

Amazon EMR Notebooks で Hudi を使用するには
  1. Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。

  2. SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. ノートブックエディタを開き、以下の例のコードを入力して実行します。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。

Amazon EMR Notebooks で Hudi を使用するには
  1. Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。

  2. SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. ノートブックエディタを開き、以下の例のコードを入力して実行します。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。

Amazon EMR Notebooks で Hudi を使用するには
  1. Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。

  2. SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. ノートブックエディタを開き、以下の例のコードを入力して実行します。

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Amazon EMR Notebooks で Hudi を使用するには、まずローカルファイルシステムから Hudi jar ファイルをノートブッククラスターのマスターノードの HDFS にコピーする必要があります。次に、ノートブックエディタで、Hudi を使用するように EMR notebook を設定します。

Amazon EMR Notebooks で Hudi を使用するには
  1. Amazon EMR Notebooks 用のクラスターを作成して起動します。詳細については、「Amazon EMR 管理ガイド」の「ノートブックの Amazon EMR クラスターの作成」を参照してください。

  2. SSH を使用してクラスターのマスターノードに接続し、以下の例に示すようにローカルファイルシステムから HDFS に jar ファイルをコピーします。この例では、ファイルの管理をわかりやすくするために、HDFS にディレクトリを作成しています。必要に応じて、HDFS で独自のターゲットを選択できます。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. ノートブックエディタを開き、以下の例のコードを入力して実行します。

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Hudi の Spark セッションを初期化する

Scala を使用する場合、Spark セッションで次のクラスをインポートする必要があります。この操作は、Spark セッションごとに 1 回行う必要があります。

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Hudi データセットに書き込む

以下の例では、DataFrame を作成し、それを Hudi データセットとして書き込む方法を示しています。

注記

コードサンプルを Spark シェルに貼り付けるには、プロンプトで「:paste」と入力し、例を貼り付けて、CTRL + D を押します。

DataFrame を Hudi データセットに書き込むたびに、DataSourceWriteOptions を指定する必要があります。これらのオプションの多くは、書き込みオペレーション間で共通することがよくあります。この例では、hudiOptions 変数を使用して、後続の例でも使用する共通のオプションを指定しています。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
注記

コードの例や通知には、Hudi ではなく「hoodie」と表示されている場合があります。Hudi コードベースは、古い「hoodie」の綴りが広く使用されています。

Hudi の DataSourceWriteOptions リファレンス
オプション 説明

TABLE_NAME

データセットを登録するテーブルの名前。

TABLE_TYPE_OPT_KEY

オプション。データセットが "COPY_ON_WRITE" として作成されるか、"MERGE_ON_READ" として作成されるかを指定します。デフォルト: "COPY_ON_WRITE"

RECORDKEY_FIELD_OPT_KEY

値が HoodieKeyrecordKey コンポーネントとして使用されるレコードキーフィールド。実際の値は、フィールド値に対して .toString() を呼び出すことで取得されます。ネストされたフィールドは、a.b.c など、ドット表記を使用して指定できます。

PARTITIONPATH_FIELD_OPT_KEY

値が HoodieKeypartitionPath コンポーネントとして使用されるパーティションパスフィールド。実際の値は、フィールド値に対して .toString() を呼び出すことで取得されます。

PRECOMBINE_FIELD_OPT_KEY

実際の書き込みの前に事前結合で使用されるフィールド。2 つのレコードのキー値が同じ場合、Hudi は、Object.compareTo(..) で決定された事前結合フィールドの値が最も大きいレコードを選択します。

以下のオプションは、Hudi データセットテーブルをメタストアに登録するためにのみ必要です。Hudi データセットを Hive メタストアのテーブルとして登録しない場合、これらのオプションは必要ありません。

Hive の DataSourceWriteOptions リファレンス
オプション 説明

HIVE_DATABASE_OPT_KEY

同期先の Hive データベース。デフォルト: "default"

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

パーティションフィールド値を Hive パーティション列に抽出するために使用されるクラス。

HIVE_PARTITION_FIELDS_OPT_KEY

Hive パーティション列を決定するために使用するデータセット内のフィールド。

HIVE_SYNC_ENABLED_OPT_KEY

"true" に設定すると、データセットを Apache Hive メタストアに登録します。デフォルト: "false"

HIVE_TABLE_OPT_KEY

必須。同期先の Hive テーブルの名前。例えば、"my_hudi_table_cow" と指定します。

HIVE_USER_OPT_KEY

オプション。同期時に使用する Hive ユーザー名。例えば、"hadoop" と指定します。

HIVE_PASS_OPT_KEY

オプション。HIVE_USER_OPT_KEY で指定されたユーザーの Hive パスワード。

HIVE_URL_OPT_KEY

Hive メタストアの URL。

データのアップサート

以下の例では、DataFrame を書き込むことでデータをアップサートする方法を示しています。以前の挿入の例とは異なり、OPERATION_OPT_KEY 値は UPSERT_OPERATION_OPT_VAL に設定されています。また、.mode(SaveMode.Append) は、レコードを付加する必要があることを指示するために指定されています。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

レコードを削除する

レコードをハード削除するには、空のペイロードをアップサートします。この場合、PAYLOAD_CLASS_OPT_KEY オプションで EmptyHoodieRecordPayload クラスを指定しています。この例では、upsert の例で使用したのと同じ DataFrame updateDF を使用して、同じレコードが指定されるようにしています。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

OPERATION_OPT_KEY DELETE_OPERATION_OPT_VAL に設定して、送信するデータセット内のすべてのレコードを削除し、データをハード削除することもできます。ソフト削除の実行方法、および Hudi テーブルに保存されたデータの削除の詳細については、Apache Hudi ドキュメントの「Deletes」(削除) を参照してください。

Hudi データセットから読み込む

現時点でのデータを取得するために、Hudi はデフォルトでスナップショットクエリを実行します。以下に、Hudi データセットに書き込む で S3 に書き込まれたデータセットをクエリする例を示します。s3://amzn-s3-demo-bucket/myhudidataset をテーブルパスに置き換え、パーティションレベルごとにワイルドカードのアスタリスクを追加し、さらにアスタリスクを 1 つ追加します。この例では、パーティションレベルが 1 つあるため、2 つのワイルドカードシンボルを追加しました。

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()

注記

Amazon EMR 6.7.0 は Apache Hudi 0.11.0-amzn-0 を使用していますが、これは以前の Hudi バージョンよりも大幅に改善されています。詳細については、「Apache Hudi 0.11.0 Migration Guide」を参照してください。このタブの例にはこれらの変更が反映されています。

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()

(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

増分クエリ

Hudi で増分クエリを実行して、特定のコミットタイムスタンプ以降に変更されたレコードのストリームを取得することもできます。これを行うには、QUERY_TYPE_OPT_KEY フィールドを QUERY_TYPE_INCREMENTAL_OPT_VAL に設定します。次に、BEGIN_INSTANTTIME_OPT_KEY の値を追加して、指定された時刻以降に書き込まれたすべてのレコードを取得します。増分クエリでは、変更されたレコードのみが処理されるため、通常、バッチクエリよりも 10 倍効率が高くなります。

増分クエリを実行するときは、スナップショットクエリに使用されるワイルドカードのアスタリスクを付けずに、ルート (ベース) テーブルパスを使用します。

注記

Presto は増分クエリをサポートしていません。

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Hudi データセットからの読み取りの詳細については、Apache Hudi ドキュメントの「Querying Hudi tables」(Hudiテーブルをクエリ) を参照してください。

プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.