Amazon EMR
Amazon EMR リリースガイド

EMRFS S3 向けに最適化されたコミッターの要件

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。

  • Spark SQL、DataFrame、または Dataset により Parquet ファイルを書き込む Spark ジョブを実行する。

  • Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルト値です。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。

  • Spark の組み込み Parquet サポートが使用されている。組み込み Parquet サポートは以下の状況で使用されます。

    • spark.sql.hive.convertMetastoreParquettrue に設定されている場合。これがデフォルトの設定です。

    • ジョブで Parquet データソースまたはテーブルに書き込む場合。たとえば、ターゲットテーブルが USING parquet 句を使用して作成される場合です。

    • ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「Hive metastore Parquet table conversion」を参照してください。

  • デフォルトのパーティション場所 (${table_location}/k1=v1/k2=v2/ など) に書き込む Spark ジョブオペレーションでコミッターを使用する。ジョブオペレーションでカスタムのパーティション場所に書き込む場合、コミッターは使用されません。たとえば、カスタムのパーティション場所が ALTER TABLE SQL コマンドを使用して設定されている場合です。

  • Spark で以下の値を使用する。

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled プロパティを true に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値は false です。この値の設定については、「Amazon EMR 5.19.0 で EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。

    • パーティション分割されていない Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreParquettrue に設定する必要があります。これがデフォルトの設定です。

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter に設定する必要があります。これがデフォルトの設定です。

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol に設定する必要があります。これがデフォルトの設定です。

    • Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、partitionOverwriteMode 書き込みオプションと spark.sql.sources.partitionOverwriteModestatic に設定する必要があります。これがデフォルトの設定です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode プロパティを設定します。

EMRFS S3 向けに最適化されたコミッターが使用されない場合

コミッターは以下の状況では使用されません。

  • HDFS に書き込む場合

  • S3A ファイルシステムを使用する場合

  • ORC やテキストなど、Parquet 以外の出力形式を使用する場合

  • MapReduce または Spark の RDD API を使用する場合

以下の例では、Scala で記述された実装で、EMRFS S3 向けに最適化されたコミッターを全体に使用しないもの (最初の例) と、部分的に使用しないもの (2 番目の例) を示しています。

例 – 動的パーティション上書きモード

以下の Scala コードでは、partitionOverwriteModedynamic に設定され、動的パーティション列が partitionBy で指定され、書き込みモードが overwrite に設定されているため、コミッターは使用されません。

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://bucket/output")

この例では、EMRFS S3 向けに最適化されたコミッターや設定済みの出力コミッターは使用されません。代わりに、Spark のステージングディレクトリ (.spark-staging で始まる出力場所に作成される一時ディレクトリ) を使用する別のコミットアルゴリズムが実行されます。このアルゴリズムではパーティションディレクトリの名前が順番に変更されるため、パフォーマンスが低下する可能性があります。

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. タスクの試行によって出力が Spark のステージングディレクトリの下のパーティションディレクトリ (${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/ など) に書き込まれます。

  2. 書き込まれたパーティションごとに、タスク試行によって相対パーティションパス (k1=v1/k2=v2 など) が追跡されます。

  3. タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。

  4. すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。

  5. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。

例 – カスタムのパーティション場所

この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala コードは以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。手順は以下のとおりです。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。