Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Requisitos para el comité optimizado para EMRFS S3
El comité EMRFS optimizado para S3 se utiliza cuando se cumplen las siguientes condiciones:
-
Ejecuta trabajos de Spark que utilizan Spark o Datasets para escribir archivos en Amazon S3. DataFrames A partir de Amazon EMR 6.4.0, este archivador se puede utilizar para todos los formatos habituales, incluidos los de parquetORC, y los basados en texto (incluidos y). CSV JSON Para las versiones anteriores a Amazon EMR 6.4.0, solo se admite el formato Parquet.
-
Las cargas multiparte están habilitadas en Amazon. EMR Esta es la opción predeterminada. Para obtener más información, consulte El archivador EMRFS optimizado para S3 y las cargas multiparte.
-
Se utiliza la compatibilidad de formatos de archivos integrados de Spark. La compatibilidad de formatos de archivos integrados se utiliza en las siguientes circunstancias:
-
Para las tablas de Hive Metastore, cuando
spark.sql.hive.convertMetastoreParquet
se establece en las mesas Parquet ospark.sql.hive.convertMetastoreOrc
se establece en las tablas Orc con Amazon EMR 6.4.0 o superior.true
true
Esta es la configuración predeterminada. -
Cuando los trabajos escriben orígenes de datos o tablas de formatos de archivos, por ejemplo, la tabla de destino se crea con la cláusula
USING parquet
. -
Cuando los trabajos escriben en tablas de Parquet de metaalmacenes de Hive no particionadas. La compatibilidad con Parquet integrada de Spark no admite tablas de Hive particionadas, se trata de una limitación conocida. Para obtener más información, consulte la conversión de tablas de Hive en Metastore Parquet
en la Guía de conjuntos de datos y Spark de Apache. DataFrames
-
-
Las operaciones de trabajos de Spark que escriben en una ubicación de partición predeterminada (por ejemplo,
${table_location}/k1=v1/k2=v2/
) utilizan el confirmador. El confirmador no se utiliza si una operación de trabajo escribe en una ubicación de partición personalizada, por ejemplo, si una ubicación de partición personalizada se establece con el comandoALTER TABLE SQL
. -
Se deben utilizar los valores siguientes para Spark:
-
La propiedad
spark.sql.parquet.fs.optimized.committer.optimization-enabled
debe establecerse entrue
. Esta es la configuración predeterminada en Amazon EMR 5.20.0 y versiones posteriores. Con Amazon EMR 5.19.0, el valor predeterminado es.false
Para obtener más información acerca de cómo configurar este valor, consulte Habilite el comité EMRFS optimizado para S3 para Amazon 5.19.0 EMR. -
Si se escribe en tablas del metaalmacén de Hive no particionadas, solo se admiten los formatos de archivo Parquet y Orc.
spark.sql.hive.convertMetastoreParquet
debe establecerse entrue
si se escribe en tablas Parquet no particionadas del metaalmacén de Hive.spark.sql.hive.convertMetastoreOrc
debe establecerse entrue
si se escribe en tablas Orc no particionadas del metaalmacén de Hive. Esta es la configuración predeterminada. -
spark.sql.parquet.output.committer.class
se debe establecer encom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
. Este es el valor predeterminado. -
spark.sql.sources.commitProtocolClass
debe estar establecido en o.org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
es la configuración predeterminada para la serie Amazon EMR 5.x, versión 5.30.0 y superior, y para la serie Amazon EMR 6.x, versión 6.2.0 y superior.org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
es la configuración por defecto de EMR las versiones anteriores de Amazon. -
Si los trabajos de Spark sobrescriben conjuntos de datos de Parquet particionados con columnas de partición, entonces la opción de escritura
partitionOverwriteMode
yspark.sql.sources.partitionOverwriteMode
se deben establecer enstatic
. Este es el valor predeterminado.nota
La opción de escritura
partitionOverwriteMode
se introdujo en Spark 2.4.0. Para la versión 2.3.2 de Spark, incluida en la EMR versión 5.19.0 de Amazon, defina la propiedad.spark.sql.sources.partitionOverwriteMode
-
Ocasiones en las que no se utiliza EMRFS un confirmador optimizado para S3
Por lo general, el EMRFS confirmador optimizado para S3 no se utiliza en las siguientes situaciones.
Situación | Por qué no se utiliza el confirmador |
---|---|
Cuando escribes a HDFS | El confirmante solo admite la escritura en Amazon S3 medianteEMRFS. |
Cuando utiliza el sistema de archivos S3A | El confirmador solo lo admite. EMRFS |
Cuando usas nuestro MapReduce Spark RDD API | El confirmador solo admite el uso de Spark SQL o DatasetAPIs. DataFrame |
Los siguientes ejemplos de Scala muestran algunas situaciones adicionales que impiden que el confirmador EMRFS optimizado para S3 se utilice en su totalidad (el primer ejemplo) y en parte (el segundo ejemplo).
ejemplo – Modo de sobrescritura de partición dinámica
El siguiente ejemplo de Scala indica a Spark que utilice un algoritmo de confirmación diferente, lo que impide por completo el uso del confirmador optimizado para S3. EMRFS El código establece la propiedad partitionOverwriteMode
en dynamic
para sobrescribir solo las particiones en las que escriba datos. A continuación, las columnas de particiones dinámicas se especifican mediante partitionBy
y el modo de escritura se establece en overwrite
.
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://
amzn-s3-demo-bucket1
/output")
Debes configurar los tres ajustes para evitar usar el confirmador optimizado para S3. EMRFS Al hacerlo, Spark ejecuta un algoritmo de confirmación diferente que se especifica en el protocolo de confirmación de Spark. Para las versiones de Amazon EMR 5.x anteriores a la 5.30.0 y para las versiones de Amazon EMR 6.x anteriores a la 6.2.0, el protocolo de confirmación usa el directorio provisional de Spark, que es un directorio temporal creado en la ubicación de salida que comienza con. .spark-staging
El algoritmo cambia de forma secuencial el nombre de los directorios de particiones, lo que puede afectar negativamente al rendimiento. Para obtener más información sobre las EMR versiones 5.30.0 y posteriores y 6.2.0 y posteriores de Amazon, consulte. Usa el protocolo de confirmación optimizado para S3 EMRFS
El algoritmo de Spark 2.4.0 sigue estos pasos:
-
La tarea intenta escribir su salida en los directorios de particiones en el directorio provisional de Spark (por ejemplo,
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
). -
Para cada partición escrita, la tarea intenta llevar un seguimiento de las rutas de partición relativas (por ejemplo,
k1=v1/k2=v2
). -
Cuando una tarea se completa de forma satisfactoria, proporciona al controlador todas las rutas de partición relativa cuyo seguimiento ha realizado.
-
Una vez completadas todas las tareas, la fase de confirmación del trabajo recopila todos los directorios de partición que los intentos de tarea correctos escribieron en el directorio de ensayo de Spark. Spark cambia el nombre de estos directorios de forma secuencial a su ubicación de salida final utilizando operaciones de cambio de nombre de árbol de directorio.
-
El directorio de ensayo se elimina antes de que se complete la fase de confirmación de trabajo.
ejemplo – Ubicación de partición personalizada
En este ejemplo, el código Scala se inserta en dos particiones. Una partición tiene una ubicación de partición personalizada. La otra partición utiliza la ubicación de partición predeterminada. El archivador EMRFS optimizado para S3 solo se usa para escribir el resultado de la tarea en la partición que usa la ubicación de partición predeterminada.
val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)
El código de Scala crea los siguientes objetos de Amazon S3:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Al escribir en particiones de ubicaciones personalizadas, Spark utiliza un algoritmo de confirmación similar al del ejemplo anterior, que se detalla a continuación. Como ocurre en el ejemplo anterior, el algoritmo da como resultado cambios de nombre secuenciales, lo que puede afectar negativamente al rendimiento.
-
Al escribir la salida en una partición de una ubicación personalizada, las tareas escriben en un archivo del directorio de ensayo de Spark, que se crea en la ubicación de salida final. El nombre del archivo incluye un nombre aleatorio UUID para protegerlo contra las colisiones de archivos. La tarea intenta mantener un seguimiento de cada archivo junto con la ruta de salida deseada final.
-
Cuando una tarea se completa de forma satisfactoria, proporciona al controlador los archivos y las rutas de salida deseada final.
-
Una vez completadas todas las tareas, la fase de confirmación de trabajo cambia el nombre de forma secuencial de todos los archivos que se escribieron para particiones en ubicaciones personalizadas en sus rutas de salida finales.
-
El directorio de ensayo se elimina antes de que se complete la fase de confirmación de trabajo.