Call Amazon EMR on EKS with AWS Step Functions - AWS Step Functions

Call Amazon EMR on EKS with AWS Step Functions

Step Functions can control certain AWS services directly from Amazon States Language (ASL). To learn more, see Working with other services and Pass parameters to a service API.

How the Optimized Amazon EMR on EKS integration is different than the Amazon EMR on EKS AWS SDK integration
Note

For integration with Amazon EMR, Step Functions has a hard-coded 60 seconds job polling frequency for the first 10 minutes and 300 seconds after that.

To integrate AWS Step Functions with Amazon EMR on EKS, use the Amazon EMR on EKS service integration APIs. The service integration APIs are the same as the corresponding Amazon EMR on EKS APIs, but not all APIs support all integration patterns, as shown in the following table.

API Request response Run a job (.sync)
CreateVirtualCluster
DeleteVirtualCluster
StartJobRun

Supported Amazon EMR on EKS APIs:

Note

There is a quota for the maximum input or result data size for a task in Step Functions. This restricts you to 256 KB of data as a UTF-8 encoded string when you send to, or receive data from, another service. See Quotas related to state machine executions.

The following includes a Task state that creates a virtual cluster.

"Create_Virtual_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::emr-containers:createVirtualCluster", "Parameters": { "Name": "MyVirtualCluster", "ContainerProvider": { "Id": "EKSClusterName", "Type": "EKS", "Info": { "EksInfo": { "Namespace": "Namespace" } } } }, "End": true }

The following includes a Task state that submits a job to a virtual cluster and waits for it to complete.

"Submit_Job": { "Type": "Task", "Resource": "arn:aws:states:::emr-containers:startJobRun.sync", "Parameters": { "Name": "MyJobName", "VirtualClusterId.$": "$.VirtualClusterId", "ExecutionRoleArn": "arn:aws:iam::<accountId>:role/job-execution-role", "ReleaseLabel": "emr-6.2.0-latest", "JobDriver": { "SparkSubmitJobDriver": { "EntryPoint": "s3://<mybucket>/jobs/trip-count.py", "EntryPointArguments": [ "60" ], "SparkSubmitParameters": "--conf spark.driver.cores=2 --conf spark.executor.instances=10 --conf spark.kubernetes.pyspark.pythonVersion=3 --conf spark.executor.memory=10G --conf spark.driver.memory=10G --conf spark.executor.cores=1 --conf spark.dynamicAllocation.enabled=false" } }, "ConfigurationOverrides": { "ApplicationConfiguration": [ { "Classification": "spark-defaults", "Properties": { "spark.executor.instances": "2", "spark.executor.memory": "2G" } } ], "MonitoringConfiguration": { "PersistentAppUI": "ENABLED", "CloudWatchMonitoringConfiguration": { "LogGroupName": "MyLogGroupName", "LogStreamNamePrefix": "MyLogStreamNamePrefix" }, "S3MonitoringConfiguration": { "LogUri": "s3://<mylogsbucket>" } } }, "Tags": { "taskType": "jobName" } }, "End": true }

The following includes a Task state that deletes a virtual cluster and waits for the deletion to complete.

"Delete_Virtual_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::emr-containers:deleteVirtualCluster.sync", "Parameters": { "Id.$": "$.VirtualClusterId" }, "End": true }

For information about how to configure IAM permissions when using Step Functions with other AWS services, see IAM Policies for integrated services.