Persyaratan untuk protokol komit yang EMRFS dioptimalkan S3 - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Persyaratan untuk protokol komit yang EMRFS dioptimalkan S3

Protokol komit yang EMRFS dioptimalkan S3 digunakan ketika kondisi berikut terpenuhi:

  • Anda menjalankan pekerjaan Spark yang menggunakan SparkSQL, DataFrames, atau Datasets untuk menimpa tabel yang dipartisi.

  • Anda menjalankan pekerjaan Spark yang mode timpa partisi. dynamic

  • Unggahan multibagian diaktifkan di Amazon. EMR Ini adalah opsi default. Untuk informasi selengkapnya, lihat Protokol komit yang EMRFS dioptimalkan S3 dan unggahan multibagian.

  • Cache sistem file untuk EMRFS diaktifkan. Ini adalah opsi default. Periksa apakah pengaturan fs.s3.impl.disable.cache diatur kefalse.

  • Dukungan sumber data bawaan Spark digunakan. Dukungan sumber data bawaan digunakan dalam keadaan berikut:

    • Ketika pekerjaan menulis ke sumber data bawaan atau tabel.

    • Ketika pekerjaan menulis ke meja Parket Hive metastore. Ini terjadi ketika spark.sql.hive.convertInsertingPartitionedTable dan spark.sql.hive.convertMetastoreParquet keduanya disetel ke true. Ini adalah pengaturan default.

    • Ketika pekerjaan menulis ke tabel ORC metastore Hive. Ini terjadi ketika spark.sql.hive.convertInsertingPartitionedTable dan spark.sql.hive.convertMetastoreOrc keduanya diatur ketrue. Ini adalah pengaturan default.

  • Spark job operations yang menulis ke lokasi partisi default — misalnya, ${table_location}/k1=v1/k2=v2/ — menggunakan protokol commit. Protokol tidak digunakan jika operasi pekerjaan menulis ke lokasi partisi kustom - misalnya, jika lokasi partisi kustom diatur menggunakan ALTER TABLE SQL perintah.

  • Nilai berikut untuk Spark mesti digunakan:

    • spark.sql.sources.commitProtocolClass harus diatur ke org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol. Ini adalah pengaturan default untuk Amazon EMR merilis 5.30.0 dan lebih tinggi, dan 6.2.0 dan lebih tinggi.

    • Opsi partitionOverwriteMode tulis atau spark.sql.sources.partitionOverwriteMode harus diatur kedynamic. Pengaturan default-nya adalah static.

      catatan

      Parameter partitionOverwriteMode menulis opsi diperkenalkan di Spark 2.4.0. Untuk Spark versi 2.3.2, disertakan dengan EMR rilis Amazon 5.19.0, atur properti. spark.sql.sources.partitionOverwriteMode

    • Jika pekerjaan Spark menimpa ke meja Parket Hive metastore,, spark.sql.hive.convertMetastoreParquetspark.sql.hive.convertInsertingPartitionedTable, dan harus diatur ke. spark.sql.hive.convertMetastore.partitionOverwriteMode true Ada pengaturan default.

    • Jika pekerjaan Spark menimpa ke ORC tabel metastore Hive,, spark.sql.hive.convertMetastoreOrcspark.sql.hive.convertInsertingPartitionedTable, dan spark.sql.hive.convertMetastore.partitionOverwriteMode harus diatur ke. true Ada pengaturan default.

contoh — Mode penimpaan partisi dinamis

Dalam contoh Scala ini, optimasi dipicu. Pertama, Anda mengatur partitionOverwriteMode properti kedynamic. Ini hanya menimpa partisi yang Anda tulis data. Kemudian, Anda menentukan kolom partisi dinamis dengan partitionBy dan mengatur mode tulis keoverwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"

Ketika protokol komit yang EMRFS dioptimalkan S3 tidak digunakan

Umumnya, protokol komit yang EMRFS dioptimalkan S3 bekerja sama dengan protokol SQL komit Spark default open source,. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol Optimasi tidak akan terjadi dalam situasi berikut.

Situasi Mengapa protokol komit tidak digunakan
Ketika Anda menulis ke HDFS Protokol komit hanya mendukung penulisan ke Amazon S3 menggunakan. EMRFS
Saat Anda menggunakan sistem file S3A Protokol komit hanya mendukungEMRFS.
Saat Anda menggunakan MapReduce atau Spark RDD API Protokol komit hanya mendukung penggunaan SparkSQL, DataFrame, atau APIs Dataset.
Saat penimpaan partisi dinamis tidak dipicu Protokol komit hanya mengoptimalkan kasus penimpaan partisi dinamis. Untuk kasus lain, lihatGunakan committer yang EMRFS dioptimalkan S3.

Contoh Scala berikut menunjukkan beberapa situasi tambahan yang didelegasikan oleh protokol komit yang EMRFS dioptimalkan oleh S3. SQLHadoopMapReduceCommitProtocol

contoh - Mode penimpaan partisi dinamis dengan lokasi partisi khusus

Dalam contoh ini, program Scala menimpa dua partisi dalam mode penimpaan partisi dinamis. Satu partisi memiliki lokasi partisi kustom. Partisi lain menggunakan lokasi partisi default. Protokol komit yang EMRFS dioptimalkan S3 hanya meningkatkan partisi yang menggunakan lokasi partisi default.

val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")

Kode Scala menciptakan objek Amazon S3 berikut:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
catatan

Menulis ke lokasi partisi khusus di versi Spark sebelumnya dapat mengakibatkan kehilangan data. Dalam contoh ini, partisi dt='2019-01-28' akan hilang. Untuk lebih jelasnya, lihat SPARK-35106. Ini diperbaiki di Amazon EMR rilis 5.33.0 dan yang lebih baru, tidak termasuk 6.0.x dan 6.1.x.

Ketika menulis ke partisi di lokasi kustom, Spark menggunakan algoritma komit mirip dengan contoh sebelumnya, yang diuraikan di bawah ini. Seperti contoh sebelumnya, algoritme menghasilkan penggantian nama berurutan, yang dapat berdampak negatif pada kinerja.

Algoritma di Spark 2.4.0 mengikuti langkah-langkah berikut:

  1. Ketika menulis output ke partisi di lokasi kustom, tugas menulis ke file di bawah direktori pementasan Spark ini, yang dibuat di bawah lokasi output akhir. Nama file termasuk acak UUID untuk melindungi terhadap tabrakan file. Upaya tugas melacak setiap file bersama dengan path output akhir yang diinginkan.

  2. Ketika tugas selesai berhasil, menyediakan driver dengan file dan akhir yang diinginkan output jalan mereka.

  3. Setelah semua tugas selesai, pekerjaan commit fase berurutan mengganti nama semua file yang ditulis untuk partisi di lokasi kustom ke jalur output akhir mereka.

  4. Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.