EMRFS S3 向けに最適化されたコミッターのための要件 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMRFS S3 向けに最適化されたコミッターのための要件

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。

  • Spark SQL、DataFrame、または Dataset により Parquet ファイルを書き込む Spark ジョブを実行する。

  • Amazon EMR では、マルチパートアップロードが有効になっています。これがデフォルト値です。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。

  • Spark の組み込み Parquet サポートが使用されている。組み込み Parquet サポートは以下の状況で使用されます。

    • spark.sql.hive.convertMetastoreParquettrue に設定されている場合。これがデフォルトの設定です。

    • ジョブが Parquet データソースまたはテーブルに書き込まれる場合 (たとえば、ターゲットテーブルはUSING parquet句。

    • ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「Hive metastore Parquet table conversion」を参照してください。

  • デフォルトのパーティションの場所に書き込む Spark ジョブ操作 (例:${table_location}/k1=v1/k2=v2/— コミッターを使用します。ジョブ操作がカスタムパーティションの場所に書き込む場合、コミッターは使用されません。たとえば、カスタムパーティションの場所がALTER TABLE SQLコマンドを実行します。

  • Spark で以下の値を使用する。

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled プロパティを true に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値はfalse。この値の設定については、「Amazon EMR 5.19.0 で EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。

    • パーティション分割されていない Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreParquettrue に設定する必要があります。これがデフォルトの設定です。

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter に設定する必要があります。これがデフォルトの設定です。

    • spark.sql.sources.commitProtocolClassをに設定する必要があります。org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolまたはorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolは、EMR 5.x シリーズバージョン 5.30.0 以降、および EMR 6.x シリーズバージョン 6.2.0 以降のデフォルト設定です。org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolは、以前の EMR バージョンのデフォルト設定です。

    • Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、partitionOverwriteMode 書き込みオプションと spark.sql.sources.partitionOverwriteModestatic に設定する必要があります。これがデフォルトの設定です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteModeプロパティです。

EMRFS S3 向けに最適化されたコミッターが使用されない場合

コミッターは以下の状況では使用されません。

  • HDFS に書き込む場合

  • S3A ファイルシステムを使用する場合

  • ORC やテキストなど、Parquet 以外の出力形式を使用する場合

  • MapReduce または Spark の RDD API を使用する場合

以下の例では、Scala で記述された実装で、EMRFS S3 向けに最適化されたコミッターを全体に使用しないもの (最初の例) と、部分的に使用しないもの (2 番目の例) を示しています。

例 — 動的パーティション上書きモード

以下の Scala コードでは、partitionOverwriteModedynamic に設定され、動的パーティション列が partitionBy で指定され、書き込みモードが overwrite に設定されているため、コミッターは使用されません。

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://bucket/output")

この例では、EMRFS S3 向けに最適化されたコミッターや設定済みの出力コミッターは使用されません。代わりに、Spark のステージングディレクトリ (.spark-staging で始まる出力場所に作成される一時ディレクトリ) を使用する別のコミットアルゴリズムが実行されます。このアルゴリズムではパーティションディレクトリの名前が順番に変更されるため、パフォーマンスが低下する可能性があります。

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. タスクは出力を Spark のステージングディレクトリの下にあるパーティションディレクトリ (例:${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/

  2. 書き込まれたパーティションごとに、タスクの試行は相対的なパーティションパスを追跡します。たとえば、k1=v1/k2=v2

  3. タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。

  4. すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。

  5. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。

例 — カスタムのパーティションの場所

この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala コードは、以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。手順は以下のとおりです。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。