本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
经 EMRFS S3 优化的提交程序的要求
满足以下条件时,将使用经 EMRFS S3 优化的提交程序:
-
您运行使用 Spark 或数据集将文件写入 Amazon S3 的 Spark 任务。 DataFrames从 Amazon EMR 6.4.0 开始,此提交程序可用于所有常见格式,包括 parquet、ORC 和基于文本的格式(包括 CSV 和 JSON)。对于 Amazon EMR 6.4.0 之前的发行版,仅支持 Parquet 格式。
-
分段上传在 Amazon EMR 中已启用。这是默认值。有关更多信息,请参阅 经 EMRFS S3 优化的提交程序和分段上传。
-
使用 Spark 的内置文件格式支持。内置文件格式支持用于以下情况:
-
对于 Hive 元存储表,当
spark.sql.hive.convertMetastoreParquet
设置为true
时,可用于 Parquet 表,或spark.sql.hive.convertMetastoreOrc
设置为true
时,可用于 Amazon EMR 6.4.0 或更高版本的 Orc 表。这些是默认设置。 -
当任务写入文件格式数据来源或表时,例如,使用
USING parquet
子句创建目标表。 -
当作业写入未分区的 Hive 元存储 Parquet 表时。Spark 的内置 Parquet 支持不支持分区的 Hive 表,这是一个已知限制。有关更多信息,请参阅《Apac he Spark》 DataFrames 和《数据集指南》中的 Hive metastore Parquet 表转换
。
-
-
写入默认分区位置的 Spark 任务操作,例如
${table_location}/k1=v1/k2=v2/
,使用提交程序。如果任务操作写入自定义分区位置,则不使用提交程序,例如,如果使用ALTER TABLE SQL
命令设置自定义分区位置。 -
必须使用 Spark 的以下值:
-
spark.sql.parquet.fs.optimized.committer.optimization-enabled
属性必须设置为true
。这是 Amazon EMR 5.20.0 及更高版本的默认设置。对于 Amazon EMR 5.19.0,默认值是false
。有关配置此值的信息,请参阅为 Amazon EMR 5.19.0 启用经 EMRFS S3 优化的提交程序。 -
如果写入非分区的 Hive 元存储表,则仅支持 Parquet 和 Orc 文件格式。如果写入非分区的 Parquet Hive 元存储表,则须将
spark.sql.hive.convertMetastoreParquet
设置为true
。如果写入非分区的 Orc Hive 元存储表,则spark.sql.hive.convertMetastoreOrc
须设置为true
。这些是默认设置。 -
spark.sql.parquet.output.committer.class
必须设置为com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
。这是默认设置。 -
必须将
spark.sql.sources.commitProtocolClass
设置为org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
或org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
。org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
是 Amazon EMR 5.x 系列 5.30.0 及更高版本以及 Amazon EMR 6.x 系列 6.2.0 及更高版本的默认设置。org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
是以前 Amazon EMR 版本的默认设置。 -
如果 Spark 作业用动态分区列覆盖分区的 Parquet 数据集,则
partitionOverwriteMode
写入选项和spark.sql.sources.partitionOverwriteMode
必须设置为static
。这是默认设置。注意
Spark 2.4.0 中引入了
partitionOverwriteMode
写入选项。对于随附了 Amazon EMR 版本 5.19.0 的 Spark 版本 2.3.2,请设置spark.sql.sources.partitionOverwriteMode
属性。
-
不使用经 EMRFS S3 优化的提交程序的情况
通常,经 EMRFS S3 优化的提交程序不会在以下情况下使用。
情况 | 为什么不使用提交程序 |
---|---|
当您向 HDFS 写入时 | 提交程序只支持使用 EMRFS 写入 Amazon S3。 |
当您使用 S3A 文件系统时 | 提交程序只支持 EMRFS。 |
当你使用 MapReduce 或 Spark 的 RDD API 时 | 提交者仅支持使用 sparkSQL DataFrame、或数据集。 APIs |
以下 Scala 示例演示了防止经 EMRFS S3 优化的提交程序被整个使用(第一个示例)和部分使用(第二个示例)的一些其他情况。
例 – 动态分区覆盖模式
以下 Scala 示例指示 Spark 使用不同的提交算法,这完全阻止了经 EMRFS S3 优化的提交程序的使用。该代码将 partitionOverwriteMode
属性设置为 dynamic
,以仅覆盖您的数据所写入到的分区。然后,通过 partitionBy
指定动态分区列,并将写入模式设置为 overwrite
。
val dataset = spark.range(0, 10)
.withColumn("dt", expr("date_sub(current_date(), id)"))
dataset.write.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.partitionBy("dt")
.parquet("s3://amzn-s3-demo-bucket1
/output")
您必须配置所有三项设置,以避免使用经 EMRFS S3 优化的提交程序。当您执行此操作时,Spark 会执行在 Spark 的提交协议中指定的不同提交算法。对于早于 5.30.0 的 Amazon EMR 5.x 发行版和早于 6.2.0 的 Amazon EMR 6.x 发行版,提交协议使用 Spark 的暂存目录,该目录是在以 .spark-staging
开头的输出位置下创建的临时目录。该算法按顺序对分区目录进行重命名,这可能会对性能产生负面影响。有关 Amazon EMR 发行版 5.30.0 及更高版本和 6.2.0 及更高版本的更多信息,请参阅 使用经 EMRFS S3 优化的提交协议。
Spark 2.4.0 中的算法遵循以下步骤:
-
任务尝试将其输出写入 Spark 的暂存目录下的分区目录,例如
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
。 -
对于写入的每个分区,任务尝试都跟踪相对分区路径,例如
k1=v1/k2=v2
。 -
任务成功完成后,它会为驱动程序提供它跟踪的所有相对分区路径。
-
完成所有任务后,作业提交阶段将收集成功任务尝试在 Spark 的暂存目录下写入的所有分区目录。Spark 使用目录树重命名操作按顺序将这些目录的每一个都重命名为其最终输出位置。
-
暂存目录会在作业提交阶段完成之前删除。
例 – 自定义分区位置
在此示例中,Scala 代码插入到两个分区中。一个分区具有自定义分区位置。另一个分区使用默认分区位置。经 EMRFS S3 优化的提交程序仅用于将任务输出写入到使用默认分区位置的分区。
val table = "dataset"
val location = "s3://bucket/table"
spark.sql(s"""
CREATE TABLE $table (id bigint, dt date)
USING PARQUET PARTITIONED BY (dt)
LOCATION '$location'
""")
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
ALTER TABLE $table ADD PARTITION (dt='2019-01-28')
LOCATION '$customPartitionLocation'
""")
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
def asDate(text: String) = lit(text).cast("date")
spark.range(0, 10)
.withColumn("dt",
when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
.write.insertInto(table)
Scala 代码创建以下 Amazon S3 对象:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
当写入到自定义位置的分区时,Spark 会使用类似于上一个示例的提交算法,如下所述。与前面的示例一样,该算法会导致顺序重命名,这可能会影响性能。
-
在将输出写入自定义位置的分区时,任务会写入到 Spark 的暂存目录下的文件中,该目录是在最终输出位置下创建的。该文件的名称包含一个随机 UUID,以防止文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。
-
在任务成功完成后,它会为驱动程序提供这些文件及其最终所需的输出路径。
-
所有任务都完成后,作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。
-
暂存目录会在作业提交阶段完成之前删除。