EMRFS S3 向けに最適化されたコミッターの要件 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMRFS S3 向けに最適化されたコミッターの要件

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。

  • Spark SQL、DataFrame、または Dataset を使用して Amazon S3 にファイルを書き込む Spark ジョブを実行する。Amazon EMR 6.4.0 以降では、Parquet、ORC、テキストベースの形式 (CSV と JSON を含む) など、一般的なあらゆる形式にこのコミッターを使用できます。Amazon EMR 6.4.0 より前のリリースバージョンでは、Parquet 形式のみがサポートされています。

  • Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。

  • Spark の組み込み Parquet サポートが使用されている。組み込み Parquet サポートは以下の状況で使用されます。

    • spark.sql.hive.convertMetastoreParquettrue に設定されている場合。これはデフォルトの設定です。

    • ジョブによって Parquet データソースまたはテーブルに書き込まれる場合。例えば、ターゲットテーブルが USING parquet 句で作成される場合などです。

    • ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、次を参照してください。ハイブメタストア寄木細工テーブル変換Apache Spark SQLでは DataFrames およびデータセットガイド。

  • デフォルトのパーティションの場所 (${table_location}/k1=v1/k2=v2/ など) に書き込む Spark ジョブオペレーションでコミッターが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所が ALTER TABLE SQL コマンドを使用して設定されている場合、コミッターは使用されません。

  • Spark で以下の値を使用する。

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled プロパティを true に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値は false です。この値の設定については、「Amazon EMR 5.19.0 の EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。

    • パーティション分割されていない Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreParquettrue に設定する必要があります。これはデフォルトの設定です。

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter に設定する必要があります。これはデフォルトの設定です。

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol または org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol に設定する必要があります。EMR 5.x シリーズバージョン 5.30.0 以降、および EMR 6.x シリーズバージョン 6.2.0 以降の場合、org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol がデフォルト設定です。それよりも前の EMR バージョンの場合、org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol がデフォルト設定です。

    • Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、partitionOverwriteMode 書き込みオプションと spark.sql.sources.partitionOverwriteModestatic に設定する必要があります。これはデフォルトの設定です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode プロパティを設定します。

EMRFS S3 向けに最適化されたコミッターが使用されない場合

一般に、EMRFS S3 向けに最適化されたコミッターは、次のような状況では使用されません。

状況 コミッターが使用されない理由
HDFSに書き込むと コミッターは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。
S3A ファイルシステムを使用する場合 コミッターは EMRFS のみをサポートしています。
使用するバージョン MapReduce または Spark の RDD API コミッターは、SparkSQL、DataFrame、またはデータセット API の使用のみをサポートしています。

次の Scala 例では、EMRFS S3 向けに最適化されたコミッターを全体に使用しないもの (最初の例) と、部分的に使用しないもの (2 番目の例) を示しています。

例 - 動的パーティション上書きモード

次の Scala の例では、Spark に別のコミットアルゴリズムを使用するように指示しています。これにより、EMRFS S3 最適化コミッターが完全に使用されなくなります。このコードは、partitionOverwriteModeプロパティにdynamicを選択して、データを書き込むパーティションのみを上書きします。次に、動的パーティション列はpartitionByであり、書き込みモードはoverwrite

EMRFS S3 向けに最適化されたコミッターを使用しないように、3 つの設定をすべて構成する必要があります。これを行うと、Spark は Spark のステージングディレクトリ (で始まる出力場所に作成される一時ディレクトリ) を使用する別のコミットアルゴリズムを実行します。.spark-staging。アルゴリズムでは、パーティションディレクトリの名前が順番に変更されるため、パフォーマンスに悪影響を与える可能性があります。

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. タスク試行により、Spark のステージングディレクトリ (${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/ など) の下のパーティションディレクトリに出力が書き込まれます。

  2. 書き込まれたパーティションごとに、タスク試行は相対パーティションパス (k1=v1/k2=v2 など) を管理します。

  3. タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。

  4. すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。

  5. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。

例 - カスタムのパーティション場所

この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala コードは以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。手順は以下のとおりです。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。