本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
優化洗牌
某些作業 (例如join()
和groupByKey()
) 需要 Spark 才能執行隨機播放。隨機播放是 Spark 重新分配資料的機制,以便在RDD分割區之間以不同的方式分組資料。洗牌可協助修正效能瓶頸。但是,由於洗牌通常涉及在 Spark 執行程序之間複製數據,因此隨機播放是一個複雜且昂貴的操作。例如,隨機播放會產生下列成本:
-
磁碟 I/O:
-
在磁碟上產生大量的中繼檔案。
-
-
網路 I/O:
-
需要許多網絡連接(連接數 =
Mapper × Reducer
)。 -
由於記錄彙總到可能託管在不同 Spark 執行程序上的新分RDD區,因此您的數據集中很大一部分可能會在網絡上的 Spark 執行程序之間移動。
-
-
CPU和內存負載:
-
排序值並合併資料集。這些操作計劃在執行人身上,對執行人造成沉重的負荷。
-
隨機播放是 Spark 應用程式效能下降的最重要因素之一。在存儲中間數據時,它可能會耗盡執行程序的本地磁盤上的空間,這會導致 Spark 作業失敗。
您可以在 CloudWatch 指標和 Spark UI 中評估隨機播放效能。
CloudWatch 度量
如果隨機播放字節寫入值與隨機字節讀取相比較高,您的 Spark 作業可能會使用隨機播放操join()
groupByKey()
Spark UI
在 Spark UI 的「舞台」索引標籤上,您可以檢查「隨機播放讀取大小/記錄」值。您也可以在「執行者」選項卡上看到它。
在下面的屏幕截圖中,每個執行者與隨機播放過程交換大約 18.6GB/4020000 條記錄,總隨機播放讀取大小約為 75 GB)。
隨機溢出 (磁碟) 欄會顯示大量的資料溢滿記憶體至磁碟,這可能會造成磁碟已滿或效能問題。
如果您觀察到這些症狀,而且階段與您的績效目標相比需要太長的時間,或者失敗Out Of Memory
或No space
left on device
錯誤,請考慮下列解決方案。
優化加入
連接資料表的join()
作業是最常用的隨機播放作業,但通常是效能瓶頸。由於 join 是一項昂貴的操作,因此我們建議您不要使用它,除非它對您的業務需求至關重要。請提出以下問題,仔細檢查您是否有效地使用資料管線:
-
您是否正在重新計算也在其他可重複使用的工作中執行的連接?
-
您是否加入以將外鍵解析為輸出消費者未使用的值?
確認加入作業對您的業務需求至關重要之後,請參閱下列選項,以符合您需求的方式最佳化您的聯結。
加入前使用下推
在執行聯結 DataFrame 之前,請先篩選出中不必要的列和欄。這具有以下優點:
-
減少隨機播放期間的資料傳輸量
-
減少了 Spark 執行程序處理的量
-
減少資料掃描量
# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])
使用 DataFrame 加入
嘗試使用 Spark 高級別dyf.toDF()
。如 Apache Spark 中「關鍵主題」一節所述,這些聯結作業會在內部利用 Catalyst 最佳化工具的查詢最佳化功能。
隨機播放和廣播雜湊連接和提示
星火支持兩種類型的連接:隨機連接和廣播哈希加入。廣播哈希連接不需要混洗,並且與隨機連接相比,它可能需要更少的處理。但是,僅在將小桌子連接到大桌子時才適用。當加入可容納單個 Spark 執行程序內存的表時,請考慮使用廣播哈希聯接。
下圖顯示了廣播哈希聯接和隨機連接的高級結構和步驟。
每個聯結的詳細資訊如下:
-
隨機加入:
-
洗牌哈希連接連接兩個表,而不排序和分配兩個表之間的連接。它適用於可以存儲在 Spark 執行程序內存中的小表的連接。
-
排序合併連接分配兩個表通過鍵連接和加入之前對它們進行排序。它適用於大型表格的連接。
-
-
廣播哈希加入:
-
廣播雜湊聯結會將較小的RDD或資料表推送至每個工作節點。然後它會將地圖側與較大RDD或表格的每個分區結合在一起。
當您的RDDs或表中的一個可以放入內存或可以設置為適合內存時,它適用於連接。盡可能進行廣播哈希連接是有益的,因為它不需要隨機播放。您可以使用連接提示從 Spark 請求廣播加入,如下所示。
# DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;
如需有關聯結提示的詳細資訊,請參閱聯結提示
。
-
在 AWS Glue 3.0 及更新版本中,您可以啟用「調適性查詢執行
在 AWS Glue 3.0 中,您可以通過設置啟用自適應查詢執行spark.sql.adaptive.enabled=true
。在 AWS Glue 4.0 中預設會啟用調適性查詢執行。
您可以設置與洗牌和廣播哈希連接相關的其他參數:
-
spark.sql.adaptive.localShuffleReader.enabled
-
spark.sql.adaptive.autoBroadcastJoinThreshold
如需相關參數的詳細資訊,請參閱將排序合併聯結轉換為廣播聯結
在 AWS Glue 3.0 和更高版本中,您可以使用其他連接提示進行隨機播放來調整您的行為。
-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;
使用分段設定
排序合併連接需要兩個階段,隨機排序和排序,然後合併。這兩個階段可能會超載 Spark 執行程序,並導致OOM和性能問題,當一些執行者正在合併和其他執行程序同時排序。在這種情況下,可能可以通過使用分段
分組表格對下列項目非常有用:
-
經常透過同一個金鑰關連的資料,例如
account_id
-
載入每日累計資料表,例如可在公用資料欄上分組的基礎和差異表格
您可以通過使用下面的代碼創建一個分組的表。
df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")
在聯接之前 DataFrames 對聯接鍵進行重新分區
若要 DataFrames 在連接之前重新分割兩個連接鍵,請使用下列陳述式。
df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")
這將RDDs在啟動聯接之前對連接鍵進行兩個分區(仍然是分開的)。如果兩者在具有相同分區代碼的同一個密鑰上進行分區,則您計劃聯接在一起的RDD記錄將RDDs很有可能在混洗聯接之前共同位於同一個 Worker 上。這可能會透過減少聯結期間的網路活動和資料偏差來改善效能。
克服資料偏斜
資料偏斜是 Spark 工作瓶頸的最常見原因之一。當數據不是跨RDD分區均勻分佈時,就會發生這種情況。這會導致該分區的任務比其他分區花費更長的時間,從而延遲應用程序的整體處理時間。
若要識別資料偏差,請在 Spark UI 中評估下列量度:
-
在 Spark UI 的「舞台」索引標籤上,檢查「事件時間軸」頁面。您可以在下面的屏幕截圖中看到任務分佈不均勻。分佈不均或花費太長時間執行的工作可能表示資料偏斜。
-
另一個重要頁面是「摘要量度」,它會顯示 Spark 工作的統計資料。下列螢幕擷取畫面顯示「持續時間」、「GC 時間」、「溢出 (記憶體)」、「溢出 (磁碟)」等百分位數的度量。
當任務均勻分佈時,您將在所有百分位數中看到相似的數字。當數據偏斜時,您將在每個百分位數中看到非常偏差的值。在此範例中,工作持續時間小於 13 秒,以最小值、第 25 個百分位數、中位數和第 75 個百分位數為單位。雖然「最大」工作處理的資料量是第 75 個百分位數的 100 倍,但其 6.4 分鐘的持續時間大約是 30 倍。這意味著至少有一個任務(或多達 25% 的任務)花費的時間遠遠超過其餘任務。
如果您看到資料偏斜,請嘗試下列動作:
-
如果您使用 AWS Glue 3.0,請透過設定啟用調適性查詢執行
spark.sql.adaptive.enabled=true
。自適應查詢執行默認情況下在 AWS Glue 4.0 中啟用。您也可以透過設定下列相關參數,對聯結引入的資料偏斜使用調適性查詢執行:
-
spark.sql.adaptive.skewJoin.skewedPartitionFactor
-
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
-
spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)
-
spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)
如需詳細資訊,請參閱 Apache 星火文件
。 -
-
使用具有大範圍值的鍵作為連接鍵。在隨機連接,分區是為一個鍵的每個哈希值確定。如果聯接鍵的基數太低,則散列函數更有可能在分區之間分配數據做了不好的工作。因此,如果您的應用程式和商務邏輯支援它,請考慮使用較高的基數索引鍵或複合索引鍵。
# Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])
使用快取
使用重複性時 DataFrames,請使用或df.persist()
將計算結果緩存在每個 Spark 執行程序的內存和磁盤上,避免額外的隨機播放df.cache()
或計算。Spark 還支持RDDs在磁盤上保存或跨多個節點(存儲級別
例如,您可以 DataFrames 通過添加來保留df.persist()
. 當不再需要緩存時,您可以使unpersist
用丟棄緩存的數據。
df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()
移除不需要的火花動作
避免執行不必要的動作count
,例如show
、或collect
。正如在 Apache 星火一節中的關鍵主題討論的那樣,星火是懶惰的。每次對其執行動作時,都RDD可能會重新計算每個轉換。當您使用許多 Spark 動作時,會呼叫每個動作的多個來源存取、工作計算和隨機播放執行。
如果您不需要collect()
或在商業環境中執行其他動作,請考慮將其移除。
注意
盡可能避免collect()
在商業環境中使用 Spark。collect()
動作會將 Spark 執行程式中計算的所有結果傳回給 Spark 驅動程式,這可能會造成 Spark 驅動程式傳回錯OOM誤。為了避免錯OOM誤,Spark spark.driver.maxResultSize = 1GB
默認設置,這將返回給 Spark 驅動程序的最大數據大小限制為 1 GB。