EMRFS S3 向けに最適化されたコミッターの要件 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMRFS S3 向けに最適化されたコミッターの要件

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。

  • Spark SQL、 DataFramesまたはデータセットを使用して Amazon S3 にファイルを書き込む Spark ジョブを実行する。Amazon EMR 6.4.0 以降では、Parquet、ORC、テキストベースの形式 (CSV と JSON を含む) など、一般的なあらゆる形式にこのコミッターを使用できます。Amazon EMR 6.4.0 より前のリリースバージョンでは、Parquet 形式のみがサポートされています。

  • Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。

  • Spark の組み込み Parquet サポートが使用されている。組み込み Parquet サポートは以下の状況で使用されます。

    • spark.sql.hive.convertMetastoreParquettrue に設定されている場合。これはデフォルトの設定です。

    • ジョブによって Parquet データソースまたはテーブルに書き込まれる場合。例えば、ターゲットテーブルが USING parquet 句で作成される場合などです。

    • ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、Apache Spark SQL、 DataFrames および Datasets Guide の「Hive メタストア Parquet テーブル変換」を参照してください。

  • デフォルトのパーティションの場所 (${table_location}/k1=v1/k2=v2/ など) に書き込む Spark ジョブオペレーションでコミッターが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所が ALTER TABLE SQL コマンドを使用して設定されている場合、コミッターは使用されません。

  • Spark で以下の値を使用する。

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled プロパティを true に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値は false です。この値の設定については、「Amazon EMR 5.19.0 の EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。

    • パーティション分割されていない Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreParquettrue に設定する必要があります。これはデフォルトの設定です。

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter に設定する必要があります。これはデフォルトの設定です。

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol または org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol に設定する必要があります。EMR 5.x シリーズバージョン 5.30.0 以降、および EMR 6.x シリーズバージョン 6.2.0 以降の場合、org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol がデフォルト設定です。それよりも前の EMR バージョンの場合、org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol がデフォルト設定です。

    • Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、partitionOverwriteMode 書き込みオプションと spark.sql.sources.partitionOverwriteModestatic に設定する必要があります。これはデフォルトの設定です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode プロパティを設定します。

EMRFS S3 向けに最適化されたコミッターが使用されない場合

一般的に、EMRFS S3 最適化コミッターは、以下の状況では使用されません。

状況 コミッターが使われない理由
HDFS に書き込む場合 コミッターは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。
S3A ファイルシステムを使用する場合 コミッターは EMRFS のみをサポートします。
MapReduce スパークの RDD API を使用する場合 コミッターは、SparkSQL DataFrame、またはデータセット API の使用のみをサポートします。

以下の Scala の例は、EMRFS S3 に最適化されたコミッターを全体 (最初の例) と一部 (2 番目の例) で使用することを妨げるいくつかの追加の状況を示しています。

例 - 動的パーティション上書きモード

次の Scala の例では、Spark に別のコミットアルゴリズムを使用するよう指示しています。これにより、EMRFS S3 に最適化されたコミッターがまったく使用されなくなります。このコードは、partitionOverwriteModedynamicデータを書き込んでいるパーティションのみを上書きするようにプロパティを設定します。次に、動的パーティション列はで指定されpartitionByoverwrite書き込みモードはに設定されます。

EMRFS S3 最適化コミッターを使用しないようにするには、3 つの設定をすべて構成する必要があります。これを行うと、Spark は Spark のステージングディレクトリを使用する別のコミットアルゴリズムを実行します。ステージングディレクトリは、.spark-stagingで始まる出力場所の下に作成される一時ディレクトリです。このアルゴリズムはパーティションディレクトリの名前を順番に変更するため、パフォーマンスに悪影響を及ぼす可能性があります。

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. タスク試行により、Spark のステージングディレクトリ (${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/ など) の下のパーティションディレクトリに出力が書き込まれます。

  2. 書き込まれたパーティションごとに、タスク試行は相対パーティションパス (k1=v1/k2=v2 など) を管理します。

  3. タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。

  4. すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。

  5. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。

例 - カスタムのパーティション場所

この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala コードは以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。手順は以下のとおりです。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。