Optimalkan kinerja Spark - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Optimalkan kinerja Spark

Amazon EMR menyediakan beberapa fitur optimasi kinerja untuk Spark. Topik ini menerangkan setiap ciri pengoptimuman secara terperinci.

Untuk informasi selengkapnya tentang cara mengatur konfigurasi Spark, lihat. Konfigurasi Spark.

Eksekusi kueri adaptif

eksekusi query adaptif adalah kerangka kerja untuk mengoptimalkan rencana permintaan berdasarkan statistik runtime. Dimulai dengan Amazon EMR 5.30.0, berikut optimasi eksekusi query adaptif dari Apache Spark 3 tersedia di Apache EMR Runtime untuk Spark 2.

  • Konversi adaptif

  • Penggabungan adaptif dari partisi shuffle

Konversi adaptif

Adaptive bergabung konversi meningkatkan kinerja query dengan mengkonversi sort-merge-join operasi ke broadcast-hash-joins operasi berdasarkan ukuran runtime dari tahap query. Broadcast-hash-joins cenderung tampil lebih baik ketika salah satu sisi bergabung cukup kecil untuk secara efisien menyiarkan outputnya di semua pelaksana, sehingga menghindari kebutuhan untuk mengoyak pertukaran dan mengurutkan kedua sisi bergabung. Adaptive bergabung konversi memperluas jangkauan kasus ketika Spark secara otomatis tampil broadcast-hash-joins.

Fitur ini diaktifkan secara default. Hal ini dapat dinonaktifkan dengan menetapkan spark.sql.adaptive.enabledkefalse, yang juga menonaktifkan kerangka eksekusi kueri adaptif. Spark memutuskan untuk mengkonversi sort-merge-join ke broadcast-hash-join ketika ukuran runtime statistik dari salah satu bergabung sisi tidak melebihispark.sql.autoBroadcastJoinThreshold, yang defaultnya 10,485,760 byte (10 MiB).

Adaptif Coalescing Partisi Shuffle

adaptif koalescing partisi shuffle meningkatkan kinerja query dengan menggabungkan partisi mengoyak bersebelahan kecil untuk menghindari overhead memiliki terlalu banyak tugas kecil. Hal ini memungkinkan Anda untuk mengkonfigurasi jumlah yang lebih tinggi dari partisi shuffle awal dimuka yang kemudian akan berkurang pada saat runtime untuk ukuran yang ditargetkan, meningkatkan kemungkinan memiliki lebih merata partisi shuffle.

Fitur ini diaktifkan secara default kecuali jika spark.sql.shuffle.partitions secara eksplisit diatur. Hal ini dapat diaktifkan dengan menetapkan spark.sql.adaptive.coalescePartitions.enabled ke true. Kedua jumlah awal partisi shuffle dan ukuran partisi target dapat disetel menggunakan spark.sql.adaptive.coalescePartitions.minPartitionNum dan spark.sql.adaptive.advisoryPartitionSizeInBytes sifat masing-masing. Lihat tabel berikut untuk rincian lebih lanjut tentang sifat Spark terkait untuk fitur ini.

Spark adaptif menyatu properti partisi
Properti Nilai default Deskripsi

spark.sql.adaptive.coalescePartitions.enabled

true, kecuali spark.sql.shuffle.partitions secara eksplisit diatur

Ketika benar dan spark.sql.adaptive.enabled benar, Spark menyatukan partisi mengoyak bersebelahan sesuai dengan ukuran target (ditentukan oleh spark.sql.adaptive.advisoryPartitionSizeInBytes), untuk mengelakkan terlalu banyak tugas kecil.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64MB

Ukuran penasehat dalam byte dari partisi shuffle ketika coalescing. Konfigurasi ini hanya memiliki efek ketika spark.sql.adaptive.enabled dan spark.sql.adaptive.coalescePartitions.enabled keduanya true.

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

Jumlah partisi shuffle minimum setelah menggabungkan. Konfigurasi ini hanya memiliki efek ketika spark.sql.adaptive.enabled dan spark.sql.adaptive.coalescePartitions.enabled keduanya true.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

Jumlah awal partisi shuffle sebelum coalescing. Konfigurasi ini hanya memiliki efek ketika spark.sql.adaptive.enabled dan spark.sql.adaptive.coalescePartitions.enabled keduanya true.

Pemangkasan partisi dinamis

Pemangkasan partisi dinamis meningkatkan kinerja pekerjaan dengan lebih akurat memilih partisi tertentu dalam tabel yang perlu dibaca dan diproses untuk permintaan tertentu. Dengan mengurangi jumlah data yang dibaca dan diproses, waktu yang signifikan akan disimpan dalam pelaksanaan pekerjaan. Dengan Amazon EMR 5.26.0, fitur ini diaktifkan secara default. Dengan Amazon EMR 5.24.0 dan 5.25.0, Anda dapat mengaktifkan fitur ini dengan menetapkan properti Spark spark.sql.dynamicPartitionPruning.enabled dari dalam Spark atau saat membuat cluster.

Memicu properti partisi pemangkasan partisi dinamis
Properti nilai default Deskripsi

spark.sql.dynamicPartitionPruning.enabled

true

Bila benar, aktifkan pemangkasan partisi dinamis.

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

Kapantrue, Spark melakukan pemeriksaan defensif sebelum eksekusi kueri untuk memastikan bahwa penggunaan kembali pertukaran siaran dalam filter pemangkasan dinamis tidak dilanggar oleh aturan persiapan nanti, seperti aturan kolumnar yang ditentukan pengguna. Saat penggunaan kembali rusak dan konfigurasi initrue, Spark menghapus filter pemangkasan dinamis yang terpengaruh untuk mencegah masalah kinerja dan kebenaran. Masalah kebenaran mungkin timbul ketika pertukaran siaran filter pemangkasan dinamis menghasilkan hasil yang berbeda dan tidak konsisten dari pertukaran siaran dari operasi gabungan yang sesuai. Pengaturan konfigurasi inifalse harus dilakukan dengan hati-hati; memungkinkan bekerja di sekitar skenario, seperti ketika penggunaan kembali rusak oleh aturan kolumnar yang ditetapkan pengguna. Saat Eksekusi Kueri Adaptif diaktifkan, penggunaan kembali siaran selalu diberlakukan.

Optimasi ini meningkatkan kemampuan yang ada Spark 2.4.2, yang hanya mendukung mendorong turun predikat statis yang dapat diselesaikan pada waktu rencana.

Berikut ini adalah contoh dari statis predikat push down di Spark 2.4.2.

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

Pemangkasan partisi dinamis memungkinkan mesin Spark untuk secara dinamis menyimpulkan pada saat runtime partisi yang perlu dibaca dan yang dapat dengan aman dihilangkan. Sebagai contoh, query berikut melibatkan dua tabel: store_sales tabel yang berisi semua total penjualan untuk semua toko dan dipartisi oleh wilayah, dan store_regions tabel yang berisi pemetaan daerah untuk setiap negara. Tabel berisi data tentang toko yang didistribusikan di seluruh dunia, namun kami hanya menanyakan data untuk Amerika Utara.

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

Tanpa pemangkasan partisi dinamis, query ini akan membaca semua daerah sebelum menyaring subset dari daerah yang cocok dengan hasil subquery. Dengan pemangkasan partisi dinamis, query ini akan membaca dan memproses hanya partisi untuk daerah kembali dalam subquery. Ini menghemat waktu dan sumber daya dengan membaca lebih sedikit data dari penyimpanan dan memproses lebih sedikit catatan.

Perataan subqueries skalar

Optimalisasi ini meningkatkan kinerja query yang memiliki subqueries skalar atas meja yang sama. Dengan Amazon EMR 5.26.0, fitur ini diaktifkan secara default. Dengan Amazon EMR 5.24.0 dan 5.25.0, Anda dapat mengaktifkannya dengan menetapkan properti Spark spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled dari dalam Spark atau saat membuat cluster. Ketika properti ini diatur ke true, query optimizer flattens subqueries skalar agregat yang menggunakan hubungan yang sama jika mungkin. Subqueries skalar diratakan dengan mendorong predikat apapun hadir dalam subquery ke dalam fungsi agregat dan kemudian melakukan satu agregasi, dengan semua fungsi agregat, per relasi.

Berikut ini adalah contoh dari query yang akan mendapatkan keuntungan dari optimasi ini.

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

optimasi menulis ulang query sebelumnya sebagai:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

Perhatikan bahwa permintaan ditulis ulang membaca tabel mahasiswa hanya sekali, dan predikat dari tiga subqueries didorong ke avg fungsi.

berbeda sebelum INTERSECT

optimasi ini mengoptimalkan bergabung ketika menggunakan INTERSECT. Dengan Amazon EMR 5.26.0, fitur ini diaktifkan secara default. Dengan Amazon EMR 5.24.0 dan 5.25.0, Anda dapat mengaktifkannya dengan menetapkan properti Spark spark.sql.optimizer.distinctBeforeIntersect.enabled dari dalam Spark atau saat membuat cluster. Query menggunakan INTERSECT secara otomatis dikonversi untuk menggunakan kiri-semi bergabung. Ketika properti ini diatur ke true, query optimizer mendorong operator yang berbeda untuk anak-anak INTERSECT jika mendeteksi bahwa operator yang berbeda dapat membuat kiri-semi bergabung BroadcastHashJoin bukannya a SortMergeJoin.

Berikut ini adalah contoh dari query yang akan mendapatkan keuntungan dari optimasi ini.

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

Tanpa mengaktifkan properti ini spark.sql.optimizer.distinctBeforeIntersect.enabled, query akan ditulis ulang sebagai berikut.

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bila Anda mengaktifkan properti ini spark.sql.optimizer.distinctBeforeIntersect.enabled, query akan ditulis ulang sebagai berikut.

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Gabung filter

optimasi ini dapat meningkatkan kinerja beberapa bergabung dengan pra-penyaringan satu sisi bergabung menggunakan Filter Bloom dihasilkan dari nilai-nilai dari sisi lain dari bergabung. Dengan Amazon EMR 5.26.0, fitur ini diaktifkan secara default. Dengan Amazon EMR 5.25.0, Anda dapat mengaktifkan fitur ini dengan menetapkan properti Spark spark.sql.bloomFilterJoin.enabled ke true dari dalam Spark atau saat membuat cluster.

Berikut ini adalah contoh query yang bisa mendapatkan keuntungan dari filter Bloom.

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

Ketika fitur ini diaktifkan, filter Bloom dibuat dari semua id item yang kategorinya ada di kumpulan kategori yang dipertanyakan. Sementara memindai tabel penjualan, filter Bloom digunakan untuk menentukan penjualan untuk item yang pasti tidak dalam set didefinisikan oleh filter Bloom. Dengan demikian penjualan diidentifikasi ini dapat disaring sedini mungkin.

Urutan ulang gabungan yang dioptimalkan

Optimasi ini dapat meningkatkan kinerja query dengan penataan kembali bergabung melibatkan tabel dengan filter. Dengan Amazon EMR 5.26.0, fitur ini diaktifkan secara default. Dengan Amazon EMR 5.25.0, Anda dapat mengaktifkan fitur ini dengan menetapkan parameter konfigurasi Spark spark.sql.optimizer.sizeBasedJoinReorder.enabled benar. Perilaku default di Spark adalah untuk bergabung tabel dari kiri ke kanan, seperti yang tercantum dalam query. Strategi ini dapat kehilangan peluang untuk mengeksekusi lebih kecil bergabung dengan filter pertama, untuk mendapatkan keuntungan lebih mahal bergabung kemudian.

Contoh query di bawah ini melaporkan semua item kembali dari semua toko di suatu negara. Tanpa dioptimalkan bergabung menyusun ulang, Spark bergabung dengan dua meja besar store_sales dan store_returns pertama, dan kemudian bergabung dengan mereka store dan akhirnya dengan item.

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

Dengan dioptimalkan bergabung menyusun ulang, Spark bergabung store_sales dengan store FIRST store memiliki filter dan lebih kecil dari store_returns dan broadcastable. Kemudian Spark bergabung dengan store_returns dan akhirnya dengan item. Jika item memiliki filter dan dapat disiarkan, itu juga akan memenuhi syarat untuk menyusun ulang, sehingga store_sales bergabung dengan store, kemudian item, dan akhirnya dengan store_returns.