Spark パフォーマンスの最適化 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Spark パフォーマンスの最適化

Amazon EMRは Spark に複数のパフォーマンス最適化機能を提供します。このトピックでは、それぞれの最適化機能について詳しく説明します。

Spark の設定方法に関する詳細については、「Spark の設定」を参照してください。

アダプティブクエリ実行

アダプティブクエリ実行は、ランタイム統計に基づいてクエリプランを再最適化するためのフレームワークです。Amazon EMR 5.30.0 以降、Apache Spark 3 からの次のアダプティブクエリ実行最適化は、Spark 2 の Apache Amazon EMR Runtime で利用できます。

  • アダプティブ結合変換

  • シャッフルパーティションのアダプティブ結合

アダプティブ結合変換

アダプティブ結合変換は、クエリステージのランタイムサイズに基づいてオペレーションを broadcast-hash-joinsオペレーションに変換 sort-merge-joinすることで、クエリパフォーマンスを向上させます。 Broadcast-hash-joins は、結合の片側がすべてのエグゼキューターに効率的に出力をブロードキャストするのに十分なほど小さい場合、パフォーマンスが向上する傾向があるため、交換をシャッフルして結合の両側をソートする必要がなくなります。アダプティブ結合変換は、Spark が自動的に を実行する場合のケースの範囲を広げます broadcast-hash-joins。

この機能は、デフォルトでご利用になれます。これは、spark.sql.adaptive.enabledfalse に設定することで無効にできます。これにより、アダプティブクエリ実行フレームワークも無効になります。Spark は、結合側の 1 つのランタイムサイズの統計が を超えない場合spark.sql.autoBroadcastJoinThreshold、 sort-merge-join を broadcast-hash-joinに変換することを決定します。デフォルトでは 10,485,760 バイト (10 MiB) です。

シャッフルパーティションのアダプティブ結合

シャッフルパーティションのアダプティブ結合により、小さな連続するシャッフルパーティションが結合され、小さなタスクが多すぎることによるオーバーヘッドを回避することで、クエリのパフォーマンスが向上します。これにより、事前に初期シャッフルパーティションの数をより高く設定することができ、ランタイムでターゲットサイズまで削減されるため、シャッフルパーティションがより均等に分散される可能性が高まります。

この機能は、spark.sql.shuffle.partitions が明示的に設定されていない限り、デフォルトで有効になっています。これは、spark.sql.adaptive.coalescePartitions.enabledtrue に設定することで有効にできます。シャッフルパーティションの初期数とターゲットパーティションのサイズは、それぞれ spark.sql.adaptive.coalescePartitions.minPartitionNum プロパティと spark.sql.adaptive.advisoryPartitionSizeInBytes プロパティを使用して調整できます。この機能に関連する Spark プロパティの詳細については、以下の表を参照してください。

Spark アダプティブ結合パーティションのプロパティ
プロパティ デフォルト値 説明

spark.sql.adaptive.coalescePartitions.enabled

spark.sql.shuffle.partitions が明示的に設定されている場合を除き、true

true であり、かつ、spark.sql.adaptive.enabled が true の場合、Spark はターゲットサイズ (spark.sql.adaptive.advisoryPartitionSizeInBytes によって指定) に従って、連続するシャッフルパーティションを結合し、小さいタスクが多くなり過ぎないようにします。

spark.sql.adaptive.advisoryPartitionSizeInBytes

64MB

結合時のシャッフルパーティションのアドバイザリサイズ (バイト単位)。この設定は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled が両方とも true の場合にのみ効果があります。

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

結合後のシャッフルパーティションの最小数。この設定は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled が両方とも true の場合にのみ効果があります。

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1,000

結合前のシャッフルパーティションの初期数。この設定は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled が両方とも true の場合にのみ効果があります。

ダイナミックパーティションプルーニング

ダイナミックパーティションプルーニングでは、特定のパーティションを特定のクエリを読み取って処理するテーブル内でより正確に選択し、ジョブパフォーマンスを改善します。読み取って処理するデータ量を減らすと、ジョブの実行において大幅な時間の節約になります。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.dynamicPartitionPruning.enabled内またはクラスターの作成時に Spark プロパティを設定することで、この機能を有効にできます。

Spark ダイナミックパーティションプルーニングのパーティションのプロパティ
プロパティ デフォルト値 説明

spark.sql.dynamicPartitionPruning.enabled

true

true の場合、ダイナミックパーティションプルーニングが有効になります。

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

true の場合、Spark はクエリの実行前に防御チェックを行い、ダイナミックプルーニングフィルターでのブロードキャスト交換の再利用が、ユーザー定義の列指向ルールなどの後続の準備ルールによって破損していないことを確認します。再利用が壊れていて、この config が true の場合、Spark は影響を受けるダイナミックプルーニングフィルターを除去し、パフォーマンスと正確性に関する問題から保護します。ダイナミックプルーニングフィルターのブロードキャスト交換によって、対応する結合オペレーションのブロードキャスト交換とは異なる、整合しない結果が生じると、正確性に関する問題が発生します。この構成ではユーザー定義の列指向ルールによって再利用が破損している場合などのシナリオを回避できるため、これを false に設定する場合は慎重に行ってください。アダプティブクエリ実行が有効になっている場合、ブロードキャストの再利用は常に適用されます。

この最適化は、予定時間に解決される固定述語のプッシュダウンにのみ対応している Spark 2.4.2 の既存の機能を改良するものです。

Spark 2.4.2 での固定述語のプッシュダウンの例を以下に示します。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

ダイナミックパーティションプルーニングにより、Spark エンジンでランタイムに積極的に情報から推計でき、パーティションが読み取り、安全に削除できます。たとえば、以下のクエリには、すべてのストアの販売合計を含み、リージョンごとに分けられた store_sales テーブルと、各国のリージョンのマッピングを含む store_regions テーブルという、2 つのテーブルが含まれます。テーブルには世界中に分散しているストアに関するデータが含まれていますが、北米のデータのみをクエリします。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

ダイナミックパーティションプルーニングを使用しない場合、このクエリはサブクエリ結果に一致する地域のサブセットをフィルタリングする前に、すべての地域のデータを読み取ります。ダイナミックパーティションプルーニングを使用すると、このクエリはサブクエリで返された地域のパーティションのみを読み取り処理します。このため、ストレージのデータの読み取りや履歴の処理が少なくなり、時間とリソースが削減されます。

スカラサブクエリの平坦化

この最適化では、同じテーブルにスカラサブクエリのあるクエリのパフォーマンスを向上させます。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled内またはクラスターの作成時に Spark プロパティを設定することで有効にできます。このプロパティを true に設定すると、可能な場合に、クエリオプティマイザが同じ関係式を使用する集約スカラサブクエリを平坦化します。スカラサブクエリは、サブクエリ内に存在する述語を集約関数にプッシュし、リレーションごとにすべての集約関数を使用して 1 つの集約を実行することによって平坦化されます。

以下は、この最適化を行うメリットがあるクエリの例です。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

最適化により、以前のクエリが次のように書き換えられます。

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

書き換えられたクエリは一度だけ student テーブルを読み取ります。また、3 つのサブクエリの述語が avg 関数にプッシュされます。

DISTINCT より前 INTERSECT

この最適化により、 の使用時に結合が最適化されますINTERSECT。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.optimizer.distinctBeforeIntersect.enabled内またはクラスターの作成時に Spark プロパティを設定することで有効にできます。を使用するクエリINTERSECTは、左半結合を使用するように自動的に変換されます。このプロパティを true に設定すると、DISTINCTオペレータが BroadcastHashJoin の代わりに左半結合を にできることを検出INTERSECTした場合、クエリオプティマイザはDISTINCTオペレータを の子にプッシュします SortMergeJoin。

以下は、この最適化を行うメリットがあるクエリの例です。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

このプロパティが有効になっていないと spark.sql.optimizer.distinctBeforeIntersect.enabled 、クエリは以下のように書き換えられます。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

このプロパティを有効にすると spark.sql.optimizer.distinctBeforeIntersect.enabled 、クエリは以下のように書き換えられます。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

ブルームフィルター結合

この最適化では、結合の一方の側の値から生成されたBloom フィルターを使用して、結合のもう一方の側を事前にフィルタリングすることで、一部の結合のパフォーマンスを向上させます。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon EMR5.25.0 では、Spark プロパティを spark.sql.bloomFilterJoin.enabled Spark 内trueまたはクラスターの作成時に に設定することで、この機能を有効にできます。

以下は、Bloom フィルターを使用することによってメリットがあるクエリの例です。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

この機能を有効にすると、Bloom フィルターが、クエリの対象となるカテゴリのセットに属するカテゴリを持つすべてのアイテム ID から構築されます。sales テーブルのスキャン中に Bloom フィルターを使用して、Bloom フィルターで定義されたセット内に確実に含まれていないアイテムの売上を洗い出します。こうすることで、特定されたこれらの売上をできるだけ早い段階で除外できます。

結合順序の最適化

この最適化では、フィルターを持つテーブルを含む結合の順序を変更することでクエリのパフォーマンスを向上させることができます。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon EMR5.25.0 では、Spark 設定パラメータを spark.sql.optimizer.sizeBasedJoinReorder.enabled true に設定することで、この機能を有効にできます。Spark のデフォルトでは、クエリにリストされている順に、テーブルと左から右に結合するようになっています。この手法では、フィルターを含む小さい結合を最初に実行し、コストの高い結合を後で行うというチャンスを逃してしまいます。

このクエリの例では、ある国のすべての店舗に返品されたすべての商品が報告されます。結合順序を最適化しなかった場合、Spark は、大きなテーブル 2 つ (store_salesstore_returns) を最初に結合した後、これを store と結合し、最後に item と結合します。

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

結合順序を最適化した場合、Spark は、まず store_salesstore と結合します。store にはフィルターがあり、store_returns および broadcastable よりも小さいためです。次に store_returns、最後に item と結合します。item にフィルターがあり、プロードキャストが可能な場合は、結合順序の変更の対象となるため、store_salesstore と結合され、続いてitem、最後に store_returns と結合されます。