翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
EMRFS S3 向けに最適化されたコミッターの要件
以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。
-
Spark SQL、、、、、 DataFrames、またはまたはまたはまたはまたはまたはまたはまたはまたはまたはまたはまたは Spark ジョブを実行して Amazon S3 にファイルを書き込む。Amazon EMR 6.4.0 以降では、Parquet、ORC、テキストベースの形式 (CSV と JSON を含む) など、一般的なあらゆる形式にこのコミッターを使用できます。Amazon EMR 6.4.0 より前のリリースでは、Parquet 形式のみがサポートされています。
-
Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。
-
Spark の組み込みファイル形式のサポートが使用されます。組み込みのファイル形式サポートは、次のような場合に使用されます。
-
Hive メタストアテーブルでは、Parquet テーブルでは when
spark.sql.hive.convertMetastoreParquet
spark.sql.hive.convertMetastoreOrc
true
がに設定され、EMR 6.4.0 以降の Orc テーブルではに設定されます。true
これらはデフォルト設定です。 -
ジョブによってファイル形式のデータソースまたはテーブルに書き込まれる場合 (例えば、ターゲットテーブルが)、
USING parquet
句を使用して作成される場合などです。 -
ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、Apache Spark SQL の「Hive メタストア Parquet テーブル変換
」 DataFrames および「データセットガイド」を参照してください。
-
-
デフォルトのパーティションの場所 (
${table_location}/k1=v1/k2=v2/
など) に書き込む Spark ジョブオペレーションでコミッターが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所がALTER TABLE SQL
コマンドを使用して設定されている場合、コミッターは使用されません。 -
Spark で以下の値を使用する。
-
spark.sql.parquet.fs.optimized.committer.optimization-enabled
プロパティをtrue
に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値はfalse
です。この値の設定については、「Amazon EMR 5.19.0 の EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。 -
パーティション化されていない Hive メタストアテーブルに書き込む場合は、Parquet と Orc のファイル形式のみがサポートされます。
spark.sql.hive.convertMetastoreParquet
パーティション化されていない Paquet Hivetrue
メタストアテーブルに書き込む場合は、に設定する必要があります。spark.sql.hive.convertMetastoreOrc
パーティション化されていない Orc Hivetrue
メタストアテーブルに書き込む場合は、に設定する必要があります。これらはデフォルト設定です。 -
spark.sql.parquet.output.committer.class
をcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
に設定する必要があります。これはデフォルトの設定です。 -
spark.sql.sources.commitProtocolClass
はorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
またはorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
に設定する必要があります。EMR 5.x シリーズバージョン 5.30.0 以降、および EMR 6.x シリーズバージョン 6.2.0 以降の場合、org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
がデフォルト設定です。それよりも前の EMR バージョンの場合、org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
がデフォルト設定です。 -
Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、
partitionOverwriteMode
書き込みオプションとspark.sql.sources.partitionOverwriteMode
をstatic
に設定する必要があります。これはデフォルトの設定です。注記 partitionOverwriteMode
書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode
プロパティを設定します。
-
EMRFS S3 向けに最適化されたコミッターが使用されない場合
一般的に、EMRFS 最適化コミッターは、次の状況では使用されません。
状況 | コミッターが使われない理由 |
---|---|
HDFS に書き込むとき | コミッターは EMRFS を使用した Amazon S3 への書き込みのみをサポートします。 |
S3A ファイルシステムを使用する場合 | コミッターは EMRFS のみをサポートします。 |
当社の Spark の RDD API を使用する場合 MapReduce | コミッターは SparkSQL、 DataFrame、またはデータセット API の使用のみをサポートします。 |
以下の Scala の例では、EMRFS S3 に最適化されたコミッターを全体的に (最初の例で) 使用したり、部分的に (2 番目の例で) 使用したりすることを妨げるいくつかの追加の状況を示しています。
例 -動的パーティション上書きモード
以下の Scala の例では、Spark に別のコミットアルゴリズムを使用するように指示しています。これにより、EMRFS S3 に最適化されたコミッターはまったく使用されません。このコードは、partitionOverwriteMode
dynamic
データを書き込んでいるパーティションのみを上書きするようにプロパティを設定します。次に、動的パーティション列がによって指定されpartitionBy
、overwrite
書き込みモードがに設定されます。
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://
EXAMPLE-DOC-BUCKET
/output")
EMRFS S3 最適化コミッターの使用を避ける必要があります。これを行うと、Spark は Spark のコミットプロトコルで指定されている別のコミットアルゴリズムを実行します。5.30.0 より前の Amazon EMR 5.x リリースと 6.2.0 より前の Amazon EMR 6.x リリースの場合、コミットプロトコルは Spark のステージングディレクトリを使用します。ステージングディレクトリは、で始まる出力場所に作成される一時ディレクトリです.spark-staging
。このアルゴリズムはパーティションディレクトリの名前を順番に変更するので、パフォーマンスに悪影響を及ぼす可能性があります。Amazon EMR リリース 5.30.0 以降および 6.2.0 以降の詳細については、を参照してくださいEMRFS S3 最適化コミット・プロトコルを使用する。
Spark 2.4.0 のアルゴリズムは以下の手順に従います。
-
タスク試行により、Spark のステージングディレクトリ (
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
など) の下のパーティションディレクトリに出力が書き込まれます。 -
書き込まれたパーティションごとに、タスク試行は相対パーティションパス (
k1=v1/k2=v2
など) を管理します。 -
タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。
-
すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。
-
ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。
例 -カスタムのパーティション場所
この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。
val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)
Scala コードは以下の Amazon S3 オブジェクトを作成します。
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。前の例と同様に、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスに悪影響を及ぼす可能性があります。
-
カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。
-
タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。
-
すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。
-
ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。