翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
EMRFS S3-optimizedコミットプロトコルの要件
EMRFS S3-optimized 最適化コミットプロトコルは、次の条件が満たされた場合に使用されます。
-
Spark SQL、 DataFrames、またはデータセットを使用してパーティションテーブルを上書きする Spark ジョブを実行します。
-
パーティション上書きモードが
dynamic
である Spark ジョブを実行する。 -
マルチパートアップロードは Amazon EMR で有効になっています。これがデフォルトです。詳細については、「EMRFS S3-optimizedコミットプロトコルとマルチパートアップロード」を参照してください。
-
のファイルシステムキャッシュEMRFSが有効になっています。これがデフォルトです。
fs.s3.impl.disable.cache
設定がfalse
に設定されていることを確認します。 -
Spark のビルトインデータソースサポートが使用されています。組み込みデータソースサポートは以下の状況で使用されます。
-
ジョブでビルトインのデータソースまたはテーブルに書き込む場合。
-
ジョブで Hive メタストア Parquet テーブルに書き込む場合。これは
spark.sql.hive.convertInsertingPartitionedTable
とspark.sql.hive.convertMetastoreParquet
が両方とも true に設定されている場合に発生します。これらはデフォルトの設定です。 -
ジョブが Hive メタストアORCテーブルに書き込む場合。これは、
spark.sql.hive.convertInsertingPartitionedTable
とspark.sql.hive.convertMetastoreOrc
が両方true
に設定されている場合に発生します。これらはデフォルトの設定です。
-
-
デフォルトのパーティションの場所 (
${table_location}/k1=v1/k2=v2/
など) に書き込む Spark ジョブオペレーションでコミットプロトコルが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所がALTER TABLE SQL
コマンドを使用して設定されている場合、プロトコルは使用されません。 -
Spark で以下の値を使用する。
-
spark.sql.sources.commitProtocolClass
をorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
に設定する必要があります。これは、Amazon EMRリリース 5.30.0 以降、および 6.2.0 以降のデフォルト設定です。 -
partitionOverwriteMode
書き込みオプションまたはspark.sql.sources.partitionOverwriteMode
はdynamic
に設定する必要があります。デフォルトの設定は、static
です。注記
partitionOverwriteMode
書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMRリリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode
プロパティを設定します。 -
Spark ジョブが Hive メタストアの Parquet テーブルに上書きされる場合は、
spark.sql.hive.convertMetastoreParquet
、spark.sql.hive.convertInsertingPartitionedTable
およびspark.sql.hive.convertMetastore.partitionOverwriteMode
をtrue
に設定する必要があります。これらはデフォルトの設定です。 -
Spark ジョブが Hive メタストアORCテーブルに上書きされる場合、
spark.sql.hive.convertMetastoreOrc
、spark.sql.hive.convertInsertingPartitionedTable
、および を に設定spark.sql.hive.convertMetastore.partitionOverwriteMode
する必要がありますtrue
。これらはデフォルトの設定です。
-
例 - 動的パーティション上書きモード
この Scala の例では、最適化がトリガーされます。まず、partitionOverwriteMode
プロパティを dynamic
に設定します。これにより、データを書き込んでいるパーティションのみが上書きされます。次に、動的パーティション列が partitionBy
で指定され、書き込みモードが overwrite
に設定されます。
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
EMRFS S3-optimizedコミットプロトコルが使用されていない場合
通常、EMRFSS3-optimizedコミットプロトコルは、オープンソースのデフォルト Spark SQLコミットプロトコル と同じように機能しますorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
。次の状況では、最適化は行われません。
状況 | コミットプロトコルが使用されない理由 |
---|---|
に書き込む場合 HDFS | コミットプロトコルは、 を使用した Amazon S3 への書き込みのみをサポートしますEMRFS。 |
S3A ファイルシステムを使用する場合 | コミットプロトコルは のみをサポートしますEMRFS。 |
MapReduce または Spark の RDD API | コミットプロトコルは、Spark SQL、 DataFrame、またはデータセット の使用のみをサポートしますAPIs。 |
動的パーティションの上書きがトリガーされない場合 | コミットプロトコルは、動的なパーティション上書きの場合のみを最適化します。その他の場合については、「EMRFS S3-optimizedコミッターを使用する」を参照してください。 |
次の Scala の例は、S3-optimizedコミットプロトコルが EMRFS に委任する追加の状況を示していますSQLHadoopMapReduceCommitProtocol
。
例 - カスタムパーティションの場所を使用する動的パーティション上書きモード
この例では、Scala プログラムは動的パーティション上書きモードで 2 つのパーティションを上書きします。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3-optimized 最適化コミットプロトコルは、デフォルトのパーティションの場所を使用するパーティションのみを改善します。
val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
Scala コードは以下の Amazon S3 オブジェクトを作成します。
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
注記
以前の Spark バージョンでは、カスタムパーティションの場所に書き込むと、データが失われる可能性があります。この例では、パーティション dt='2019-01-28'
が失われます。詳細については、SPARK「-35106
カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。
Spark 2.4.0 のアルゴリズムは以下の手順に従います。
-
カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの衝突から保護UUIDするためのランダムな が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。
-
タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。
-
すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。
-
ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。