Spark jobs

You can run Spark jobs on an application with the type parameter set to 'SPARK'. Jobs must be compatible with the Spark version referenced in the Amazon EMR release version. For example, when you run jobs on an application with Amazon EMR release 6.6.0, your job must be compatible with Apache Spark 3.2.0.

To run a Spark job, specify the following parameters when you use the start-job-run API.

Job runtime role (executionRoleArn)

This parameter is the ARN of an IAM role that your application uses to run Spark jobs. The role must have the following permissions:

  • Read from S3 buckets or other data sources where your data resides

  • Read from S3 buckets or prefixes where your PySpark script or JAR file resides

  • Write to S3 buckets where you intend to write your final output

  • Write logs to an S3 bucket or prefix that S3MonitoringConfiguration specifies

  • Access to KMS keys if you use KMS keys to encrypt data in your S3 bucket

  • Access to the AWS Glue Data Catalog if you use Spark SQL

If your Spark job reads or writes data to or from other data sources, specify the appropriate permissions in this IAM role. If you don't provide these permissions to the IAM role, the job might fail. For more information, see Job runtime roles and Storing logs.
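As a reference point, the following is a minimal sketch of how you might attach an inline policy with S3 read and write permissions to the job runtime role. The role name, policy name, and bucket name are hypothetical placeholders; adjust the actions and resources for your own data sources, and add KMS and AWS Glue Data Catalog permissions only if your job needs them.

# Hypothetical example: grant the job runtime role read/write access to an example bucket.
aws iam put-role-policy \
    --role-name my-emr-serverless-job-role \
    --policy-name emr-serverless-s3-access \
    --policy-document '{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadScriptsAndInputData",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                    "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
                ]
            },
            {
                "Sid": "WriteOutputAndLogs",
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": ["arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"]
            }
        ]
    }'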

Job driver (jobDriver)

Use a job driver to provide input to the job. This parameter accepts only one value for the job type that you want to run. For a Spark job, the parameter value is sparkSubmit. You can use this job type to run Scala, Java, PySpark, SparkR, and any other supported jobs through Spark submit. This job type has the following parameters:

  • entryPoint – This is the reference in Amazon S3 to the main JAR or Python file that you want to run. If you are running a Scala or Java JAR, specify the main entry class in the SparkSubmitParameters using the --class argument.

  • entryPointArguments – This is an array of arguments that you want to pass to your main JAR or Python file. Your entry point code should handle reading these arguments. Separate each argument in the array with a comma.

  • sparkSubmitParameters – These are the additional Spark parameters that you want to send to the job. Use this parameter to pass arguments such as --conf and --class, for example to override default Spark properties such as driver memory or the number of executors.

For additional information, see Launching Applications with spark-submit.
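As a rough mental model only, the sparkSubmit job driver fields map onto a spark-submit invocation along the lines of the following sketch. The JAR path, class name, and arguments are hypothetical, and EMR Serverless builds the actual invocation for you.

# Illustration of how jobDriver fields map onto spark-submit options:
# entryPoint -> the application JAR or script, sparkSubmitParameters -> --class and --conf,
# entryPointArguments -> the trailing application arguments.
spark-submit \
    --class com.example.MyMainClass \
    --conf spark.executor.memory=8g \
    s3://DOC-EXAMPLE-BUCKET/jars/my-spark-job.jar \
    arg1 arg2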

Configuration overrides (configurationOverrides)

Use this parameter to override application and monitoring configuration properties. This parameter accepts a JSON object with the following two fields (an example follows this list):

  • applicationConfiguration – To override the default configurations for applications, you can provide a configuration object in this field. You can use a shorthand syntax to provide the configuration, or you can reference the configuration object in a JSON file. Configuration objects consist of a classification, properties, and optional nested configurations. Properties are the settings that you want to override in that classification's configuration file. You can specify multiple classifications for multiple applications in a single JSON object. The configuration classifications that you can use vary by Amazon EMR release version. For a list of the configuration classifications available in each release, see Release versions.

    If you use the same configuration in an application override and in Spark submit parameters, the Spark submit parameters take priority. Configurations rank in priority as follows, from highest to lowest:

    • Configuration that you supply when creating SparkSession.

    • Configuration that you provide as part of sparkSubmitParameters using --conf.

    • Configuration that you provide as part of your application overrides.

    • Optimized configurations that Amazon EMR uses for the release.

    • Default open source configurations for the application.

  • monitoringConfiguration – Use this field to specify the Amazon S3 URL (s3MonitoringConfiguration) where you want the EMR Serverless job to store logs of your Spark job. Make sure you've created this bucket with the same AWS account that hosts your application, and in the same AWS Region where your job is running.
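For example, the following sketch submits a job with a configurationOverrides object that sets a Spark property through the spark-defaults classification and sends logs to a hypothetical bucket. The application ID, role ARN, script path, property value, and bucket name are placeholders, and the use of the spark-defaults classification is an assumption about what your release supports; see Release versions for the classifications available to you.

# Hypothetical example: override a Spark property and store logs in an example bucket.
aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://DOC-EXAMPLE-BUCKET/scripts/my-script.py"
        }
    }' \
    --configuration-overrides '{
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.sql.shuffle.partitions": "200"
                }
            }
        ],
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://DOC-EXAMPLE-BUCKET/logs/"
            }
        }
    }'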

Spark job properties

The following table lists optional Spark properties and their default values that you can override when you submit a Spark job. An example override follows the table.

Key | Description | Default value
spark.emr-serverless.allocation.batch.size | The number of containers to request in each cycle of executor allocation. There is a one-second gap between each allocation cycle. | 20
spark.emr-serverless.allocation.executor.timeout | The time to wait for a newly created executor container to reach the running state before the request is cancelled. | 300s
spark.emr-serverless.memoryOverheadFactor | Sets the memory overhead to add to the driver and executor container memory. | 0.1
spark.executor.memory | The amount of memory that each executor uses. | 14G
spark.executor.cores | The number of cores that each executor uses. | 4
spark.driver.memory | The amount of memory that the driver uses. | 14G
spark.driver.cores | The number of cores that the driver uses. | 4
spark.emr-serverless.driver.disk | The Spark driver disk. | 21G
spark.emr-serverless.executor.disk | The Spark executor disk. | 21G
spark.executor.instances | The number of Spark executor containers to allocate. | 3
spark.executor.extraJavaOptions | Extra Java options for the Spark executor. | NULL
spark.driver.extraJavaOptions | Extra Java options for the Spark driver. | NULL
spark.emr-serverless.driverEnv.[KEY] | Option that adds environment variables to the Spark driver. | NULL
spark.executorEnv.[KEY] | Option that adds environment variables to the Spark executors. | NULL
spark.files | A comma-separated list of files to place in the working directory of each executor. The file paths of these files in executors can be accessed by running SparkFiles.get(fileName). | NULL
spark.jars | Additional jars to add to the runtime classpath of the driver and executors. | NULL
spark.archives | A comma-separated list of archives to be extracted into the working directory of each executor. Supported file types include .jar, .tar.gz, .tgz, and .zip. To specify the directory name to extract, add # after the file name that you want to extract. For example, file.zip#directory. This configuration is experimental. | NULL
spark.submit.pyFiles | A comma-separated list of .zip, .egg, or .py files to place in the PYTHONPATH for Python apps. | NULL
spark.sql.warehouse.dir | The default location for managed databases and tables. | The value of $PWD/spark-warehouse
spark.hadoop.hive.metastore.client.factory.class | The Hive metastore implementation class. | NULL
spark.authenticate | Option that turns on authentication of Spark's internal connections. | TRUE
spark.network.crypto.enabled | Option that turns on AES-based RPC encryption. This includes the authentication protocol added in 2.2.0. | FALSE
spark.dynamicAllocation.enabled | Option that turns on dynamic resource allocation. This option scales up or down the number of executors registered with the application, based on the workload. | TRUE
spark.dynamicAllocation.executorIdleTimeout | The length of time that an executor can remain idle before it is removed. This only applies if you turn on dynamic allocation. | 60s
spark.dynamicAllocation.initialExecutors | The initial number of executors to run if you turn on dynamic allocation. | 3
spark.dynamicAllocation.maxExecutors | The upper bound for the number of executors if you turn on dynamic allocation. | 100
spark.dynamicAllocation.minExecutors | The lower bound for the number of executors if you turn on dynamic allocation. | 0
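For instance, to override a few of these defaults when you submit a job, you might pass something like the following in sparkSubmitParameters. The values shown here are arbitrary placeholders, not recommendations.

"sparkSubmitParameters": "--conf spark.executor.memory=8g --conf spark.executor.cores=2 --conf spark.emr-serverless.executor.disk=30g"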

The following table lists the default Spark submit parameters.

Key | Description | Default value
executor-memory | The amount of memory that the executor uses. | 14G
executor-cores | The number of cores that each executor uses. | 4
driver-memory | The amount of memory that the driver uses. | 14G
driver-cores | The number of cores that the driver uses. | 4
num-executors | The number of executors to launch. | 3
files | A comma-separated list of files to place in the working directory of each executor. File paths of these files in executors can be accessed with SparkFiles.get(fileName). | NULL
py-files | A comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. | NULL
archives | A comma-separated list of archives to be extracted into the working directory of each executor. | NULL
jars | A comma-separated list of jars to include on the driver and executor classpaths. | NULL
verbose | Option that turns on additional debug output. | NULL
class | The application's main class (for Java and Scala apps). | NULL
conf | An arbitrary Spark configuration property. | NULL

Spark examples

The following example shows how to use the StartJobRun API to run a Python script. For an end-to-end tutorial that uses this example, see Getting started with Amazon EMR Serverless. You can find additional examples of how to run PySpark jobs and add Python dependencies in the EMR Serverless Samples GitHub repository.

aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
            "entryPointArguments": ["s3://DOC-EXAMPLE-BUCKET-OUTPUT/wordcount_output"],
            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1"
        }
    }'

The following example shows how to use the StartJobRun API to run a Spark JAR.

aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar",
            "entryPointArguments": ["1"],
            "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
        }
    }'