Requisiti per il protocollo di commit ottimizzato per S3 EMRFS - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Requisiti per il protocollo di commit ottimizzato per S3 EMRFS

Il protocollo di commit ottimizzato per S3 EMRFS viene utilizzato quando si verificano le condizioni riportate di seguito:

  • Si eseguono job Spark che utilizzano Spark SQL o Datasets per sovrascrivere le DataFrames tabelle partizionate.

  • Vengono eseguiti processi Spark la cui modalità di sovrascrittura delle partizioni è dynamic.

  • I caricamenti in più parti sono abilitati in Amazon EMR. Questa è l'impostazione predefinita. Per ulteriori informazioni, consulta Protocollo di commit ottimizzato per S3 EMRFS e caricamenti in più parti.

  • La cache del file system per EMRFS è abilitata. Questa è l'impostazione predefinita. Verifica che l'impostazione fs.s3.impl.disable.cache sia impostata su false.

  • Viene utilizzato il supporto integrato per le origini dei dati di Spark. Il supporto integrato per le origini dei dati viene utilizzato nelle seguenti circostanze:

    • Quando i processi scrivono su origini dei dati o tabelle integrate.

    • Quando i processi scrivono tabelle Parquet del metastore Hive. Ciò accade quando spark.sql.hive.convertInsertingPartitionedTable e spark.sql.hive.convertMetastoreParquet sono entrambi impostati su true. Queste sono le impostazioni predefinite.

    • Quando i processi scrivono tabelle ORC del metastore Hive. Ciò accade quando spark.sql.hive.convertInsertingPartitionedTable e spark.sql.hive.convertMetastoreOrc sono entrambi impostati su true. Queste sono le impostazioni predefinite.

  • Le operazioni di processo Spark che scrivono in una posizione di partizione predefinita, ad esempio ${table_location}/k1=v1/k2=v2/, usano il protocollo di commit. Il protocollo non viene utilizzato se un'operazione di processo scrive in una posizione di partizione personalizzata, ad esempio se un percorso di partizione personalizzato è impostato utilizzando il comando ALTER TABLE SQL.

  • Devono essere utilizzati i seguenti valori per Spark:

    • spark.sql.sources.commitProtocolClass deve essere impostato su org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol. Questa è l'impostazione predefinita per Amazon EMR rilascio 5.30.0 e successivi, e rilascio 6.2.0 e successivi.

    • L'opzione di scrittura partitionOverwriteMode o spark.sql.sources.partitionOverwriteMode deve essere impostata su dynamic. L'impostazione predefinita è static.

      Nota

      L'opzione di scrittura partitionOverwriteMode è stata introdotta in Spark 2.4.0. Per Spark versione 2.3.2, incluso con Amazon EMR rilascio 5.19.0, imposta la proprietà spark.sql.sources.partitionOverwriteMode.

    • Se i processi Spark sovrascrivono la tabella Parquet del metastore Hive, spark.sql.hive.convertMetastoreParquet, spark.sql.hive.convertInsertingPartitionedTable e spark.sql.hive.convertMetastore.partitionOverwriteMode devono essere impostati su true. Queste sono le impostazioni predefinite.

    • Se i processi Spark sovrascrivono la tabella ORC del metastore Hive, spark.sql.hive.convertMetastoreOrc, spark.sql.hive.convertInsertingPartitionedTable e spark.sql.hive.convertMetastore.partitionOverwriteMode devono essere impostati su true. Queste sono le impostazioni predefinite.

Esempio - Modalità di sovrascrittura dinamica delle partizioni

In questo esempio di Scala, viene attivata l'ottimizzazione. Innanzitutto, imposta la proprietà partitionOverwriteMode su dynamic. Questo sovrascrive solo le partizioni in cui stai scrivendo i dati. Quindi, specifichi le colonne delle partizioni dinamiche con partitionBy e imposti la modalità scrittura su overwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"

Quando il protocollo di commit ottimizzato per S3 EMRFS non viene utilizzato

In genere, il protocollo di commit ottimizzato per S3 EMRFS funziona allo stesso modo del protocollo di commit SQL Spark predefinito open source, org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. L'ottimizzazione non si verifica nelle seguenti situazioni.

Situazione Perché il protocollo di commit non viene utilizzato
Quando si scrive su HDFS Il protocollo di commit supporta solo la scrittura su Amazon S3 utilizzando EMRFS.
Quando si utilizza il file system S3A Il protocollo di commit supporta solo EMRFS.
Quando utilizzi l'API RDD di Spark MapReduce Il protocollo di commit supporta solo l'utilizzo di API SparkSQL o Dataset. DataFrame
Quando la sovrascrittura dinamica delle partizioni non viene attivata Il protocollo di commit ottimizza solo i casi di sovrascrittura dinamica delle partizioni. Per altri casi, consulta la sezione Utilizzare il committer ottimizzato S3 EMRFS.

I seguenti esempi di Scala mostrano altre situazioni che il protocollo di commit ottimizzato per S3 EMRFS delega SQLHadoopMapReduceCommitProtocol.

Esempio - Modalità di sovrascrittura delle partizioni con posizione della partizione personalizzata

In questo esempio, i programmi Scala sovrascrivono due partizioni in modalità di sovrascrittura dinamica delle partizioni. Una partizione ha una posizione di partizione personalizzata. L'altra partizione usa il percorso di partizione predefinito. Il protocollo di commit ottimizzato per S3 EMRFS viene utilizzato solo per scrivere l'output dell'attività nella partizione che utilizza la posizione della partizione predefinita.

val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")

Il codice Scala crea i seguenti oggetti Amazon S3:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Nota

La scrittura in posizioni di partizione personalizzate nelle versioni precedenti di Spark può causare la perdita di dati. In questo esempio, la partizione dt='2019-01-28' andrebbe persa. Per ulteriori dettagli, consulta SPARK-35106. Questo problema è stato risolto in Amazon EMR rilascio 5.33.0 e successivi, esclusi i rilasci 6.0.x e 6.1.x.

Quando si scrive nelle partizioni in posizioni personalizzate, Spark utilizza un algoritmo di commit simile all'esempio precedente, che è descritto di seguito. Come nell'esempio precedente, l'algoritmo produce ridenominazioni sequenziali che possono influire negativamente sulle prestazioni.

L'algoritmo in Spark 2.4.0 segue questi passaggi:

  1. Quando si scrive l'output in una partizione in una posizione personalizzata, le attività scrivono in un file nella directory di gestione temporanea di Spark, che viene creata nella posizione di output finale. Il nome del file include un UUID casuale per proteggere il file de collisioni. Il tentativo di attività tiene traccia di ogni file insieme al percorso di output finale desiderato.

  2. Quando un'attività viene completata correttamente, fornisce al driver i file e i relativi percorsi di output finali desiderati.

  3. Al termine di tutte le attività, la fase di commit dei lavori consente di rinominare in sequenza tutti i file scritti per le partizioni in percorsi personalizzati nei relativi percorsi di output finali.

  4. La directory di gestione temporanea viene eliminata prima del completamento della fase di commit del processo.