Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Requisiti per il protocollo di commit ottimizzato per S3 EMRFS
Il protocollo di commit EMRFS ottimizzato per S3 viene utilizzato quando vengono soddisfatte le seguenti condizioni:
-
Si eseguono job Spark che utilizzano Spark o Datasets per SQL sovrascrivere DataFrames le tabelle partizionate.
-
Vengono eseguiti processi Spark la cui modalità di sovrascrittura delle partizioni è
dynamic
. -
I caricamenti multiparte sono abilitati in Amazon. EMR Questa è l'impostazione predefinita. Per ulteriori informazioni, consulta Il protocollo di commit EMRFS ottimizzato per S3 e i caricamenti in più parti.
-
La cache del filesystem per è abilitata. EMRFS Questa è l'impostazione predefinita. Verifica che l'impostazione
fs.s3.impl.disable.cache
sia impostata sufalse
. -
Viene utilizzato il supporto integrato per le origini dei dati di Spark. Il supporto integrato per le origini dei dati viene utilizzato nelle seguenti circostanze:
-
Quando i processi scrivono su origini dei dati o tabelle integrate.
-
Quando i processi scrivono tabelle Parquet del metastore Hive. Ciò accade quando
spark.sql.hive.convertInsertingPartitionedTable
espark.sql.hive.convertMetastoreParquet
sono entrambi impostati su true. Queste sono le impostazioni predefinite. -
Quando i job scrivono nella tabella dei metastore di Hive. ORC Ciò accade quando
spark.sql.hive.convertInsertingPartitionedTable
espark.sql.hive.convertMetastoreOrc
sono entrambi impostati sutrue
. Queste sono le impostazioni predefinite.
-
-
Le operazioni di processo Spark che scrivono in una posizione di partizione predefinita, ad esempio
${table_location}/k1=v1/k2=v2/
, usano il protocollo di commit. Il protocollo non viene utilizzato se un'operazione di processo scrive in una posizione di partizione personalizzata, ad esempio se un percorso di partizione personalizzato è impostato utilizzando il comandoALTER TABLE SQL
. -
Devono essere utilizzati i seguenti valori per Spark:
-
spark.sql.sources.commitProtocolClass
deve essere impostato suorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
. Questa è l'impostazione predefinita per le EMR versioni di Amazon 5.30.0 e successive e 6.2.0 e successive. -
L'opzione di scrittura
partitionOverwriteMode
ospark.sql.sources.partitionOverwriteMode
deve essere impostata sudynamic
. L'impostazione predefinita èstatic
.Nota
L'opzione di scrittura
partitionOverwriteMode
è stata introdotta in Spark 2.4.0. Per la versione 2.3.2 di Spark, inclusa nella EMR versione Amazon 5.19.0, imposta la proprietà.spark.sql.sources.partitionOverwriteMode
-
Se i processi Spark sovrascrivono la tabella Parquet del metastore Hive,
spark.sql.hive.convertMetastoreParquet
,spark.sql.hive.convertInsertingPartitionedTable
espark.sql.hive.convertMetastore.partitionOverwriteMode
devono essere impostati sutrue
. Queste sono le impostazioni predefinite. -
Se i job Spark si sovrascrivono nella ORC tabella dei metastore di Hive,, e devono essere impostati su.
spark.sql.hive.convertMetastoreOrc
spark.sql.hive.convertInsertingPartitionedTable
spark.sql.hive.convertMetastore.partitionOverwriteMode
true
Queste sono le impostazioni predefinite.
-
Esempio - Modalità di sovrascrittura dinamica delle partizioni
In questo esempio di Scala, viene attivata l'ottimizzazione. Innanzitutto, imposta la proprietà partitionOverwriteMode
su dynamic
. Questo sovrascrive solo le partizioni in cui stai scrivendo i dati. Quindi, specifichi le colonne delle partizioni dinamiche con partitionBy
e imposti la modalità scrittura su overwrite
.
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
Quando non viene utilizzato il protocollo di commit ottimizzato per EMRFS S3
In genere, il protocollo di commit EMRFS ottimizzato per S3 funziona allo stesso modo del protocollo di commit SQL Spark predefinito open source,. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
L'ottimizzazione non si verifica nelle seguenti situazioni.
Situazione | Perché il protocollo di commit non viene utilizzato |
---|---|
Quando scrivi a HDFS | Il protocollo commit supporta solo la scrittura su Amazon S3 utilizzando. EMRFS |
Quando si utilizza il file system S3A | Il protocollo di commit supporta EMRFS solo. |
Quando usi MapReduce o Spark RDD API | Il protocollo di commit supporta solo l'uso di Spark o SQL DataFrame Dataset. APIs |
Quando la sovrascrittura dinamica delle partizioni non viene attivata | Il protocollo di commit ottimizza solo i casi di sovrascrittura dinamica delle partizioni. Per altri casi, consulta la sezione Usa il committer ottimizzato per S3 EMRFS. |
I seguenti esempi di Scala mostrano alcune situazioni aggiuntive a cui delega il protocollo di commit EMRFS ottimizzato per S3. SQLHadoopMapReduceCommitProtocol
Esempio - Modalità di sovrascrittura delle partizioni con posizione della partizione personalizzata
In questo esempio, i programmi Scala sovrascrivono due partizioni in modalità di sovrascrittura dinamica delle partizioni. Una partizione ha una posizione di partizione personalizzata. L'altra partizione usa il percorso di partizione predefinito. Il protocollo di commit EMRFS ottimizzato per S3 migliora solo la partizione che utilizza la posizione di partizione predefinita.
val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
Il codice Scala crea i seguenti oggetti Amazon S3:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Nota
La scrittura in posizioni di partizione personalizzate nelle versioni precedenti di Spark può causare la perdita di dati. In questo esempio, la partizione dt='2019-01-28'
andrebbe persa. Per ulteriori dettagli, vedere -35106. SPARK
Quando si scrive nelle partizioni in posizioni personalizzate, Spark utilizza un algoritmo di commit simile all'esempio precedente, che è descritto di seguito. Come nell'esempio precedente, l'algoritmo produce ridenominazioni sequenziali che possono influire negativamente sulle prestazioni.
L'algoritmo in Spark 2.4.0 segue questi passaggi:
-
Quando si scrive l'output in una partizione in una posizione personalizzata, le attività scrivono in un file nella directory di gestione temporanea di Spark, che viene creata nella posizione di output finale. Il nome del file include un codice casuale UUID per la protezione contro le collisioni di file. Il tentativo di attività tiene traccia di ogni file insieme al percorso di output finale desiderato.
-
Quando un'attività viene completata correttamente, fornisce al driver i file e i relativi percorsi di output finali desiderati.
-
Al termine di tutte le attività, la fase di commit dei lavori consente di rinominare in sequenza tutti i file scritti per le partizioni in percorsi personalizzati nei relativi percorsi di output finali.
-
La directory di gestione temporanea viene eliminata prima del completamento della fase di commit del processo.