EMRFS S3 向けに最適化されたコミットプロトコルの要件
以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミットプロトコルが使用されます。
-
Spark SQL、DataFrame、または Dataset によりパーティション化されたテーブルを書き込む Spark ジョブを実行する。
-
パーティション上書きモードが
dynamic
である Spark ジョブを実行する。 -
Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミットプロトコルとマルチパートアップロード」を参照してください。
-
EMRFS のファイルシステムキャッシュは有効になっています。これがデフォルトです。
fs.s3.impl.disable.cache
設定がfalse
に設定されていることを確認します。 -
Spark のビルトインデータソースサポートが使用されています。組み込みデータソースサポートは以下の状況で使用されます。
-
ジョブでビルトインのデータソースまたはテーブルに書き込む場合。
-
ジョブで Hive メタストア Parquet テーブルに書き込む場合。これは
spark.sql.hive.convertInsertingPartitionedTable
とspark.sql.hive.convertMetastoreParquet
が両方とも true に設定されている場合に発生します。これらはデフォルトの設定です。 -
ジョブで Hive メタストア ORC テーブルに書き込む場合。これは、
spark.sql.hive.convertInsertingPartitionedTable
とspark.sql.hive.convertMetastoreOrc
が両方true
に設定されている場合に発生します。これらはデフォルトの設定です。
-
-
デフォルトのパーティションの場所 (
${table_location}/k1=v1/k2=v2/
など) に書き込む Spark ジョブオペレーションでコミットプロトコルが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所がALTER TABLE SQL
コマンドを使用して設定されている場合、プロトコルは使用されません。 -
Spark で以下の値を使用する。
-
spark.sql.sources.commitProtocolClass
をorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
に設定する必要があります。これは、Amazon EMR リリース 5.30.0 以降および 6.2.0 以降のデフォルト設定です。 -
partitionOverwriteMode
書き込みオプションまたはspark.sql.sources.partitionOverwriteMode
はdynamic
に設定する必要があります。デフォルトの設定は、static
です。注記
partitionOverwriteMode
書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode
プロパティを設定します。 -
Spark ジョブが Hive メタストアの Parquet テーブルに上書きされる場合は、
spark.sql.hive.convertMetastoreParquet
、spark.sql.hive.convertInsertingPartitionedTable
およびspark.sql.hive.convertMetastore.partitionOverwriteMode
をtrue
に設定する必要があります。これらはデフォルトの設定です。 -
Spark ジョブが Hive メタストアの ORC テーブルに上書きされる場合は、
spark.sql.hive.convertMetastoreOrc
、spark.sql.hive.convertInsertingPartitionedTable
およびspark.sql.hive.convertMetastore.partitionOverwriteMode
をtrue
に設定する必要があります。これらはデフォルトの設定です。
-
例 - 動的パーティション上書きモード
この Scala の例では、最適化がトリガーされます。まず、partitionOverwriteMode
プロパティを dynamic
に設定します。これにより、データを書き込んでいるパーティションのみが上書きされます。次に、動的パーティション列が partitionBy
で指定され、書き込みモードが overwrite
に設定されます。
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use EMR file system, instead of "s3a://" or "hdfs://"
EMRFS S3 向けに最適化されたコミットプロトコルが使用されない場合
一般的に、EMRFS S3 向けに最適化されたコミットプロトコルは、オープンソースのデフォルトの Spark SQL コミットプロトコル org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
と同じように機能します。次の状況では、最適化は行われません。
状況 | コミットプロトコルが使用されない理由 |
---|---|
HDFS に書き込む場合 | コミットプロトコルは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。 |
S3A ファイルシステムを使用する場合 | コミットプロトコルは EMRFS のみをサポートします。 |
MapReduce または Spark の RDD API を使用する場合 | コミットプロトコルは、SparkSQL、DataFrame、またはデータセット API の使用のみをサポートします。 |
動的パーティションの上書きがトリガーされない場合 | コミットプロトコルは、動的なパーティション上書きの場合のみを最適化します。その他の場合については、「EMRFS S3 向けに最適化されたコミッターを使用する」を参照してください。 |
次の Scala の例は、EMRFS S3 向けに最適化されたコミットプロトコルが SQLHadoopMapReduceCommitProtocol
に委任するその他の状況を示しています。
例 - カスタムパーティションの場所を使用する動的パーティション上書きモード
この例では、Scala プログラムは動的パーティション上書きモードで 2 つのパーティションを上書きします。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミットプロトコルは、デフォルトのパーティション場所を使用するパーティションのみを改善します。
val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
Scala コードは以下の Amazon S3 オブジェクトを作成します。
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
注記
以前の Spark バージョンでは、カスタムパーティションの場所に書き込むと、データが失われる可能性があります。この例では、パーティション dt='2019-01-28'
が失われます。詳細については「SPARK-35106
カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。
Spark 2.4.0 のアルゴリズムは以下の手順に従います。
-
カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。
-
タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。
-
すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。
-
ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。