EMRFS S3 向けに最適化されたコミット・プロトコルの要件 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMRFS S3 向けに最適化されたコミット・プロトコルの要件

EMRFS S3 向けに最適化されたコミット・プロトコルは、次の条件が満たされます。

  • Spark SQL、 DataFrames Dataset を使用して分割テーブルを上書き込む Spark ジョブを実行する。

  • パーティション上書きモードがのSparkジョブを実行しますdynamic

  • Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミット・プロトコルとマルチパートアップロード」を参照してください。

  • EMRFS のファイルシステムキャッシュは有効になっています。これがデフォルトです。fs.s3.impl.disable.cache設定がに設定されていることを確認しますfalse

  • Spark の組み込みデータソースサポートが使用されます。ビルトインデータソースサポートは、

    • ジョブが組み込みのデータソースまたはテーブルに書き込むとき。

    • ジョブが Hive メタストアの Parquet テーブルに書き込まれるとき。これはspark.sql.hive.convertInsertingPartitionedTablespark.sql.hive.convertMetastoreParquetおよびが両方とも true に設定されている場合に発生します。これらはデフォルト設定です。

    • ジョブが Hive メタストア ORC テーブルに書き込まれるとき。これはspark.sql.hive.convertInsertingPartitionedTablespark.sql.hive.convertMetastoreOrcおよびが両方に設定されている場合に発生しますtrue。これらはデフォルト設定です。

  • たとえば、デフォルトのパーティションの場所に書き込むSparkジョブ操作は、${table_location}/k1=v1/k2=v2/コミットプロトコルを使用します。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、ALTER TABLE SQLコマンドを使用してカスタムのパーティション場所が設定されている場合、プロトコルは使用されません。

  • Spark で以下の値を使用する。

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol に設定する必要があります。これは Amazon EMR リリース 5.30.0 以降、6.2.0 以降のデフォルト設定です。

    • partitionOverwriteModewritespark.sql.sources.partitionOverwriteMode オプションまたははに設定する必要がありますdynamic。デフォルトの設定は、static です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode プロパティを設定します。

    • Spark ジョブが Hive メタストアの Parquet テーブルに上書きされる場合はspark.sql.hive.convertMetastoreParquetspark.sql.hive.convertInsertingPartitionedTable、、、spark.sql.hive.convertMetastore.partitionOverwriteModeをに設定する必要がありますtrue。デフォルト設定があります。

    • Spark ジョブが Hive メタストア ORC テーブルに上書きされる場合はspark.sql.hive.convertMetastoreOrcspark.sql.hive.convertInsertingPartitionedTable、、spark.sql.hive.convertMetastore.partitionOverwriteModeをに設定する必要がありますtrue。デフォルト設定があります。

例 -動的パーティション上書きモード

この Scala の例では、最適化がトリガーされます。まず、partitionOverwriteModeプロパティをに設定しますdynamic。これにより、データを書き込んでいるパーティションのみが上書きされます。次に、partitionByで動的パーティション列を指定し、書き込みモードをに設定しますoverwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use EMR file system, instead of "s3a://" or "hdfs://"

EMRFS S3 向けに最適化されたコミット・プロトコルが使用されない場合

一般的に、EMRFS S3 に最適化されたコミットプロトコルは、オープンソースのデフォルトの Spark SQL コミットプロトコルと同じように機能しますorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol。以下の状況では、最適化は行われません。

状況 コミットプロトコルが使用されない理由
HDFS に書き込むとき コミットプロトコルは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。
S3A ファイルシステムを使用する場合 コミットプロトコルは EMRFS のみをサポートします。
当社の Spark の RDD API を使用する場合 MapReduce コミットプロトコルは、SparkSQL、 DataFrame、またはデータセット API の使用のみをサポートします。
動的パーティションの上書きがトリガーされない場合 コミットプロトコルは、動的パーティションの上書きの場合のみ最適化します。その他のケースについては、を参照してくださいEMRFS S3 向けに最適化されたコミッターを使用する

以下の Scala の例は、EMRFS S3 に最適化されたコミットプロトコルが委任するその他の状況を示していますSQLHadoopMapReduceCommitProtocol

例 -カスタムのパーティション位置での動的パーティション上書きモード

この例では、Scala プログラムは動的パーティション上書きモードで 2 つのパーティションを上書きします。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 に最適化されたコミットプロトコルは、デフォルトのパーティションロケーションを使用するパーティションのみを改善します。

val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")

Scala コードは以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
注記

以前のバージョンのSparkでカスタムパーティションの場所に書き込むと、データが失われる可能性があります。この例では、dt='2019-01-28'パーティションは失われます。詳細については、「SPARK-35106」を参照してください。この問題は Amazon EMR リリース 5.33.0 以降で修正されています。ただし、6.0.x と 6.1.x は対象外です。

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。前の例と同様に、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスに悪影響を及ぼす可能性があります。

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。