Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Anforderungen für das EMRFS-S3-optimierte Commit-Protokoll
Das EMRFS-S3-optimierte Commit-Protokoll wird verwendet, wenn die folgenden Bedingungen erfüllt sind:
-
Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden DataFrames, um partitionierte Tabellen zu überschreiben.
-
Sie führen Spark-Jobs aus, deren Partitionsüberschreibmodus
dynamic
ist. -
In Amazon EMR sind mehrteilige Uploads aktiviert. Dies ist die Standardeinstellung. Weitere Informationen finden Sie unter Das S3-optimierte EMRFS-Commit-Protokoll und mehrteilige Uploads.
-
Der Dateisystem-Cache für EMRFS ist aktiviert. Dies ist die Standardeinstellung. Vergewissern Sie sich, dass die Einstellung
fs.s3.impl.disable.cache
auffalse
gesetzt ist. -
Die integrierte Datenquellenunterstützung von Spark wird verwendet. Die integrierte Datenquellenunterstützung wird in den folgenden Fällen verwendet:
-
Wenn Aufträge in integrierte Datenquellen oder Tabellen schreiben.
-
Wenn Aufträge in die Parquet-Tabelle des Hive-Metastores schreiben. Das passiert, wenn
spark.sql.hive.convertInsertingPartitionedTable
undspark.sql.hive.convertMetastoreParquet
beide auf „wahr“ gesetzt sind. Dies sind die Standardeinstellungen. -
Wenn Aufträge in die ORC-Tabelle des Hive-Metastores schreiben. Das passiert, wenn
spark.sql.hive.convertInsertingPartitionedTable
undspark.sql.hive.convertMetastoreOrc
beide auftrue
gesetzt sind. Dies sind die Standardeinstellungen.
-
-
Spark-Auftrags-Vorgänge, die z. B. in einen Standard-Partitionsspeicherort schreiben, z. B.
${table_location}/k1=v1/k2=v2/
, verwenden das Commit-Protokoll. Das Protokoll wird nicht verwendet, wenn ein Jobvorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, beispielsweise wenn ein benutzerdefinierter Partitionsspeicherort mit dem BefehlALTER TABLE SQL
festgelegt wird. -
Für Spark müssen die folgenden Werte verwendet werden:
-
muss
spark.sql.sources.commitProtocolClass
auforg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
festgelegt sein. Dies ist die Standardeinstellung für Amazon-EMR-Versionen 5.30.0 und höher sowie 6.2.0 und höher. -
Die Schreiboption
partitionOverwriteMode
oderspark.sql.sources.partitionOverwriteMode
muss aufdynamic
gesetzt sein. Die Standardeinstellung lautetstatic
.Anmerkung
Die Schreiboption
partitionOverwriteMode
wurde mit Spark 2.4.0 eingeführt. Legen Sie für Spark-Version 2.3.2, die in Amazon-EMR-Version 5.19.0 enthalten ist, die Eigenschaftspark.sql.sources.partitionOverwriteMode
fest. -
Wenn Spark-Aufträge in die Hive-Metastore-Parquet-Tabelle überschreiben muss
spark.sql.hive.convertMetastoreParquet
,spark.sql.hive.convertInsertingPartitionedTable
undspark.sql.hive.convertMetastore.partitionOverwriteMode
auftrue
gesetzt werden. Dies sind die Standardeinstellungen. -
Wenn Spark-Aufträge in die Hive-Metastore-ORC-Tabelle überschreiben muss
spark.sql.hive.convertMetastoreOrc
,spark.sql.hive.convertInsertingPartitionedTable
undspark.sql.hive.convertMetastore.partitionOverwriteMode
auftrue
gesetzt werden. Dies sind die Standardeinstellungen.
-
Beispiel – Dynamischer Partitionsüberschreibmodus
In diesem Scala-Beispiel wird die Optimierung ausgelöst. Als erstes setzen Sie die partitionOverwriteMode
-Eigenschaft auf dynamic
. Dadurch werden nur die Partitionen überschrieben, auf die Sie Daten schreiben. Anschließend geben Sie dynamische Partitionsspalten mit partitionBy
an und legen den Schreibmodus auf overwrite
fest.
val dataset = spark.range(0, 10)
.withColumn("dt", expr("date_sub(current_date(), id)"))
dataset.write.mode("overwrite") // "overwrite" instead of "insert"
.option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static"
.partitionBy("dt") // partitioned data instead of unpartitioned data
.parquet("s3://amzn-s3-demo-bucket1/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
Wenn das EMRFS-S3-optimierte Commit-Protokoll nicht verwendet wird
Im Allgemeinen funktioniert das für EMRFS S3 optimierte Commit-Protokoll genauso wie das Open-Source-Standard-Spark-Commit-Protokoll. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
In den folgenden Situationen wird keine Optimierung durchgeführt.
Situation | Warum das Commit-Protokoll nicht verwendet wird |
---|---|
Wenn Sie in HDFS schreiben | Das Commit-Protokoll unterstützt nur das Schreiben in Amazon S3 mit EMRFS-Technologie. |
Bei Verwendung des S3A-Dateisystems | Das Commit-Protokoll unterstützt nur EMRFS. |
Wenn Sie die RDD-API von MapReduce Spark verwenden | Das Commit-Protokoll unterstützt nur die Verwendung von SparkSQL oder DataFrame Dataset. APIs |
Wenn das dynamische Überschreiben der Partition nicht ausgelöst wird | Das Commit-Protokoll optimiert nur Fälle, in denen dynamische Partitionen überschrieben werden. Für andere Fälle siehe EMRFS-S3-optimierte Committer verwenden. |
Die folgenden Scala-Beispiele veranschaulichen einige zusätzliche Situationen, die das EMRFS-S3-optimierte Commit-Protokoll an SQLHadoopMapReduceCommitProtocol
delegiert.
Beispiel – Dynamischer Partitionsüberschreibmodus mit benutzerdefiniertem Partitionsspeicherort
In diesem Beispiel überschreiben die Scala-Programme zwei Partitionen im dynamischen Partitionsüberschreibmodus. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Das S3-optimierte EMRFS-Commit-Protokoll wird nur zum Verbessern der Aufgabenausgabe in die Partition genutzt, die den Standard-Partitionsspeicherort verwendet.
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
spark.sql(s"""
CREATE TABLE $table (id bigint, dt date)
USING PARQUET PARTITIONED BY (dt)
LOCATION '$location'
""")
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
ALTER TABLE $table ADD PARTITION (dt='2019-01-28')
LOCATION '$customPartitionLocation'
""")
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
def asDate(text: String) = lit(text).cast("date")
spark.range(0, 10)
.withColumn("dt",
when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
.createTempView(inputView)
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
Anmerkung
Das Schreiben in benutzerdefinierte Partitionsspeicherorte in früheren Spark-Versionen kann zu Datenverlust führen. In diesem Beispiel dt='2019-01-28'
würde die Partition verloren gehen. Weitere Informationen finden Sie unter SPARK-35106
Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.
Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:
-
Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält eine zufällige UUID zum Schutz vor Datei-Kollisionen. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.
-
Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.
-
Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.
-
Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.