Spark のパフォーマンスの最適化 - Amazon EMR

Spark のパフォーマンスの最適化

Amazon EMR では、Spark 向けに複数のパフォーマンス最適化機能が用意されています。このトピックでは、それぞれの最適化機能について詳しく説明します。

Spark の設定方法に関する詳細については、「Spark の設定」を参照してください。

ダイナミックパーティションプルーニング

ダイナミックパーティションプルーニングでは、特定のパーティションを特定のクエリを読み取って処理するテーブル内でより正確に選択し、ジョブパフォーマンスを改善します。読み取って処理するデータ量を減らすと、ジョブの実行において大幅な時間の節約になります。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark 内から Spark プロパティ spark.sql.dynamicPartitionPruning.enabled を設定できます。また、クラスターの作成時に有効にすることもできます。

この最適化は、予定時間に解決される固定述語のプッシュダウンにのみ対応している Spark 2.4.2 の既存の機能を改良するものです。

Spark 2.4.2 での固定述語のプッシュダウンの例を以下に示します。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

ダイナミックパーティションプルーニングにより、Spark エンジンでランタイムに積極的に情報から推計でき、パーティションが読み取り、安全に削除できます。たとえば、以下のクエリには、すべてのストアの販売合計を含み、リージョンごとに分けられた store_sales テーブルと、各国のリージョンのマッピングを含む store_regions テーブルという、2 つのテーブルが含まれます。テーブルには世界中に分散しているストアに関するデータが含まれていますが、北米のデータのみをクエリします。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

ダイナミックパーティションプルーニングを使用しない場合、このクエリはサブクエリ結果に一致する地域のサブセットをフィルタリングする前に、すべての地域のデータを読み取ります。ダイナミックパーティションプルーニングを使用すると、このクエリはサブクエリで返された地域のパーティションのみを読み取り処理します。このため、ストレージのデータの読み取りや履歴の処理が少なくなり、時間とリソースが削減されます。

スカラサブクエリの平坦化

この最適化では、同じテーブルにスカラサブクエリのあるクエリのパフォーマンスを向上させます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark プロパティ spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled をSpark 内で設定して有効にできます。また、クラスターの作成時に有効にすることもできます。このプロパティを true に設定すると、可能な場合に、クエリオプティマイザが同じ関係式を使用する集約スカラサブクエリを平坦化します。スカラサブクエリは、サブクエリ内に存在する述語を集約関数にプッシュし、リレーションごとにすべての集約関数を使用して 1 つの集約を実行することによって平坦化されます。

以下は、この最適化を行うメリットがあるクエリの例です。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

最適化により、以前のクエリが次のように書き換えられます。

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

書き換えられたクエリは一度だけ student テーブルを読み取ります。また、3 つのサブクエリの述語が avg 関数にプッシュされます。

INTERSECT 前に DISTINCT

この最適化では、INTERSECT を使用する場合の結合を最適化します。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark プロパティ spark.sql.optimizer.distinctBeforeIntersect.enabled をSpark 内で設定して有効にできます。また、クラスターの作成時に有効にすることもできます。INTERSECT を使用したクエリは、左準結合を使用するように自動変換されます。このプロパティを true に設定すると、DISTINCT 演算子が左準結合を SortMergeJoin ではなく BroadcastHashJoin にできることを検出すると、DISTINCT 演算子を INTERSECT の子にプッシュします。

以下は、この最適化を行うメリットがあるクエリの例です。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

このプロパティが有効になっていないと spark.sql.optimizer.distinctBeforeIntersect.enabled 、クエリは以下のように書き換えられます。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

このプロパティを有効にすると spark.sql.optimizer.distinctBeforeIntersect.enabled 、クエリは以下のように書き換えられます。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bloom Filter Join

この最適化では、結合の一方の側の値から生成されたBloom フィルターを使用して、結合のもう一方の側を事前にフィルタリングすることで、一部の結合のパフォーマンスを向上させます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.25.0 では、Spark 内で Spark プロパティ spark.sql.bloomFilterJoin.enabledtrue に設定してこの機能を有効にできます。また、クラスターを作成するときにもこの機能を有効にできます。

以下は、Bloom フィルターを使用することによってメリットがあるクエリの例です。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

この機能を有効にすると、Bloom フィルターが、クエリの対象となるカテゴリのセットに属するカテゴリを持つすべてのアイテム ID から構築されます。sales テーブルのスキャン中に Bloom フィルターを使用して、Bloom フィルターで定義されたセット内に確実に含まれていないアイテムの売上を洗い出します。こうすることで、特定されたこれらの売上をできるだけ早い段階で除外できます。

結合順序の最適化

この最適化では、フィルターを持つテーブルを含む結合の順序を変更することでクエリのパフォーマンスを向上させることができます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.25.0 では、Spark 設定パラメータ spark.sql.optimizer.sizeBasedJoinReorder.enabled を true に設定することで、この機能を有効にすることができます。Spark のデフォルトでは、クエリにリストされている順に、テーブルと左から右に結合するようになっています。この手法では、フィルターを含む小さい結合を最初に実行し、コストの高い結合を後で行うというチャンスを逃してしまいます。

このクエリの例では、ある国のすべての店舗に返品されたすべての商品が報告されます。結合順序を最適化しなかった場合、Spark は、大きなテーブル 2 つ (store_salesstore_returns) を最初に結合した後、これを store と結合し、最後に item と結合します。

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

結合順序を最適化した場合、Spark は、まず store_salesstore と結合します。store にはフィルターがあり、store_returns および broadcastable よりも小さいためです。次に store_returns、最後に item と結合します。item にフィルターがあり、プロードキャストが可能な場合は、結合順序の変更の対象となるため、store_salesstore と結合され、続いてitem、最後に store_returns と結合されます。