Configure Spark - Amazon EMR

Configure Spark

You can configure Spark on Amazon EMR with configuration classifications. For more information about configuration classifications, see Configure applications.

Configuration classifications for Spark on Amazon EMR include the following:

  • spark – Sets the maximizeResourceAllocation property to true or false. When true, Amazon EMR automatically configures spark-defaults properties based on cluster hardware configuration. For more information, see Using maximizeResourceAllocation.

  • spark-defaults – Sets values in the spark-defaults.conf file. For more information, see Spark configuration in the Spark documentation.

  • spark-env – Sets values in the spark-env.sh file. For more information, see Environment variables in the Spark documentation.

  • spark-hive-site – Sets values in the hive-site.xml for Spark.

  • spark-log4j – (Amazon EMR releases 6.7.x and lower) Sets values in the log4j.properties file. For more information, see the log4j.properties.template file on Github.

  • spark-log4j2 – (Amazon EMR releases 6.8.0 and higher) Sets values in the log4j2.properties file. For more information, see the log4j2.properties.template file on Github.

  • spark-metrics – Sets values in the metrics.properties file. For settings and more information, see the metrics.properties.template file on Github, and Metrics in Spark documentation.

Note

If you're migrating Spark workloads to Amazon EMR from another platform, we recommend that you test your workloads with the Spark defaults set by Amazon EMR before you add custom configurations. Most customers see improved performance with our default settings.

Spark defaults set by Amazon EMR

The following table shows how Amazon EMR sets default values in spark-defaults that affect applications.

Spark defaults set by Amazon EMR
Setting Description Default value
spark.executor.memory

The amount of memory to use per executor process. For example: 1g, 2g.

This setting is determined by the core and task instance types in the cluster.

spark.executor.cores

The number of cores to use on each executor.

This setting is determined by the core and task instance types in the cluster.

spark.dynamicAllocation.enabled

When true, use dynamic resource allocation to scale the number of executors registered with an application up and down based on the workload.

true (with Amazon EMR 4.4.0 and higher)

Note

Spark shuffle service is automatically configured by Amazon EMR .

spark.sql.hive.advancedPartitionPredicatePushdown.enabled

When true, advanced partition predicate pushdown into Hive metastore is enabled.

true
spark.sql.hive.stringLikePartitionPredicatePushdown.enabled

Pushes down startsWith, contains, and endsWith filters into Hive metastore.

Note

Glue doesn't support predicate push down for startsWith, contains, or endsWith. If you are using Glue metastore and you encounter errors due to the predicate pushdown for these functions, set this configuration to false.

true

Configuring Spark garbage collection on Amazon EMR 6.1.0

Setting custom garbage collection configurations with spark.driver.extraJavaOptions and spark.executor.extraJavaOptions results in driver or executor launch failure with Amazon EMR 6.1 because of a conflicting garbage collection configuration with Amazon EMR 6.1.0. For Amazon EMR 6.1.0, the default garbage collection configuration is set through spark.driver.defaultJavaOptions and spark.executor.defaultJavaOptions. This configuration applies only to Amazon EMR 6.1.0. JVM options not related to garbage collection, such as those for configuring logging (-verbose:class), can still be set through extraJavaOptions. For more information, see Spark application properties.

Using maximizeResourceAllocation

To configure your executors to use the maximum resources possible on each node in a cluster, set maximizeResourceAllocation to true in your spark configuration classification. The maximizeResourceAllocation is specific to Amazon EMR . When you enable maximizeResourceAllocation, Amazon EMR calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets the corresponding spark-defaults settings based on the calculated maximum values.

Amazon EMR calculates the maximum compute and memory resources available for an executor based on an instance type from the core instance fleet. Since each instance fleet can have different instance types and sizes within a fleet, the executor configuration that Amazon EMR uses might not be the best for your clusters, so we don't recommend using the default settings when using maximum resource allocation. Configure custom settings for your instance fleet clusters.

Note

You should not use the maximizeResourceAllocation option on clusters with other distributed applications like HBase. Amazon EMR uses custom YARN configurations for distributed applications, which can conflict with maximizeResourceAllocation and cause Spark applications to fail.

The following is an example Spark configuration classification with maximizeResourceAllocation set to true.

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
Settings configured in spark-defaults when maximizeResourceAllocation is enabled
Setting Description Value
spark.default.parallelism Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

2X number of CPU cores available to YARN containers.

spark.driver.memory Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (for example, 1g, 2g).

Setting is configured based on the instance types in the cluster. However, because the Spark driver application may run on either the primary or one of the core instances (for example, in YARN client and cluster modes, respectively), this is set based on the smaller of the instance types in these two instance groups.

spark.executor.memory Amount of memory to use per executor process. (for example, 1g, 2g)

Setting is configured based on the core and task instance types in the cluster.

spark.executor.cores The number of cores to use on each executor. Setting is configured based on the core and task instance types in the cluster.
spark.executor.instances The number of executors.

Setting is configured based on the core and task instance types in the cluster. Set unless spark.dynamicAllocation.enabled explicitly set to true at the same time.

Configuring node decommissioning behavior

With Amazon EMR release 5.9.0 and higher, Spark on Amazon EMR includes a set of features to help ensure that Spark gracefully handles node termination because of a manual resize or an automatic scaling policy request. Amazon EMR implements a deny listing mechanism in Spark that is built on top of the YARN decommissioning mechanism. This mechanism helps ensure that no new tasks are scheduled on a node that is decommissioning, while at the same time allowing tasks that are already running to complete. In addition, there are features to help recover Spark jobs faster if shuffle blocks are lost when a node terminates. The recomputation process is triggered sooner and optimized to recompute faster with fewer stage retries, and jobs can be prevented from failing because of fetch failures that are caused by missing shuffle blocks.

Important

The spark.decommissioning.timeout.threshold setting was added in Amazon EMR release 5.11.0 to improve Spark resiliency when you use Spot instances. In earlier releases, when a node uses a Spot instance, and the instance is terminated because of bid price, Spark may not be able to handle the termination gracefully. Jobs may fail, and shuffle recomputations could take a significant amount of time. For this reason, we recommend using release 5.11.0 or later if you use Spot instances.

Spark node decommissioning settings
Setting Description Default value

spark.blacklist.decommissioning.enabled

When set to true, Spark deny lists nodes that are in the decommissioning state in YARN. Spark does not schedule new tasks on executors running on that node. Tasks already running are allowed to complete.

true

spark.blacklist.decommissioning.timeout

The amount of time that a node in the decommissioning state is deny listed. By default, this value is set to one hour, which is also the default for yarn.resourcemanager.decommissioning.timeout. To ensure that a node is deny listed for its entire decommissioning period, set this value equal to or greater than yarn.resourcemanager.decommissioning.timeout. After the decommissioning timeout expires, the node transitions to a decommissioned state, and Amazon EMR can terminate the node's EC2 instance. If any tasks are still running after the timeout expires, they are lost or killed and rescheduled on executors running on other nodes.

1h

spark.decommissioning.timeout.threshold

Available in Amazon EMR release 5.11.0 or later. Specified in seconds. When a node transitions to the decommissioning state, if the host will decommission within a time period equal to or less than this value, Amazon EMR not only deny lists the node, but also cleans up the host state (as specified by spark.resourceManager.cleanupExpiredHost) without waiting for the node to transition to a decommissioned state. This allows Spark to handle Spot instance terminations better because Spot instances decommission within a 20-second timeout regardless of the value of yarn.resourcemager.decommissioning.timeout, which may not provide other nodes enough time to read shuffle files.

20s

spark.resourceManager.cleanupExpiredHost

When set to true, Spark unregisters all cached data and shuffle blocks that are stored in executors on nodes that are in the decommissioned state. This speeds up the recovery process.

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

When set to true, helps prevent Spark from failing stages and eventually failing the job because of too many failed fetches from decommissioned nodes. Failed fetches of shuffle blocks from a node in the decommissioned state will not count toward the maximum number of consecutive fetch failures.

true

Spark ThriftServer environment variable

Spark sets the Hive Thrift Server Port environment variable, HIVE_SERVER2_THRIFT_PORT, to 10001.

Changing Spark default settings

You change the defaults in spark-defaults.conf using the spark-defaults configuration classification or the maximizeResourceAllocation setting in the spark configuration classification.

The following procedures show how to modify settings using the CLI or console.

To create a cluster with spark.executor.memory set to 2g using the CLI
  • Create a cluster with Spark installed and spark.executor.memory set to 2g, using the following command, which references a file, myConfig.json stored in Amazon S3.

    aws emr create-cluster --release-label emr-7.2.0 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    Note

    Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).

    myConfig.json:

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]
To create a cluster with spark.executor.memory set to 2g using the console
  1. Navigate to the new Amazon EMR console and select Switch to the old console from the side navigation. For more information on what to expect when you switch to the old console, see Using the old console.

  2. Choose Create cluster, Go to advanced options.

  3. Choose Spark.

  4. Under Edit software settings, leave Enter configuration selected and enter the following configuration:

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. Select other options, choose and then choose Create cluster.

To set maximizeResourceAllocation
  • Create a cluster with Spark installed and maximizeResourceAllocation set to true using the AWS CLI, referencing a file, myConfig.json, stored in Amazon S3.

    aws emr create-cluster --release-label emr-7.2.0 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    Note

    Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).

    myConfig.json:

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
Note

With Amazon EMR version 5.21.0 and later, you can override cluster configurations and specify additional configuration classifications for each instance group in a running cluster. You do this by using the Amazon EMR console, the AWS Command Line Interface (AWS CLI), or the AWS SDK. For more information, see Supplying a Configuration for an Instance Group in a Running Cluster.

Migrating from Apache Log4j 1.x to Log4j 2.x

Apache Spark releases 3.2.x and earlier use the legacy Apache Log4j 1.x and the log4j.properties file to configure Log4j in Spark processes. Apache Spark releases 3.3.0 and later use Apache Log4j 2.x and the log4j2.properties file to configure Log4j in Spark processes.

If you have configured Apache Spark Log4j using an Amazon EMR release lower than 6.8.0, then you must remove the legacy spark-log4j configuration classification and migrate to the spark-log4j2 configuration classification and key format before you can upgrade to Amazon EMR 6.8.0 or later. The legacy spark-log4j classification causes cluster creation to fail with a ValidationException error in Amazon EMR releases 6.8.0 and later. You will not be charged for a failure related to the Log4j incompatibility, but you must remove the defunct spark-log4j configuration classification to continue.

For more information about migrating from Apache Log4j 1.x to Log4j 2.x, see the Apache Log4j Migration Guide and the Spark Log4j 2 Template on Github.

Note

With Amazon EMR , Apache Spark uses a log4j2.properties file rather than the .xml file described in the Apache Log4j Migration Guide. Also, we do not recommend using the Log4j 1.x bridge method to convert to Log4j 2.x.