Using Map state in Distributed mode

Step Functions provides a high-concurrency mode for the Map state known as Distributed mode. In this mode, the Map state can accept input from large-scale Amazon S3 data sources. For example, your input can be a JSON or CSV file stored in an Amazon S3 bucket, or a JSON array passed from a previous step in the workflow. A Map state set to Distributed is known as a Distributed Map state. In this mode, the Map state runs each iteration as a child workflow execution, which enables high concurrency of up to 10,000 parallel child workflow executions. Each child workflow execution has its own, separate execution history from that of the parent workflow.

Use the Map state in Distributed mode when you need to orchestrate large-scale parallel workloads that meet any combination of the following conditions:

  • The size of your dataset exceeds 256 KB.

  • The workflow's execution event history exceeds 25,000 entries.

  • You need a concurrency of more than 40 parallel iterations.

For more information about working with large-scale parallel workloads, see Orchestrating large-scale parallel workloads in your state machines.

For an introduction to using the Distributed Map state, see the tutorial Copying large-scale CSV data using Distributed Map.

Key concepts in this topic

Distributed mode

A processing mode of the Map state. In this mode, each iteration of the Map state runs as a child workflow execution that enables high concurrency. Each child workflow execution has its own execution history, which is separate from the parent workflow's execution history. This mode supports reading input from large-scale Amazon S3 data sources.

Distributed Map state

A Map state set to Distributed processing mode.

Map workflow

A set of steps that a Map state runs.

Child workflow execution

An iteration of the Distributed Map state. A child workflow execution has its own execution history, which is separate from the parent workflow's execution history.

Map Run

When you run a Map state in Distributed mode, Step Functions creates a Map Run resource. A Map Run refers to a set of child workflow executions that a Distributed Map state starts, and the runtime settings that control these executions. Step Functions assigns an Amazon Resource Name (ARN) to your Map Run. You can examine a Map Run in the Step Functions console. You can also invoke the DescribeMapRun API action. A Map Run also emits metrics to CloudWatch.

For more information, see Examining Map Run.

Distributed Map state fields

To use the Distributed Map state in your workflows, specify one or more of these fields. You specify these fields in addition to the common state fields.

Type (Required)

Sets the type of state, such as Map.

ItemProcessor (Required)

Contains the following JSON objects that specify the Map state processing mode and definition.

  • ProcessorConfig – A JSON object that specifies the configuration for the Map state. This object contains the following sub-fields:

    • Mode – Set to DISTRIBUTED to use the Map state in Distributed mode. Currently, you can't set the Mode to DISTRIBUTED in Express workflows.

    • ExecutionType – Specifies the execution type for the Map workflow as either STANDARD or EXPRESS. You must provide this field if you specified DISTRIBUTED for the Mode sub-field. For more information about workflow types, see Standard vs. Express Workflows.

  • StartAt – Specifies a string that indicates the first state in a workflow. This string is case-sensitive and must match the name of one of the state objects. This state runs first for each item in the dataset. Any execution input that you provide to the Map state passes to the StartAt state first.

  • States – A JSON object containing a comma-delimited set of states. In this object, you define the Map workflow.

ItemReader (Optional)

Specifies a dataset and its location. The Map state receives its input data from the specified dataset.

In Distributed mode, you can use either a JSON payload passed from a previous state or a large-scale Amazon S3 data source as the dataset. For more information, see ItemReader.

ItemsPath (Optional)

Specifies a reference path using the JsonPath syntax to select the JSON node that contains an array of items inside the state input.

In Distributed mode, you specify this field only when you use a JSON array from a previous step as your state input. For more information, see ItemsPath.

ItemSelector (Optional)

Overrides the values of individual dataset items before they're passed on to each Map state iteration.

In this field, you specify a valid JSON input that contains a collection of key-value pairs. These pairs can either be static values that you define in your state machine definition, values selected from the state input using a path, or values accessed from the context object. For more information, see ItemSelector.

ItemBatcher (Optional)

Specifies to process the dataset items in batches. Each child workflow execution then receives a batch of these items as input. For more information, see ItemBatcher.

MaxConcurrency (Optional)

Specifies the number of child workflow executions that can run in parallel. The interpreter only allows up to the specified number of parallel child workflow executions. If you don't specify a concurrency value or set it to zero, Step Functions doesn't limit concurrency and runs up to 10,000 parallel child workflow executions.

Note

While you can specify a higher concurrency limit for parallel child workflow executions, we recommend that you don't exceed the capacity of a downstream AWS service, such as AWS Lambda.
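As a sketch, the following fragment caps a Distributed Map state at 500 parallel child workflow executions; the value 500 is an arbitrary example, and the elided fields are shown as ... in the style of the other examples in this topic:

```json
{
  "Map": {
    "Type": "Map",
    "MaxConcurrency": 500,
    ...
    "End": true
  }
}
```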

ToleratedFailurePercentage (Optional)

Defines the percentage of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this percentage. Step Functions calculates the percentage of failed items by dividing the total number of failed or timed-out items by the total number of items. You must specify a value between zero and 100. For more information, see Tolerated failure threshold.

ToleratedFailureCount (Optional)

Defines the number of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this number. For more information, see Tolerated failure threshold.
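As a sketch, the following fragment tolerates up to 5 percent failed or timed-out items before the Map Run fails; the threshold value is an arbitrary example:

```json
{
  "Map": {
    "Type": "Map",
    "ToleratedFailurePercentage": 5,
    ...
    "End": true
  }
}
```

To tolerate a fixed number of failed items instead, specify ToleratedFailureCount in the same position.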

Label (Optional)

A string that uniquely identifies a Map state. For each Map Run, Step Functions adds the label to the Map Run ARN. The following is an example of a Map Run ARN with a custom label named demoLabel:

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

If you don't specify a label, Step Functions automatically generates a unique label.

Note

Labels can't exceed 40 characters in length, must be unique within a state machine definition, and can't contain any of the following characters:

  • Whitespace characters

  • Wildcard characters (? *)

  • Bracket characters (< > { } [ ])

  • Special characters (: ; , \ | ^ ~ $ # % & ` ")

  • Control characters (\u0000 - \u001f or \u007f - \u009f).

Step Functions allows you to create names for state machines, executions, activities, and labels that contain non-ASCII characters. These non-ASCII names don't work with Amazon CloudWatch. To ensure that you can track CloudWatch metrics, choose a name that uses only ASCII characters.
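As a sketch, the following fragment assigns the custom label demoLabel shown in the earlier ARN example:

```json
{
  "Map": {
    "Type": "Map",
    "Label": "demoLabel",
    ...
    "End": true
  }
}
```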

ResultWriter (Optional)

Specifies the Amazon S3 location where Step Functions writes all child workflow execution results.

Step Functions consolidates all child workflow execution data, such as execution input and output, ARN, and execution status. It then exports executions with the same status to their respective files in the specified Amazon S3 location. For more information, see ResultWriter.

If you don't export the Map state results, the state returns an array of all the child workflow execution results. For example:

[1, 2, 3, 4, 5]

ResultPath (Optional)

Specifies where in the input to place the output of the iterations. The input is then filtered as specified by the OutputPath field if present, before it is passed as the state's output. For more information, see Input and Output Processing.

ResultSelector (Optional)

Pass a collection of key-value pairs, where the values are static or selected from the result. For more information, see ResultSelector.
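For example, the following hypothetical fragment builds the state output from a static value and a value selected from the result; the key names source and status, and the path $.Status, are illustrative assumptions rather than fields defined by this topic:

```json
{
  "Map": {
    "Type": "Map",
    ...
    "ResultSelector": {
      "source": "distributed-map",
      "status.$": "$.Status"
    },
    ...
    "End": true
  }
}
```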

Retry (Optional)

An array of objects, called Retriers, that define a retry policy. An execution uses the retry policy if the state encounters runtime errors. For more information, see Examples using Retry and using Catch.

Note

If you define Retriers for the Distributed Map state, the retry policy applies to all of the child workflow executions the Map state started. For example, imagine your Map state started three child workflow executions, out of which one fails. When the failure occurs, the execution uses the Retry field, if defined, for the Map state. The retry policy applies to all the child workflow executions and not just the failed execution. If one or more child workflow executions fail, the Map Run fails.

When you retry a Map state, it creates a new Map Run.
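As a sketch, the following Retry field retries the whole Map state, and therefore starts a new Map Run, when any runtime error occurs; the interval, backoff, and attempt values are arbitrary examples:

```json
{
  "Map": {
    "Type": "Map",
    ...
    "Retry": [
      {
        "ErrorEquals": ["States.ALL"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }
    ],
    ...
    "End": true
  }
}
```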

Catch (Optional)

An array of objects, called Catchers, that define a fallback state. Step Functions uses the Catchers defined in Catch if the state encounters runtime errors. When an error occurs, the execution first uses any retriers defined in Retry. If the retry policy isn't defined or is exhausted, the execution uses its Catchers, if defined. For more information, see Fallback States.
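As a sketch, the following Catch field transitions to a fallback state when the Distributed Map state fails; the fallback state name HandleMapFailure is hypothetical and must match a state defined elsewhere in your state machine:

```json
{
  "Map": {
    "Type": "Map",
    ...
    "Catch": [
      {
        "ErrorEquals": ["States.ALL"],
        "Next": "HandleMapFailure"
      }
    ],
    "End": true
  }
}
```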

Distributed Map state example

To use the Map state in Distributed mode, you must configure the following required options:

  • ItemReader – Specifies the dataset and its location from which the Map state can read input.

  • ItemProcessor – Specifies the following values:

    • ProcessorConfig – Set the Mode and ExecutionType to DISTRIBUTED and EXPRESS respectively. This sets the Map state's processing mode and the workflow type for child workflow executions that the Distributed Map state starts.

    • StartAt – The first state in the Map workflow.

    • States – Defines the Map workflow, which is a set of steps to repeat in each child workflow execution.

  • ResultWriter – Specifies the Amazon S3 location where Step Functions writes the Distributed Map state results.

You can optionally configure additional fields as explained in Distributed Map state input and output configuration.

The following is an example of a Distributed Map state definition that specifies the dataset as a CSV file stored in an Amazon S3 bucket. It also specifies a Lambda function that processes the data in each row of the CSV file. Because this example uses a CSV file, it also specifies the location of the CSV column headers. To view the complete state machine definition of this example, see the tutorial Copying large-scale CSV data using Distributed Map.

{
  "Map": {
    "Type": "Map",
    "ItemReader": {
      "ReaderConfig": {
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      "Resource": "arn:aws:states:::s3:getObject",
      "Parameters": {
        "Bucket": "Database",
        "Key": "csv-dataset/ratings.csv"
      }
    },
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "DISTRIBUTED",
        "ExecutionType": "EXPRESS"
      },
      "StartAt": "LambdaTask",
      "States": {
        "LambdaTask": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke",
          "OutputPath": "$.Payload",
          "Parameters": {
            "Payload.$": "$",
            "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData"
          },
          "End": true
        }
      }
    },
    "Label": "Map",
    "End": true,
    "ResultWriter": {
      "Resource": "arn:aws:states:::s3:putObject",
      "Parameters": {
        "Bucket": "myOutputBucket",
        "Prefix": "csvProcessJobs"
      }
    }
  }
}

Distributed Map state input and output configuration

You can configure the input and output for a Distributed Map state using its fields, such as ItemReader and ResultWriter.

The following examples show how you can configure the input and output in a Distributed Map state. To view the complete state machine definition containing these examples, see the tutorial Copying large-scale CSV data using Distributed Map.

Note

Based on your use case, you may not need to apply all of these fields. For more information about the following fields, see Map state input and output fields.

ItemReader

Use the ItemReader field to specify the location of the dataset from which the Map state reads its input data. The following example shows how to use a CSV file as a dataset by specifying the Amazon S3 bucket name and object key in which it's stored.

{
  "Map": {
    "Type": "Map",
    ...
    "ItemReader": {
      "ReaderConfig": {
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      "Resource": "arn:aws:states:::s3:getObject",
      "Parameters": {
        "Bucket": "Database2022",
        "Key": "csv-dataset/ratings.csv"
      }
    },
    ...
    "End": true
  }
}

MaxItems

Use the MaxItems sub-field of the ItemReader field to limit the number of items the Map state can read from a dataset. For example, if you specify the MaxItems sub-field value as 90 for a CSV dataset, the Map state only reads the first 90 rows of your CSV file, starting after the header row.

{
  "Map": {
    "Type": "Map",
    ...
    "ItemReader": {
      "ReaderConfig": {
        "MaxItems": 90,
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      ...
    },
    ...
    "End": true
  }
}

ItemsPath

If your dataset is a JSON input passed from a previous step in the workflow and it contains an array, use the ItemsPath field to select the node that contains the array.

For example, given the following JSON input:

{
  "facts": [
    {
      "verdict": "true",
      "statement_date": "6/11/2008",
      "statement_source": "speech"
    },
    {
      "verdict": "false",
      "statement_date": "6/7/2022",
      "statement_source": "television"
    },
    {
      "verdict": "mostly-true",
      "statement_date": "5/18/2016",
      "statement_source": "news"
    }
  ]
}

Use the ItemsPath field as shown in the following example to select the JSON node that contains the array:

"Map": {
  "Type": "Map",
  "ItemProcessor": {
    "ProcessorConfig": {
      "Mode": "DISTRIBUTED",
      ...
    }
  },
  ...
  "ItemsPath": "$.facts",
  ...
  "End": true
}

ItemSelector

Use the ItemSelector field to override the individual dataset items' values before they’re passed to the Map state iterations. To override the values, you specify a JSON input that contains a collection of key-value pairs. These pairs can either be static values that you provide in your state machine definition, values selected from the state input using a path, or values accessed from the context object.

For example, the following custom JSON input replaces the original input before it's passed on to the Map state iterations. Each child workflow execution receives the following custom input containing a static value and two context object data items.

{
  "Map": {
    "Type": "Map",
    ...
    "ItemSelector": {
      "foo": "bear",
      "index.$": "$$.Map.Item.Index",
      "value.$": "$$.Map.Item.Value"
    },
    ...
    "End": true
  }
}

ItemBatcher

By default, the Map state passes each item in the dataset as input to an individual child workflow execution. Use the ItemBatcher field to process a group of items in each child workflow execution. In this field, you can specify the maximum number of items per batch, the maximum batch size in bytes, or both. The interpreter adds up to the specified number of items to an Items array and passes that array as input to each child workflow execution. For example, if you set the maximum items per batch to 10, each child workflow execution receives an Items array of up to 10 items as input.

The following example shows how to batch 10 dataset items by specifying the MaxItemsPerBatch field:

{
  "Map": {
    "Type": "Map",
    "ItemBatcher": {
      "MaxItemsPerBatch": 10
    },
    ...
    "End": true
  }
}

The following example shows a batch of items inside the Items array that the Map state passes as input to a child workflow execution:

{
  "Items": [
    {
      "rating": "3.0",
      "movieId": "1244",
      "userId": "2",
      "timestamp": "1192913551"
    },
    {
      "rating": "4.5",
      "movieId": "1296",
      "userId": "2",
      "timestamp": "1192913608"
    },
    ...
  ]
}

ResultWriter

Use the ResultWriter field to export the results of all child workflow executions to an Amazon S3 bucket. Exporting the results to an Amazon S3 bucket is helpful if your output payload size exceeds 256 KB. To export the results of all child workflow executions, specify the Amazon S3 bucket name and object prefix where you want to store the results.

{
  "Map": {
    "Type": "Map",
    ...
    "ResultWriter": {
      "Resource": "arn:aws:states:::s3:putObject",
      "Parameters": {
        "Bucket": "processedOutput",
        "Prefix": "test-run"
      }
    },
    ...
    "End": true
  }
}

IAM policies to run Distributed Map state

When you include a Distributed Map state in your workflows, Step Functions needs appropriate permissions to allow the state machine role to invoke the StartExecution API action for the Distributed Map state.

The following example IAM policy grants the least privileges required for your state machine role to run the Distributed Map state.

Note

Make sure that you replace stateMachineName with the name of the state machine in which you're using the Distributed Map state. For example, arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution"
      ],
      "Resource": [
        "arn:aws:states:region:accountID:stateMachine:stateMachineName"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "states:DescribeExecution",
        "states:StopExecution"
      ],
      "Resource": "arn:aws:states:region:accountID:execution:stateMachineName/*"
    }
  ]
}

In addition, make sure that your state machine role has the least privileges necessary to access the AWS resources used in the Distributed Map state, such as Amazon S3 buckets. For information, see IAM policies for using Distributed Map state.

Distributed Map state execution

When you run a Map state in Distributed mode, Step Functions creates a Map Run resource. A Map Run refers to a set of child workflow executions that a Distributed Map state starts. You can view a Map Run in the Step Functions console. You can also invoke the DescribeMapRun API action. A Map Run also emits metrics to CloudWatch.

The Step Functions console provides a Map Run Details page, which displays all the information related to a Distributed Map state execution. For example, you can view the status of the Distributed Map state's execution, the Map Run ARN, statuses of the items processed in the child workflow executions started by the Distributed Map state, and a list of all the child workflow executions. The console shows this information in a dashboard format.

For more information about viewing a Distributed Map state's execution in the console, see Examining Map Run.