翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
動的パーティションのプルーニング
Spark3.0 以降には、動的パーティションプルーニング (DPP) が含まれています。動的パーティションプルーニングは、データの読み取り時に不要なパーティションのスキャンを防ぐ Spark の最適化手法です。以下は、DPP について知っておくべき重要な点です。
-
クエリフィルターと述語でリクエストされたパーティション値を調べ、クエリを満たすために必要なパーティションを決定します。不要と見なされるパーティションは、自動的に透過的にプルーニングされます。
-
DPP は、該当するデータを含まないパーティションをスキップすることで、処理時間とリソース使用率を削減します。これによりSpark、関連するパーティションのみに集中できます。
-
挿入または増分ロードによって追加される静的パーティションと動的に生成されたパーティションの両方で動作します。 は新しいパーティションSparkを認識し、動的プルーニングの適用を継続できます。
-
DPP は完全に透過的であるか、開発者には見えません。DPP を有効にするために特別なコーディングは必要ありません。これは、クエリプランの生成中に最適化としてバックグラウンドで自動的に行われます。
以下は、DPP が効率的に機能するようにするためのベストプラクティスです。
-
Spark データフレームオペレーションの早い段階でフィルターを適用して、述語のプッシュダウンを使用します。これによりSpark、パーティションメタデータを使用してパーティションを早期に排除できます。
-
を
ANALYZE TABLE
頻繁に実行して、データの統計を収集します。これにより、無視できるパーティションをより正確に判断Sparkするのに役立つ列レベルの統計が削減されます。 -
データを過剰に分割しないようにします。パーティションが多すぎると、統計を収集するときにドライバーノードが過負荷になる可能性があります。大きなテーブルごとに 10~100 個のパーティションを目指します。
-
結合前にデータフレームを再パーティション化します。これにより、すべてのデータの移動を必要とするシャッフル結合を防ぎ、読み取りデータの量をさらに最適化できます。
-
結合されているさまざまなテーブルで、一貫したパーティション列タイプと命名を使用します。これにより、結合の最適化のためにパーティションをよりSpark適切に一致させることができます。
-
でクエリをテスト
EXPLAIN
して DPP が適用されていることを確認し、追加チューニングが必要かどうかを確認します。
スタースキーマでは、テーブルはファクトテーブルとディメンションテーブルの 2 つの主要なタイプに分割されます。ディメンションテーブルは、ファクトテーブルよりもはるかに小さい傾向があります。ファクトテーブルをディメンションテーブルに結合すると、DPP はクエリプランを最適化します。ディメンションテーブルに適用されるフィルターからサブクエリを作成します。このサブクエリをブロードキャストし、そこからハッシュテーブルを構築します。次に、ファクトテーブルのデータを読み取る前に、ハッシュテーブルをファクトテーブルのスキャンフェーズに適用します。これにより、DPP はより大きなファクトテーブルから読み取る必要があるデータの量を減らすことができます。
次のクエリ例は、DPP が動作していることを示しています。クエリは国 (インド) からの注文数を取得し、ファクトテーブル () とディメンションテーブル (fact_orders
) 間の内部結合を含めますnation
。fact_orders
テーブルは列 によってパーティション化されますo_nationkey
。
- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"
以下は、EXPLAIN
プランで使用されるステップです。
-
小さいディメンションテーブル (
nation
) をスキャンし、列 でフィルタリングしますn_name = 'INDIA'
。 -
前のステップの結果をブロードキャストします。
-
最初のステップの結果をフィルタリングするサブクエリを作成します。
-
フルテーブルスキャンではなく、必要なファクトテーブルパーティションのみをスキャン
PartitionFilter
するように、 としてプッシュダウンします。
この DPP 最適化クエリのEXPLAIN
計画を次に示します。
== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>
o_nationkey
列にダイレクトフィルターが追加されていなくても、DPP 機能のため、 はテーブル全体ではなく、必要なパーティションのみSparkを自動的にスキャンします。