Using Map state in Distributed mode to orchestrate large-scale parallel workloads - AWS Step Functions

With Step Functions, you can orchestrate large-scale parallel workloads to perform tasks, such as on-demand processing of semi-structured data. These parallel workloads let you concurrently process large-scale data sources stored in Amazon S3. For example, you might process a single JSON or CSV file that contains large amounts of data. Or you might process a large set of Amazon S3 objects.

To set up a large-scale parallel workload in your workflows, include a Map state in Distributed mode. The Map state processes items in a dataset concurrently. A Map state set to Distributed is known as a Distributed Map state. In Distributed mode, the Map state allows high-concurrency processing: it processes the items in the dataset in iterations called child workflow executions. You can specify the number of child workflow executions that can run in parallel. Each child workflow execution has its own execution history, separate from that of the parent workflow. If you don't specify a number, Step Functions runs up to 10,000 child workflow executions in parallel.

The following illustration explains how you can set up large-scale parallel workloads in your workflows.

Figure: Diagram illustrating the concept of orchestrating large-scale parallel workloads.

Key terms

Distributed mode

A processing mode of the Map state. In this mode, each iteration of the Map state runs as a child workflow execution that enables high concurrency. Each child workflow execution has its own execution history, which is separate from the parent workflow's execution history. This mode supports reading input from large-scale Amazon S3 data sources.

Distributed Map state

A Map state set to Distributed processing mode.

Map workflow

A set of steps that a Map state runs.

Parent workflow

A workflow that contains one or more Distributed Map states.

Child workflow execution

An iteration of the Distributed Map state. A child workflow execution has its own execution history, which is separate from the parent workflow's execution history.

Map Run

When you run a Map state in Distributed mode, Step Functions creates a Map Run resource. A Map Run refers to a set of child workflow executions that a Distributed Map state starts, and the runtime settings that control these executions. Step Functions assigns an Amazon Resource Name (ARN) to your Map Run. You can examine a Map Run in the Step Functions console. You can also invoke the DescribeMapRun API action. A Map Run also emits metrics to CloudWatch.

For more information, see Examining Map Run.

Distributed Map state definition example

Use the Map state in Distributed mode when you need to orchestrate large-scale parallel workloads that meet any combination of the following conditions:

  • The size of your dataset exceeds 256 KB.

  • The workflow's execution event history exceeds 25,000 entries.

  • You need a concurrency of more than 40 parallel iterations.

The following Distributed Map state definition example specifies the dataset as a CSV file stored in an Amazon S3 bucket. It also specifies a Lambda function that processes the data in each row of the CSV file. Because this example uses a CSV file, it also specifies the location of the CSV column headers. To view the complete state machine definition of this example, see the tutorial Copying large-scale CSV data using Distributed Map.

{
  "Map": {
    "Type": "Map",
    "ItemReader": {
      "ReaderConfig": {
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      "Resource": "arn:aws:states:::s3:getObject",
      "Parameters": {
        "Bucket": "Database",
        "Key": "csv-dataset/ratings.csv"
      }
    },
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "DISTRIBUTED",
        "ExecutionType": "EXPRESS"
      },
      "StartAt": "LambdaTask",
      "States": {
        "LambdaTask": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke",
          "OutputPath": "$.Payload",
          "Parameters": {
            "Payload.$": "$",
            "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData"
          },
          "End": true
        }
      }
    },
    "Label": "Map",
    "End": true,
    "ResultWriter": {
      "Resource": "arn:aws:states:::s3:putObject",
      "Parameters": {
        "Bucket": "myOutputBucket",
        "Prefix": "csvProcessJobs"
      }
    }
  }
}

Permissions to run Distributed Map

When you include a Distributed Map state in your workflows, Step Functions needs appropriate permissions to allow the state machine role to invoke the StartExecution API action for the Distributed Map state.

The following IAM policy example grants the least privileges required to your state machine role for running the Distributed Map state.

Note

Make sure that you replace stateMachineName with the name of the state machine in which you're using the Distributed Map state, and region and accountID with your own AWS Region and account ID. For example, arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution"
      ],
      "Resource": [
        "arn:aws:states:region:accountID:stateMachine:stateMachineName"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "states:DescribeExecution",
        "states:StopExecution"
      ],
      "Resource": "arn:aws:states:region:accountID:execution:stateMachineName:*"
    }
  ]
}

In addition, you need to make sure that you have the least privileges necessary to access the AWS resources used in the Distributed Map state, such as Amazon S3 buckets. For information, see IAM policies for using Distributed Map state.

Distributed Map state fields

To use the Distributed Map state in your workflows, specify one or more of these fields. You specify these fields in addition to the common state fields.

Type (Required)

Sets the type of state, such as Map.

ItemProcessor (Required)

Contains the following JSON objects that specify the Map state processing mode and definition.

  • ProcessorConfig – A JSON object that specifies the configuration for the Map state. This object contains the following sub-fields:

    • Mode – Set to DISTRIBUTED to use the Map state in Distributed mode.

      Note

      Currently, if you use the Map state inside Express workflows, you can't set the Mode to DISTRIBUTED. However, if you use the Map state inside Standard workflows, you can set the Mode to DISTRIBUTED.

    • ExecutionType – Specifies the execution type for the Map workflow as either STANDARD or EXPRESS. You must provide this field if you specified DISTRIBUTED for the Mode sub-field. For more information about workflow types, see Standard vs. Express Workflows.

  • StartAt – Specifies a string that indicates the first state in a workflow. This string is case-sensitive and must match the name of one of the state objects. This state runs first for each item in the dataset. Any execution input that you provide to the Map state passes to the StartAt state first.

  • States – A JSON object containing a comma-delimited set of states. In this object, you define the Map workflow.

ItemReader (Optional)

Specifies a dataset and its location. The Map state receives its input data from the specified dataset.

In Distributed mode, you can use either a JSON payload passed from a previous state or a large-scale Amazon S3 data source as the dataset. For more information, see ItemReader.
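As an illustration of the Amazon S3 case, the following sketch shows a Distributed Map state that iterates over the objects under a prefix instead of over the rows of a single file. The bucket name, the prefix, and the ProcessObject state are hypothetical placeholders, and the Pass state only stands in for real processing.

```json
{
  "Type": "Map",
  "ItemReader": {
    "Resource": "arn:aws:states:::s3:listObjectsV2",
    "Parameters": {
      "Bucket": "amzn-s3-demo-bucket",
      "Prefix": "processData/"
    }
  },
  "ItemProcessor": {
    "ProcessorConfig": {
      "Mode": "DISTRIBUTED",
      "ExecutionType": "STANDARD"
    },
    "StartAt": "ProcessObject",
    "States": {
      "ProcessObject": {
        "Type": "Pass",
        "End": true
      }
    }
  },
  "End": true
}
```

In this sketch, each child workflow execution would receive the metadata of one listed object as its input. If the dataset were instead a JSON array passed from a previous state, the ItemReader field would be left out.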

ItemsPath (Optional)

Specifies a reference path using the JsonPath syntax to select the JSON node that contains an array of items inside the state input.

In Distributed mode, you specify this field only when you use a JSON array from a previous step as your state input. For more information, see ItemsPath.

ItemSelector (Optional)

Overrides the values of individual dataset items before they're passed on to each Map state iteration.

In this field, you specify a valid JSON input that contains a collection of key-value pairs. These pairs can be static values that you define in your state machine definition, values selected from the state input using a path, or values accessed from the context object. For more information, see ItemSelector.
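The following fragment is a minimal sketch of the three kinds of values together with ItemsPath. It assumes a state input that contains a hypothetical orders array and a hypothetical runId field. The key names (source, runId, index, order) are illustrative only, and the required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "ItemsPath": "$.orders",
  "ItemSelector": {
    "source": "nightly-import",
    "runId.$": "$.runId",
    "index.$": "$$.Map.Item.Index",
    "order.$": "$$.Map.Item.Value"
  }
}
```

Here source is a static value, runId is selected from the state input, and index and order come from the context object, so each iteration would receive one object that combines all four.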

ItemBatcher (Optional)

Specifies that the dataset items are processed in batches. Each child workflow execution then receives a batch of these items as input. For more information, see ItemBatcher.
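As a rough sketch, a batching configuration might look like the following. It uses the MaxItemsPerBatch and BatchInput sub-fields covered in the ItemBatcher topic. The batch size of 100 and the contents of BatchInput are arbitrary example values, and the required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "ItemBatcher": {
    "MaxItemsPerBatch": 100,
    "BatchInput": {
      "source": "ratings.csv"
    }
  }
}
```

With a configuration like this, each child workflow execution would receive up to 100 items in an Items array, alongside the fixed BatchInput object.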

MaxConcurrency (Optional)

Specifies the number of child workflow executions that can run in parallel. The interpreter allows only up to the specified number of parallel child workflow executions. If you don't specify a concurrency value or set it to zero, Step Functions doesn't apply a custom concurrency limit and runs up to 10,000 parallel child workflow executions.

Note

While you can specify a higher concurrency limit for parallel child workflow executions, we recommend that you don't exceed the capacity of a downstream AWS service, such as AWS Lambda.

MaxConcurrencyPath (Optional)

If you want to provide a maximum concurrency value dynamically from the state input using a reference path, use MaxConcurrencyPath. When resolved, the reference path must select a field whose value is a non-negative integer.

Note

A Map state cannot include both MaxConcurrency and MaxConcurrencyPath.
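For instance, assuming the state input carries a hypothetical maxConcurrency field such as {"maxConcurrency": 500}, a fragment along these lines would read the limit at run time. The field name is an assumption for illustration, and the required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "MaxConcurrencyPath": "$.maxConcurrency"
}
```

To hard-code the limit instead, the same fragment would use "MaxConcurrency": 500 in place of the path field, never both.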

ToleratedFailurePercentage (Optional)

Defines the percentage of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this percentage. Step Functions calculates the percentage of failed items as the total number of failed or timed-out items divided by the total number of items. You must specify a value between zero and 100. For more information, see Tolerated failure threshold for Distributed Map state.
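As a worked illustration with example numbers, the following fragment tolerates up to 5 percent of failed items. For a dataset of 1,000 items, 50 failed or timed-out items give 50 / 1,000 = 5 percent, which does not exceed the threshold. A 51st failure gives 5.1 percent, at which point the Map Run would fail. The value 5 is arbitrary, and the required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "ToleratedFailurePercentage": 5
}
```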

ToleratedFailurePercentagePath (Optional)

If you want to provide a tolerated failure percentage value dynamically from the state input using a reference path, use ToleratedFailurePercentagePath. When resolved, the reference path must select a field whose value is between zero and 100.

ToleratedFailureCount (Optional)

Defines the number of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this number. For more information, see Tolerated failure threshold for Distributed Map state.

ToleratedFailureCountPath (Optional)

If you want to provide a tolerated failure count value dynamically from the state input using a reference path, use ToleratedFailureCountPath. When resolved, the reference path must select a field whose value is a non-negative integer.

Label (Optional)

A string that uniquely identifies a Map state. For each Map Run, Step Functions adds the label to the Map Run ARN. The following is an example of a Map Run ARN with a custom label named demoLabel:

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

If you don't specify a label, Step Functions automatically generates a unique label.

Note

Labels can't exceed 40 characters in length, must be unique within a state machine definition, and can't contain any of the following characters:

  • Whitespace characters

  • Wildcard characters (? *)

  • Bracket characters (< > { } [ ])

  • Special characters (: ; , \ | ^ ~ $ # % & ` ")

  • Control characters (\u0000 - \u001f or \u007f - \u009f).

Step Functions allows you to create names for state machines, executions, activities, and labels that contain non-ASCII characters. These non-ASCII names don't work with Amazon CloudWatch. To ensure that you can track CloudWatch metrics, choose a name that uses only ASCII characters.

ResultWriter (Optional)

Specifies the Amazon S3 location where Step Functions writes all child workflow execution results.

Step Functions consolidates all child workflow execution data, such as execution input and output, ARN, and execution status. It then exports executions with the same status to their respective files in the specified Amazon S3 location. For more information, see ResultWriter.

If you don't export the Map state results, the Map state returns an array of all the child workflow execution results. For example:

[1, 2, 3, 4, 5]

ResultPath (Optional)

Specifies where in the input to place the output of the iterations. The input is then filtered as specified by the OutputPath field if present, before it is passed as the state's output. For more information, see Input and Output Processing.

ResultSelector (Optional)

Pass a collection of key-value pairs, where the values are static or selected from the result. For more information, see ResultSelector.

Tip

If the Parallel or Map state you use in your state machines returns an array of arrays, you can transform them into a flat array with the ResultSelector field. For more information, see Flattening an array of arrays.
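A minimal sketch of that flattening, assuming each iteration returns an array so that the Map state result is an array of arrays such as [[1, 2], [3, 4]]. The key name flattenArray is illustrative, and the required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "ResultSelector": {
    "flattenArray.$": "$[*][*]"
  }
}
```

The path $[*][*] selects every element of every inner array, so the result should be an object whose flattenArray key holds a single flat array.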

Retry (Optional)

An array of objects, called Retriers, that define a retry policy. An execution uses the retry policy if the state encounters runtime errors. For more information, see State machine examples using Retry and using Catch.

Note

If you define Retriers for the Distributed Map state, the retry policy applies to all of the child workflow executions that the Map state started. For example, imagine your Map state started three child workflow executions, one of which fails. When the failure occurs, the execution uses the Retry field, if defined, for the Map state. The retry policy applies to all the child workflow executions and not just the failed execution. If one or more child workflow executions fail, the Map Run fails.

When you retry a Map state, it creates a new Map Run.

Catch (Optional)

An array of objects, called Catchers, that define a fallback state. Step Functions uses the Catchers defined in Catch if the state encounters runtime errors. When an error occurs, the execution first uses any Retriers defined in Retry. If the retry policy isn't defined or is exhausted, the execution uses its Catchers, if defined. For more information, see Fallback States.
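The following fragment sketches how Retry and Catch might be combined on a Distributed Map state. The interval, attempt count, and backoff rate are arbitrary example values, and HandleMapFailure is a hypothetical fallback state that would have to exist elsewhere in the parent workflow. The required ItemProcessor field is omitted for brevity.

```json
{
  "Type": "Map",
  "Retry": [
    {
      "ErrorEquals": ["States.ALL"],
      "IntervalSeconds": 5,
      "MaxAttempts": 2,
      "BackoffRate": 2.0
    }
  ],
  "Catch": [
    {
      "ErrorEquals": ["States.ALL"],
      "ResultPath": "$.error",
      "Next": "HandleMapFailure"
    }
  ]
}
```

Because each retry of the Map state creates a new Map Run, a configuration like this could start up to three Map Runs before the Catcher routes the execution to the fallback state.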

Next steps

To continue learning more about Distributed Map state, see the following resources: