Spark jobs

You can run Spark jobs on an application with the type parameter set to 'SPARK'. Jobs must be compatible with the Spark version referenced in the Amazon EMR release version. For example, when you run jobs on an application with Amazon EMR release 6.6.0, your job must be compatible with Apache Spark 3.2.0.

To run a Spark job, specify the following parameters when you use the start-job-run API.

Job runtime role (executionRoleArn)

This role is the ARN of an IAM role that your application uses to run Spark jobs. The role must grant the following permissions:

  • Read from S3 buckets or other data sources where your data resides

  • Read from S3 buckets or prefixes where your PySpark script or JAR file resides

  • Write to S3 buckets where you intend to write your final output

  • Write logs to an S3 bucket or prefix that S3MonitoringConfiguration specifies

  • Access AWS KMS keys if you use them to encrypt data in your S3 bucket

  • Access the AWS Glue Data Catalog if you use Spark SQL

If your Spark job reads or writes data to or from other data sources, specify the appropriate permissions in this IAM role. If you don't provide these permissions to the IAM role, the job might fail. For more information, see Job runtime roles and Storing logs.
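
For illustration only, a permissions policy along these lines could back the runtime role. The bucket names, account ID, and KMS key ARN are placeholders rather than values that EMR Serverless requires; scope each statement to your own resources and drop the statements you don't need.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadScriptsAndInputData",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::DOC-EXAMPLE-BUCKET-INPUT",
        "arn:aws:s3:::DOC-EXAMPLE-BUCKET-INPUT/*"
      ]
    },
    {
      "Sid": "WriteOutputAndLogs",
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::DOC-EXAMPLE-BUCKET-OUTPUT",
        "arn:aws:s3:::DOC-EXAMPLE-BUCKET-OUTPUT/*"
      ]
    },
    {
      "Sid": "UseKmsKeyForS3Encryption",
      "Effect": "Allow",
      "Action": ["kms:Decrypt", "kms:GenerateDataKey"],
      "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
    }
  ]
}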

Job driver (jobDriver)

Use a job driver to provide input to the job. This parameter accepts only one value for the job type that you want to run. For a Spark job, the parameter value is sparkSubmit. You can use this job type to run Scala, Java, PySpark, SparkR, and any other supported jobs through Spark submit. Spark jobs have the following parameters:

  • sparkSubmitParameters – These are the additional Spark parameters that you want to send to the job. Use this parameter to override default Spark properties, such as driver memory or the number of executors, with arguments like --conf or --class.

  • entryPointArguments – This is an array of arguments that you want to pass to your main JAR or Python file. You should handle reading these parameters using your entrypoint code. Separate each argument in the array by a comma.

  • entryPoint – This is the reference in Amazon S3 to the main JAR or Python file that you want to run. If you are running a Scala or Java JAR, specify the main entry class in sparkSubmitParameters with the --class argument.

For additional information, see Launching Applications with spark-submit.
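
For example, a jobDriver value for a Scala or Java application might look like the following sketch. The JAR location, class name, and arguments are placeholders.

{
  "sparkSubmit": {
    "entryPoint": "s3://DOC-EXAMPLE-BUCKET/jars/my-application.jar",
    "entryPointArguments": ["argument1", "argument2"],
    "sparkSubmitParameters": "--class com.example.MyMainClass --conf spark.executor.memory=4g"
  }
}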

Configuration overrides (configurationOverrides)

Use this parameter to override application and monitoring level configuration properties. This parameter accepts a JSON object with the following two fields:

  • applicationConfiguration – To override the default configurations for applications, you can provide a configuration object in this field. You can use a shorthand syntax to provide the configuration, or you can reference the configuration object in a JSON file. Configuration objects consist of a classification, properties, and optional nested configurations. Properties consist of the settings that you want to override in that file. You can specify multiple classifications for multiple applications in a single JSON object. The configuration classifications that you can use vary by specific release version for Amazon EMR. For a list of configuration classifications that are available for each release version of Amazon EMR, see Release versions.

    If you use the same configuration in an application override and in Spark submit parameters, the Spark submit parameters take priority. Configurations rank in priority as follows, from highest to lowest:

    • Configuration that EMR Serverless provides when it creates the SparkSession.

    • Configuration that you provide as part of sparkSubmitParameters with the --conf argument.

    • Configuration that you provide as part of your application overrides.

    • Optimized configurations that Amazon EMR uses for the release.

    • Default open source configurations for the application.

  • monitoringConfiguration – Use this field to specify the Amazon S3 URL (s3MonitoringConfiguration) where you want the EMR Serverless job to store logs of your Spark job. Make sure you've created this bucket with the same AWS account that hosts your application, and in the same AWS Region where your job is running.
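
Putting the two fields together, a configurationOverrides object might look like the following sketch. The spark-defaults classification and the S3 paths are examples; use the classifications that your Amazon EMR release supports and a log bucket that you own.

{
  "applicationConfiguration": [
    {
      "classification": "spark-defaults",
      "properties": {
        "spark.driver.memory": "8G",
        "spark.executor.instances": "2"
      }
    }
  ],
  "monitoringConfiguration": {
    "s3MonitoringConfiguration": {
      "logUri": "s3://DOC-EXAMPLE-BUCKET-LOGS/emr-serverless/logs/"
    }
  }
}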

Spark job properties

The following table lists optional Spark properties and their default values that you can override when you submit a Spark job.

Key | Description | Default value
spark.archives | A comma-separated list of archives that Spark extracts into each executor's working directory. Supported file types include .jar, .tar.gz, .tgz and .zip. To specify the directory name to extract, add # after the file name that you want to extract. For example, file.zip#directory. This configuration is experimental. | NULL
spark.authenticate | Option that turns on authentication of Spark's internal connections. | TRUE
spark.driver.cores | The number of cores that the driver uses. | 4
spark.driver.extraJavaOptions | Extra Java options for the Spark driver. | NULL
spark.driver.memory | The amount of memory that the driver uses. | 14G
spark.dynamicAllocation.enabled | Option that turns on dynamic resource allocation. This option scales up or down the number of executors registered with the application, based on the workload. | TRUE
spark.dynamicAllocation.executorIdleTimeout | The length of time that an executor can remain idle before Spark removes it. This only applies if you turn on dynamic allocation. | 60s
spark.dynamicAllocation.initialExecutors | The initial number of executors to run if you turn on dynamic allocation. | 3
spark.dynamicAllocation.maxExecutors | The upper bound for the number of executors if you turn on dynamic allocation. | 100
spark.dynamicAllocation.minExecutors | The lower bound for the number of executors if you turn on dynamic allocation. | 0
spark.emr-serverless.allocation.batch.size | The number of containers to request in each cycle of executor allocation. There is a one-second gap between each allocation cycle. | 20
spark.emr-serverless.allocation.executor.timeout | The time to wait for a newly created executor container to reach the running state before EMR Serverless cancels the request. | 300s
spark.emr-serverless.driver.disk | The Spark driver disk. | 20G
spark.emr-serverless.driverEnv.[KEY] | Option that adds environment variables to the Spark driver. | NULL
spark.emr-serverless.executor.disk | The Spark executor disk. | 20G
spark.emr-serverless.memoryOverheadFactor | Sets the memory overhead to add to the driver and executor container memory. | 0.1
spark.executor.cores | The number of cores that each executor uses. | 4
spark.executor.extraJavaOptions | Extra Java options for the Spark executor. | NULL
spark.executor.instances | The number of Spark executor containers to allocate. | 3
spark.executor.memory | The amount of memory that each executor uses. | 14G
spark.executorEnv.[KEY] | Option that adds environment variables to the Spark executors. | NULL
spark.files | A comma-separated list of files to go in the working directory of each executor. You can access the file paths of these files in the executor with SparkFiles.get(fileName). | NULL
spark.hadoop.hive.metastore.client.factory.class | The Hive metastore implementation class. | NULL
spark.jars | Additional jars to add to the runtime classpath of the driver and executors. | NULL
spark.network.crypto.enabled | Option that turns on AES-based RPC encryption. This includes the authentication protocol added in Spark 2.2.0. | FALSE
spark.sql.warehouse.dir | The default location for managed databases and tables. | The value of $PWD/spark-warehouse
spark.submit.pyFiles | A comma-separated list of .zip, .egg, or .py files to place in the PYTHONPATH for Python apps. | NULL
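
For example, to override a few of these defaults for a single job run, you could pass them through sparkSubmitParameters with --conf. The property names come from the table above; the values shown are arbitrary.

"sparkSubmitParameters": "--conf spark.dynamicAllocation.maxExecutors=50 --conf spark.executor.memory=8G --conf spark.executor.cores=2"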

The following table lists the Spark submit parameters and their default values.

Key | Description | Default value
archives | A comma-separated list of archives that Spark extracts into each executor's working directory. | NULL
class | The application's main class (for Java and Scala apps). | NULL
conf | An arbitrary Spark configuration property. | NULL
driver-cores | The number of cores that the driver uses. | 4
driver-memory | The amount of memory that the driver uses. | 14G
executor-cores | The number of cores that each executor uses. | 4
executor-memory | The amount of memory that the executor uses. | 14G
files | A comma-separated list of files to place in the working directory of each executor. You can access the file paths of these files in the executor with SparkFiles.get(fileName). | NULL
jars | A comma-separated list of jars to include on the driver and executor classpaths. | NULL
num-executors | The number of executors to launch. | 3
py-files | A comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. | NULL
verbose | Option that turns on additional debug output. | NULL

Spark examples

The following example shows how to use the StartJobRun API to run a Python script. For an end-to-end tutorial that uses this example, see Getting started with Amazon EMR Serverless. You can find additional examples of how to run PySpark jobs and add Python dependencies in the EMR Serverless Samples GitHub repository.

aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
          "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
          "entryPointArguments": ["s3://DOC-EXAMPLE-BUCKET-OUTPUT/wordcount_output"],
          "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1"
        }
    }'

The following example shows how to use the StartJobRun API to run a Spark JAR.

aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
          "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar",
          "entryPointArguments": ["1"],
          "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
        }
    }'
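
As a final sketch, the JAR example can be combined with configuration overrides in a single call. This assumes the start-job-run command's --configuration-overrides option and a log bucket that you own; adjust the paths to your own resources.

aws emr-serverless start-job-run \
    --application-id application-id \
    --execution-role-arn job-role-arn \
    --job-driver '{
        "sparkSubmit": {
          "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar",
          "entryPointArguments": ["1"],
          "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.instances=1"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
          "s3MonitoringConfiguration": {
            "logUri": "s3://DOC-EXAMPLE-BUCKET-LOGS/emr-serverless/logs/"
          }
        }
    }'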