優化 Spark 效能 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

優化 Spark 效能

Amazon EMR 為 Spark 提供多種效能優化功能。本主題將詳細說明每個最佳化功能。

如需如何設定 Spark 組態的詳細資訊,請參閱 設定 Spark

自適應查詢執行

自適應查詢執行是一種依據執行期統計資料對查詢計畫進行重新優化的框架。從 Amazon EMR 5.30.0 開始,以下來自 Apache Spark 3 的自適應查詢執行優化也在 Apache Amazon EMR Runtime for Spark 2 上提供。

  • 自適應聯結轉換

  • 隨機分割區的自適應合併

自適應聯結轉換

自適應聯結轉換可根據查詢階段的執行時間大小將 broadcast-hash-joins 作 sort-merge-join業轉換為作業,藉此改善查詢效能。當連接的一側足夠小,以便在所有執行程序中有效地廣播其輸出時,B roadcast-hash-joins 傾向於表現更好,從而避免了對連接的兩側進行洗牌交換和排序的需要。自適應聯結轉換擴大 Spark 自動執行 broadcast-hash-joins時的情況範圍。

此功能預設為啟用。它可透過將 spark.sql.adaptive.enabled 設為 false 停用,但也會停用自適應查詢執行框架。當其中一個 sort-merge-join 連接端的運行 broadcast-hash-join 時大小統計信息不超過時,Spark 決定將 a 轉換為 aspark.sql.autoBroadcastJoinThreshold,默認為 10,485,760 字節(10 MiB)。

隨機分割區的自適應合併

隨機分割區的自適應合併小型接續隨機分割區,以避免過多小型任務產生額外負荷,從而提升查詢的效能。這可讓您預先設定較高數量的初始隨機分割區,然後在執行期縮減為目標大小,提高使分散式隨機分割區變得更均勻的機率。

除非明確設定 spark.sql.shuffle.partitions,否則此功能預設為啟用。它可透過將 spark.sql.adaptive.coalescePartitions.enabled 設為 true 啟用。使用 spark.sql.adaptive.coalescePartitions.minPartitionNumspark.sql.adaptive.advisoryPartitionSizeInBytes 屬性,可分別調整隨機分割區的初始數量和目標分割區大小。請見下表,了解此功能相關 Spark 屬性的更詳細資訊。

Spark 自適應合併分割區屬性
屬性 預設值 描述

spark.sql.adaptive.coalescePartitions.enabled

true,除非明確設定 spark.sql.shuffle.partitions

當為 true 且 spark.sql.adaptive.enabled 也設為 true 時,Spark 會依據目標大小合併接續的隨機分割區 (由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定),以避免有過多小型任務。

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

合併時隨機分割區的建議大小 (以位元組為單位)。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

合併後隨機分割區的最小數量。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

合併前隨機分割區的初始數量。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

動態分割區剔除

動態分割區清除可透過更準確選取表格中需要供特定查詢讀取和處理的特定分割區,來改善任務效能。透過降低讀取和處理的資料量,即可省下任務執行所用的大量時間。使用 Amazon EMR 5.26.0,此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中,您可以透過從 Spark 設定 Spark 屬性 spark.sql.dynamicPartitionPruning.enabled 或在建立叢集時,來啟用此功能。

Spark 動態分割區剔除分割區屬性
屬性 預設值 描述

spark.sql.dynamicPartitionPruning.enabled

true

如果為 true,啟用動態分割區剔除。

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

若為 true,Spark 會在查詢執行前執行防禦性檢查,以確保動態剔除篩選條件中廣播交換的重複使用不會被後續的準備規則 (例如使用者定義的單欄規則) 中斷。當重複使用被中斷且此組態為 true,Spark 會移除受影響的動態剔除篩選條件,以防範發生效能與正確性問題。當動態剔除篩選條件的廣播交換產生與對應聯結操作的廣播交換不同或不一致的結果時,即可能發生正確性問題。將此組態設為 false 時應保持謹慎;它支援規避一些情形,例如重複使用被使用者定義的單欄規則中斷。啟用「自適應查詢執行」時,始終強制執行廣播重複使用。

此最佳化可改善 Spark 2.4.1 現有功能,此版本僅支援下推可在計劃時間解析的靜態述詞。

以下是在 Spark 2.4.2 中下推的靜態述詞範例。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

動態分割區清除允許 Spark 引擎在執行時間動態推斷需要讀取哪些分割區,以及可安全消除哪些分割區。例如,以下查詢包含兩個表格:store_sales 表格,其中包含所有商店的所有總銷售,並依區域分區,而 store_regions 表格包含每個國家的區域對應。此表格包含分佈於世界各地的商店相關資料,但我們只會查詢北美洲的資料。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

在沒有使用動態分割區清除時,這個查詢會讀取所有區域,再篩選符合子查詢結果的區域子集。使用動態分割區清除時,這個查詢只會為在子查詢中傳回的區域讀取和處理分割區。這可透過讀取較少儲存區的資料和處理較少記錄來節省時間和資源。

扁平化純量子查詢

此最佳化可改善在相同表格具備純量子查詢的查詢效能。使用 Amazon EMR 5.26.0,此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中,您可以透過從 Spark 設定 Spark 屬性 spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled 或在建立叢集時,來啟用此功能。當此屬性設為 true,如有可能,此查詢最佳化工具會將使用相同關係的彙總純量子查詢扁平化。純量子查詢的扁平化是透過將子查詢中存在的任何述詞推送至彙總函數,然後再使用每個關係的所有彙總函數來執行一個彙總。

以下是將受益於此最佳化的查詢範本。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

此最佳化會透過以下方式重寫先前的查詢:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

請注意,重寫查詢只會讀取一次學生的表格,三個子查詢的述詞會被推送至該 avg 函數。

INTERSECT 前的 DISTINCT

使用 INTERSECT 時此最佳化會將聯結最佳化。使用 Amazon EMR 5.26.0,此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中,您可以透過從 Spark 設定 Spark 屬性 spark.sql.optimizer.distinctBeforeIntersect.enabled 或在建立叢集時,來啟用此功能。使用 INTERSECT 的查詢會自動轉換為使用 left-semi 聯結。當這個屬性設定為 true 時,如果查詢最佳化工具偵測到 DISTINCT 運算子可以讓左半聯結 BroadcastHashJoin 而不是一個,就會將 DISTINCT 運算子推送至 INTERSECT 的子系。 SortMergeJoin

以下是將受益於此最佳化的查詢範本。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

沒有啟用此屬性 spark.sql.optimizer.distinctBeforeIntersect.enabled 時,查詢的重寫方式如下。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

啟用此屬性 spark.sql.optimizer.distinctBeforeIntersect.enabled 時,查詢的重寫方式如下。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bloom 篩選條件聯結

此最佳化可以使用從聯結另一側之值產生的 Bloom 篩選條件,預先篩選聯結的一側,進而提升某些聯結的效能。使用 Amazon EMR 5.26.0,此功能預設為啟用。在 Amazon EMR 5.25.0 中,您可以透過從 Spark 設定 Spark 屬性 spark.sql.bloomFilterJoin.enabledtrue 或在建立叢集時,來啟用此功能。

以下是受益於 Bloom 篩選條件的範例查詢。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

啟用此功能時,Bloom 篩選條件是以其類別落於正接受查詢之類別集合的所有項目 ID 所建立。掃描銷售資料表時,Bloom 篩選條件用於判定哪些銷售是肯定不在 Bloom Filter 所定義之集合中的項目。因此能夠盡早篩選掉這些已識別的銷售。

優化的聯結重新排序

此最佳化可透過篩選條件重新排序涉及資料表的聯結,以提升查詢效能。使用 Amazon EMR 5.26.0,此功能預設為啟用。在 Amazon EMR 5.25.0 中,您可以將 Spark 組態參數 spark.sql.optimizer.sizeBasedJoinReorder.enabled 設為 true,來啟用此功能。如在查詢中所列出,在 Spark 中的預設行為是由左至右聯結資料表。此策略會以篩選條件跳過執行較小聯結的機會,以受益於之後成本較高的聯結。

以下範例查詢回報一國內所有商店的全部退貨項目。若無最佳化的聯結重新排序功能,Spark 會先聯結兩張大型資料表 store_salesstore_returns,然後將這些資料表與 store 聯結,最後與 item 聯結。

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

有了最佳化的聯結重新排序功能,由於 store 有篩選條件且小於 store_returnsbroadcastable,因此 Spark 會先將 store_salesstore 聯結。然後,Spark 與 store_returns 聯結,最後與 item 聯結。如果 item 有篩選條件且是 broadcastable,它也會符合重新排序的資格,因而讓 store_salesstore 聯結,然後與 item 聯結,最後再與 store_returns 聯結。