Amazon EMR
Amazon EMR Release Guide

Optimizing Spark Performance

With Amazon EMR version 5.24.0, you can enable the following performance optimization features for Spark: dynamic partition pruning, flattening scalar subqueries, and improving joins when using INTERSECT. Each optimization feature is disabled by default and can be enabled by setting an optimization-specific Spark configuration property. This topic explains each optimization feature in detail and describes the properties you can use to enable them.

For more information on how to set Spark configuration, see Configure Spark.
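For example, when creating a cluster you could supply a configuration object similar to the following sketch, which uses the spark-defaults classification to turn on all three optimizations at once (the property names are those documented in this topic; adjust the list to the optimizations you actually want):

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.dynamicPartitionPruning.enabled": "true",
      "spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled": "true",
      "spark.sql.optimizer.distinctBeforeIntersect.enabled": "true"
    }
  }
]
```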

Dynamic Partition Pruning

Dynamic partition pruning improves job performance by more accurately selecting the specific partitions within a table that need to be read and processed for a specific query. By reducing the amount of data read and processed, significant time is saved in job execution. Dynamic partition pruning is disabled by default. You can enable this feature by setting the Spark property spark.sql.dynamicPartitionPruning.enabled from within Spark or when creating clusters.

This optimization improves upon the existing capabilities of Spark 2.4.2, which only supports pushing down static predicates that can be resolved at plan time.

The following are examples of static predicate push down in Spark 2.4.2.

partition_col = 5

partition_col IN (1,3,5)

partition_col between 1 and 3

partition_col = 1 + 3

Dynamic partition pruning allows the Spark engine to dynamically infer at runtime which partitions need to be read and which can be safely eliminated. For example, the following query involves two tables: the store_sales table that contains all of the total sales for all stores and is partitioned by region, and the store_regions table that contains a mapping of regions for each country. The tables contain data about stores distributed around the globe, but we are only querying data for North America.

select ss.quarter, ss.region, ss.store, ss.total_sales
from store_sales ss, store_regions sr
where ss.region = sr.region and sr.country = 'North America'

Without dynamic partition pruning, this query reads all regions and then filters out those that do not match the results of the subquery. With dynamic partition pruning, this query reads and processes only the partitions for the regions returned by the subquery. This saves time and resources by reading less data from storage and processing fewer records.
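For this pruning to apply, the store_sales table must be physically partitioned on the join column. The following DDL sketch illustrates one way such a table could be defined; the column types are assumptions for illustration, not part of the example above:

```sql
CREATE TABLE store_sales (
  quarter     STRING,
  store       STRING,
  total_sales DOUBLE
)
PARTITIONED BY (region STRING);  -- each region is stored as a separate partition,
                                 -- so non-matching regions can be skipped entirely
```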

Note

Dynamic partition pruning works only when the table to be pruned is on the left side of the join. In the example above, the store_sales table is being dynamically pruned and the query only reads and processes the partitions returned for North America.

Flattening Scalar Subqueries

This optimization improves the performance of queries that have scalar subqueries over the same table. It is disabled by default. You can enable it by setting the Spark property spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled from within Spark or when creating clusters. When this property is set to true, the query optimizer flattens aggregate scalar subqueries that use the same relation if possible. The scalar subqueries are flattened by pushing any predicates present in the subquery into the aggregate functions and then performing one aggregation, with all the aggregate functions, per relation.

Following is a sample of a query that will benefit from this optimization.

select
  (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1,
  (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2,
  (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

The optimization rewrites the previous query as:

select c1 as group1, c2 as group2, c3 as group3
from (select avg(if(age between 5 and 10, age, null)) as c1,
             avg(if(age between 10 and 15, age, null)) as c2,
             avg(if(age between 15 and 20, age, null)) as c3
      from students);

Notice that the rewritten query reads the students table only once, and the predicates of the three subqueries are pushed into the avg function.

DISTINCT Before INTERSECT

This optimization improves joins when using INTERSECT. It is disabled by default. You can enable it by setting the Spark property spark.sql.optimizer.distinctBeforeIntersect.enabled from within Spark or when creating clusters. Queries using INTERSECT are automatically converted to use a left-semi join. When this property is set to true, the query optimizer pushes the DISTINCT operator to the children of INTERSECT if it detects that doing so can make the left-semi join a BroadcastHashJoin instead of a SortMergeJoin.

Following is a sample of a query that will benefit from this optimization.

(select item.brand brand
 from store_sales, item
 where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand
 from catalog_sales, item
 where catalog_sales.item_id = item.item_id)

Without the property spark.sql.optimizer.distinctBeforeIntersect.enabled, the query is rewritten as follows.

select distinct brand
from (select item.brand brand
      from store_sales, item
      where store_sales.item_id = item.item_id)
left semi join
     (select item.brand cs_brand
      from catalog_sales, item
      where catalog_sales.item_id = item.item_id)
on brand <=> cs_brand

When you enable the property spark.sql.optimizer.distinctBeforeIntersect.enabled, the query is rewritten as follows.

select brand
from (select distinct item.brand brand
      from store_sales, item
      where store_sales.item_id = item.item_id)
left semi join
     (select distinct item.brand cs_brand
      from catalog_sales, item
      where catalog_sales.item_id = item.item_id)
on brand <=> cs_brand
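To confirm which join strategy the optimizer actually chose, you can prefix the query with EXPLAIN, a standard Spark SQL statement that prints the physical plan. A sketch, reusing the query from this example:

```sql
EXPLAIN
(select item.brand brand
 from store_sales, item
 where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand
 from catalog_sales, item
 where catalog_sales.item_id = item.item_id);
-- With the optimization enabled and the distinct inputs small enough to
-- broadcast, the plan should show BroadcastHashJoin instead of SortMergeJoin.
```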