Anforderungen für das EMRFS S3-optimierte Commit-Protokoll - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Anforderungen für das EMRFS S3-optimierte Commit-Protokoll

Das EMRFS S3-optimierte Commit-Protokoll wird verwendet, wenn die folgenden Bedingungen erfüllt sind:

  • Sie führen Spark-Jobs aus, die Spark oder Datasets verwenden SQL DataFrames, um partitionierte Tabellen zu überschreiben.

  • Sie führen Spark-Jobs aus, deren Partitionsüberschreibmodus dynamic ist.

  • Mehrteilige Uploads sind in Amazon aktiviert. EMR Dies ist die Standardeinstellung. Weitere Informationen finden Sie unter Das EMRFS S3-optimierte Commit-Protokoll und mehrteilige Uploads.

  • Der Dateisystem-Cache für EMRFS ist aktiviert. Dies ist die Standardeinstellung. Vergewissern Sie sich, dass die Einstellung fs.s3.impl.disable.cache auf false gesetzt ist.

  • Die integrierte Datenquellenunterstützung von Spark wird verwendet. Die integrierte Datenquellenunterstützung wird in den folgenden Fällen verwendet:

    • Wenn Aufträge in integrierte Datenquellen oder Tabellen schreiben.

    • Wenn Aufträge in die Parquet-Tabelle des Hive-Metastores schreiben. Das passiert, wenn spark.sql.hive.convertInsertingPartitionedTable und spark.sql.hive.convertMetastoreParquet beide auf „wahr“ gesetzt sind. Dies sind die Standardeinstellungen.

    • Wenn Jobs in die ORC Hive-Metastore-Tabelle schreiben. Das passiert, wenn spark.sql.hive.convertInsertingPartitionedTable und spark.sql.hive.convertMetastoreOrc beide auf true gesetzt sind. Dies sind die Standardeinstellungen.

  • Spark-Auftrags-Vorgänge, die z. B. in einen Standard-Partitionsspeicherort schreiben, z. B. ${table_location}/k1=v1/k2=v2/, verwenden das Commit-Protokoll. Das Protokoll wird nicht verwendet, wenn ein Jobvorgang in einen benutzerdefinierten Partitionsspeicherort schreibt, beispielsweise wenn ein benutzerdefinierter Partitionsspeicherort mit dem Befehl ALTER TABLE SQL festgelegt wird.

  • Für Spark müssen die folgenden Werte verwendet werden:

    • muss spark.sql.sources.commitProtocolClass auf org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol festgelegt sein. Dies ist die Standardeinstellung für EMR Amazon-Versionen 5.30.0 und höher sowie 6.2.0 und höher.

    • Die Schreiboption partitionOverwriteMode oder spark.sql.sources.partitionOverwriteMode muss auf dynamic gesetzt sein. Die Standardeinstellung lautet static.

      Anmerkung

      Die Schreiboption partitionOverwriteMode wurde mit Spark 2.4.0 eingeführt. Für Spark-Version 2.3.2, die in der EMR Amazon-Version 5.19.0 enthalten ist, legen Sie die Eigenschaft fest. spark.sql.sources.partitionOverwriteMode

    • Wenn Spark-Aufträge in die Hive-Metastore-Parquet-Tabelle überschreiben muss spark.sql.hive.convertMetastoreParquet, spark.sql.hive.convertInsertingPartitionedTable und spark.sql.hive.convertMetastore.partitionOverwriteMode auf true gesetzt werden. Dies sind die Standardeinstellungen.

    • Wenn Spark-Jobs die ORC Hive-Metastore-Tabelle überschreiben, spark.sql.hive.convertMetastoreOrcspark.sql.hive.convertInsertingPartitionedTable, und muss auf gesetzt werden. spark.sql.hive.convertMetastore.partitionOverwriteMode true Dies sind die Standardeinstellungen.

Beispiel – Dynamischer Partitionsüberschreibmodus

In diesem Scala-Beispiel wird die Optimierung ausgelöst. Als erstes setzen Sie die partitionOverwriteMode-Eigenschaft auf dynamic. Dadurch werden nur die Partitionen überschrieben, auf die Sie Daten schreiben. Anschließend geben Sie dynamische Partitionsspalten mit partitionBy an und legen den Schreibmodus auf overwrite fest.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"

Wenn das EMRFS S3-optimierte Commit-Protokoll nicht verwendet wird

Im Allgemeinen funktioniert das EMRFS S3-optimierte Commit-Protokoll genauso wie das SQL Open-Source-Standard-Spark-Commit-Protokoll. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol In den folgenden Situationen wird keine Optimierung durchgeführt.

Situation Warum das Commit-Protokoll nicht verwendet wird
Wenn Sie schreiben an HDFS Das Commit-Protokoll unterstützt nur das Schreiben in Amazon S3 mithilfe vonEMRFS.
Bei Verwendung des S3A-Dateisystems Das Commit-Protokoll unterstützt nurEMRFS.
Wenn Sie MapReduce oder Sparks verwenden RDD API Das Commit-Protokoll unterstützt nur die Verwendung von Spark SQL DataFrame,, oder DatasetAPIs.
Wenn das dynamische Überschreiben der Partition nicht ausgelöst wird Das Commit-Protokoll optimiert nur Fälle, in denen dynamische Partitionen überschrieben werden. Für andere Fälle siehe Verwenden Sie den EMRFS S3-optimierten Committer.

Die folgenden Scala-Beispiele veranschaulichen einige zusätzliche Situationen, in die das EMRFS S3-optimierte Commit-Protokoll delegiert. SQLHadoopMapReduceCommitProtocol

Beispiel – Dynamischer Partitionsüberschreibmodus mit benutzerdefiniertem Partitionsspeicherort

In diesem Beispiel überschreiben die Scala-Programme zwei Partitionen im dynamischen Partitionsüberschreibmodus. Eine Partition verfügt über einen benutzerdefinierten Partitionsspeicherort. Die andere Partition verwendet den Standard-Partitionsspeicherort. Das EMRFS S3-optimierte Commit-Protokoll verbessert nur die Partition, die den Standardspeicherort der Partition verwendet.

val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")

Der Scala-Code erstellt die folgenden Amazon-S3-Objekte:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Anmerkung

Das Schreiben in benutzerdefinierte Partitionsspeicherorte in früheren Spark-Versionen kann zu Datenverlust führen. In diesem Beispiel dt='2019-01-28' würde die Partition verloren gehen. Weitere Informationen finden Sie unter SPARK -35106. Dies wurde in EMR Amazon-Version 5.33.0 und höher behoben, mit Ausnahme von 6.0.x und 6.1.x.

Beim Schreiben in Partitionen an benutzerdefinierten Speicherorten verwendet Spark einen Commit-Algorithmus, ähnlich wie im vorherigen Beispiel. Dies wird im Folgenden beschrieben. Genau wie bei dem vorherigen Beispiel führt der Algorithmus zu sequenziellen Umbenennungen, wodurch die Leistung beeinträchtigt werden kann.

Der Algorithmus in Spark 2.4.0 folgt diesen Schritten:

  1. Beim Schreiben von Ausgabe in eine Partition an einem benutzerdefinierten Speicherort werden Aufgaben in eine Datei unter dem Staging-Verzeichnis von Spark geschrieben, das unter dem endgültigen Ausgabespeicherort erstellt wird. Der Name der Datei enthält zum Schutz vor Dateikollisionen einen zufälligen WertUUID. Der Aufgabe-Versuch verfolgt jede Datei zusammen mit dem gewünschten endgültigen Ausgabepfad nach.

  2. Wenn eine Aufgabe erfolgreich abgeschlossen wird, stellt sie dem Treiber die Dateien und die für sie gewünschten endgültigen Ausgabepfade bereit.

  3. Nachdem alle Aufgaben beendet wurden, benennt die Auftrags-Commit-Phase alle Dateien, die für Partitionen an benutzerdefinierten Speicherorten geschrieben wurden, sequentiell in ihre endgültigen Ausgabepfade um.

  4. Das Staging-Verzeichnis wird gelöscht, bevor der Auftrags-Commit-Phase abgeschlossen ist.