

Requirements for the EMRFS S3-Optimized Committer

The EMRFS S3-optimized committer is used when the following conditions are met:

  • You run Spark jobs that use Spark SQL, DataFrames, or Datasets to write Parquet files.

  • Multipart uploads are enabled in Amazon EMR. This is the default. For more information, see The EMRFS S3-optimized Committer and Multipart Uploads.

  • Spark's built-in Parquet support is used. Built-in Parquet support is used in the following circumstances:

    • When spark.sql.hive.convertMetastoreParquet is set to true. This is the default setting.

    • When jobs write to Parquet data sources or tables—for example, the target table is created with the USING parquet clause.

    • When jobs write to non-partitioned Hive metastore Parquet tables. Spark's built-in Parquet support does not support partitioned Hive tables, which is a known limitation. For more information, see Hive metastore Parquet table conversion in the Apache Spark SQL, DataFrames and Datasets Guide.

  • Spark job operations that write to a default partition location—for example, ${table_location}/k1=v1/k2=v2/—use the committer. The committer is not used if a job operation writes to a custom partition location—for example, if a custom partition location is set using the ALTER TABLE SQL command.

  • The following values for Spark must be used (a snippet for checking these settings follows this list):

    • The spark.sql.parquet.fs.optimized.committer.optimization-enabled property must be set to true. This is the default setting with Amazon EMR 5.20.0 and later. With Amazon EMR 5.19.0, the default value is false. For information about configuring this value, see Enabling the EMRFS S3-optimized Committer for Amazon EMR 5.19.0.

    • spark.sql.hive.convertMetastoreParquet must be set to true if writing to non-partitioned Hive metastore tables. This is the default setting.

    • spark.sql.parquet.output.committer.class must be set to com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter. This is the default setting.

    • spark.sql.sources.commitProtocolClass must be set to org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. This is the default setting.

    • If Spark jobs overwrite partitioned Parquet datasets with dynamic partition columns, then the partitionOverwriteMode write option and spark.sql.sources.partitionOverwriteMode must be set to static. This is the default setting.

      Note

      The partitionOverwriteMode write option was introduced in Spark 2.4.0. For Spark version 2.3.2, included with Amazon EMR release 5.19.0, set the spark.sql.sources.partitionOverwriteMode property.
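
As one way to confirm these settings on a cluster, the following spark-shell snippet prints each property named above. It is a minimal sketch: it assumes the spark session that spark-shell provides and prints "<not set>" for any property that has no value.

val committerSettings = Seq(
  "spark.sql.parquet.fs.optimized.committer.optimization-enabled",
  "spark.sql.hive.convertMetastoreParquet",
  "spark.sql.parquet.output.committer.class",
  "spark.sql.sources.commitProtocolClass",
  "spark.sql.sources.partitionOverwriteMode")

// Print each property so the values can be compared with the requirements above.
committerSettings.foreach { key =>
  println(s"$key = ${spark.conf.getOption(key).getOrElse("<not set>")}")
}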

When the EMRFS S3-optimized Committer is Not Used

The committer is not used under the following circumstances (the snippet after this list briefly illustrates the last two cases):

  • When writing to HDFS

  • When using the S3A file system

  • When using an output format other than Parquet, such as ORC or text

  • When using MapReduce or Spark's RDD API
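
As a brief illustration of the last two cases, the following spark-shell snippet writes the same range of numbers as an ORC file and then through the RDD API. Neither write can use the EMRFS S3-optimized committer. The bucket and output paths are placeholders.

val dataset = spark.range(0, 10)

// ORC is a format other than Parquet, so the committer is not used.
dataset.write.mode("overwrite").orc("s3://bucket/orc-output")

// The RDD API bypasses Spark SQL, so the committer is not used either.
dataset.rdd.map(_.toString).saveAsTextFile("s3://bucket/rdd-output")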

The following Scala examples demonstrate writes that do not use the EMRFS S3-optimized committer at all (the first example) or use it only in part (the second example).

Example: Dynamic Partition Overwrite Mode

In the following Scala code, the committer is not used because partitionOverwriteMode is set to dynamic, dynamic partition columns are specified by partitionBy, and the write mode is set to overwrite.

val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://bucket/output")

In this example, instead of using the EMRFS S3-optimized committer or any configured output committer, Spark executes a different commit algorithm that uses Spark's staging directory, a temporary directory created under the output location whose name begins with .spark-staging. The algorithm renames partition directories sequentially, which may negatively impact performance.

The algorithm in Spark 2.4.0 follows these steps, which are sketched in code after the list:

  1. Task attempts write their output to partition directories under Spark's staging directory—for example, ${outputLocation}/.spark-staging-${jobID}/k1=v1/k2=v2/.

  2. For each partition written, the task attempt keeps track of relative partition paths—for example, k1=v1/k2=v2.

  3. When a task completes successfully, it provides the driver with all relative partition paths that it tracked.

  4. After all tasks complete, the job commit phase collects all the partition directories that successful task attempts wrote under Spark's staging directory. Spark sequentially renames each of these directories to its final output location using directory tree rename operations.

  5. The staging directory is deleted before the job commit phase completes.
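
The following standalone Scala sketch simulates these steps on the local file system using java.nio.file. It only illustrates the sequential directory renames described above and is not Spark's implementation; the staging directory name, job ID, and partition values are invented for the example.

import java.nio.file.{Files, StandardCopyOption}

object DynamicOverwriteCommitSketch extends App {
  val outputLocation = Files.createTempDirectory("output")
  val stagingDir = outputLocation.resolve(".spark-staging-job-0001")

  // Steps 1-3: task attempts write partition directories under the staging
  // directory and report the relative partition paths back to the driver.
  val relativePartitionPaths = Seq("dt=2019-01-28", "dt=2019-01-29")
  for (partition <- relativePartitionPaths) {
    val partitionDir = stagingDir.resolve(partition)
    Files.createDirectories(partitionDir)
    Files.createFile(partitionDir.resolve("part-00000.snappy.parquet"))
  }

  // Step 4: the job commit phase renames each staged partition directory to
  // its final location, one directory at a time.
  for (partition <- relativePartitionPaths) {
    Files.move(stagingDir.resolve(partition),
               outputLocation.resolve(partition),
               StandardCopyOption.ATOMIC_MOVE)
  }

  // Step 5: the now-empty staging directory is deleted before commit completes.
  Files.delete(stagingDir)
  println(s"Committed ${relativePartitionPaths.size} partitions under $outputLocation")
}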

Example: Custom Partition Location

In this example, the Scala code inserts into two partitions. One partition has a custom partition location. The other partition uses the default partition location. The EMRFS S3-optimized committer is only used for writing task output to the partition that uses the default partition location.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

The Scala code creates the following Amazon S3 objects:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

When writing to partitions at custom locations, Spark uses a commit algorithm similar to the previous example, which is outlined in the following steps. As in the earlier example, the algorithm results in sequential renames, which may negatively impact performance. A code sketch of these steps follows the list.

  1. When writing output to a partition at a custom location, tasks write to a file under Spark's staging directory, which is created under the final output location. The name of the file includes a random UUID to protect against file collisions. The task attempt keeps track of each file along with the final desired output path.

  2. When a task completes successfully, it provides the driver with the files and their final desired output paths.

  3. After all tasks complete, the job commit phase sequentially renames all files that were written for partitions at custom locations to their final output paths.

  4. The staging directory is deleted before the job commit phase completes.
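
The following standalone Scala sketch parallels the previous one, but simulates the file-level renames used for a custom partition location. It is an illustration only, not Spark's implementation; the directory layout, job ID, and file names are invented for the example.

import java.nio.file.{Files, StandardCopyOption}
import java.util.UUID

object CustomLocationCommitSketch extends App {
  val root = Files.createTempDirectory("bucket")
  val tableLocation = Files.createDirectories(root.resolve("table"))
  val customPartitionLocation = Files.createDirectories(root.resolve("custom"))
  // The staging directory is created under the final output (table) location.
  val stagingDir = Files.createDirectories(tableLocation.resolve(".spark-staging-job-0002"))

  // Steps 1-2: a task writes a UUID-named file under the staging directory and
  // records it together with its final desired output path.
  val stagedFile = Files.createFile(
    stagingDir.resolve(s"part-00001-${UUID.randomUUID()}.c000.snappy.parquet"))
  val filesToCommit = Map(stagedFile -> customPartitionLocation.resolve(stagedFile.getFileName))

  // Step 3: the job commit phase renames each staged file to its final path,
  // one file at a time.
  for ((staged, finalPath) <- filesToCommit) {
    Files.move(staged, finalPath, StandardCopyOption.ATOMIC_MOVE)
  }

  // Step 4: the staging directory is deleted before the job commit completes.
  Files.delete(stagingDir)
  println(s"Committed ${filesToCommit.size} file(s) under $customPartitionLocation")
}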