在 Step Functions 中使用 Lambda 函数处理单个项目 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Step Functions 中使用 Lambda 函数处理单个项目

在本教程中,您将使用分布式 Map 状态ItemBatcher (地图) 字段,使用 Lambda 函数迭代批次中的单个项目。分布式 Map 状态将启动四个子工作流执行。每个子工作流都运行一个内联 Map 状态内联 Map 状态每次迭代都会调用一个 Lambda 函数,并将批次中的单个项目传递给该函数。然后,Lambda 函数处理该项目并返回结果。

您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],乘法因子是 7。这些整数与乘法因子 7 相乘后形成的数组将是 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

第 1 步:创建状态机

在此步骤中,您将创建状态机的工作流原型,该原型会将一批项目中的单个项目传递给您将在第 2 步中创建的 Lambda 函数的每次调用。

  • 使用以下定义通过 Step Functions 控制台创建状态机。有关创建状态机的信息,请参阅开始使用分布式 Map 状态教程中的第 1 步:创建工作流原型

    在此状态机中,您将定义一个分布式 Map 状态,该状态接受 10 个整数的数组作为输入,并将这些数组项分批传递给子工作流执行。每个子工作流程执行都会接收一批三个项目作为输入,并运行一个内联 Map 状态内联 Map 状态的每次迭代都会调用 Lambda 函数,并将批次中的一个项目传递给该函数。然后,此函数将该项与系数 7 相乘并返回结果。

    每个子工作流程执行的输出都是一个JSON数组,其中包含每个传递项目的乘法结果。

    重要

    请务必将以下代码中 Lambda 函数的 Amazon 资源名称 (ARN) 替换为您将在步骤 2 中创建ARN的函数的名称。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

第 2 步:创建 Lambda 函数

在此步骤中,您将创建 Lambda 函数,用于处理批次传递的每个项目。

重要

确保您的 Lambda 函数处于相同状态 AWS 区域 就像你的状态机一样。

创建 Lambda 函数
  1. 使用 Lambda 控制台创建名为的 Python Lambda 函数。ProcessSingleItem有关创建 Lambda 函数的信息,请参阅开始使用分布式 Map 状态教程中的第 4 步:配置 Lambda 函数

  2. 复制以下 Lambda 函数代码,并将其粘贴到 Lambda 函数的代码源部分。

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. 创建 Lambda 函数后,复制页面右上角ARN显示的函数。以下是一个示例ARN,其中 函数名 是 Lambda 函数的名称(在本例中为):ProcessSingleItem

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    你需要在步骤 1 ARN 中创建的状态机中提供该函数。

  4. 选择部署,以部署更改。

第 3 步:运行状态机

运行状态机时,分布式 Map 状态会启动四个子工作流程执行,其中每个执行处理三个项目,而一个执行处理单个项目。

以下示例显示了在子工作流执行中传递给其中一个 ProcessSingleItem 函数调用的数据。

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

给定此输入,以下示例显示了 Lambda 函数返回的输出。

{ "statusCode": 200, "multiplied": 7 }

以下示例显示了其中一个子工作流程执行的输出JSON数组。

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

状态机返回以下输出,其中包含四个子工作流执行的四个数组。这些数组包含各个输入项的乘法结果。

最后,状态机输出是一个名为 multiplied 的数组,组合了为四个子工作流执行返回的所有乘法结果。

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

要将子工作流执行返回的所有乘法结果合并到单个输出数组中,可以使用 ResultSelector 字段。在分布式 Map 状态 中定义此字段以查找所有结果,提取单个结果,然后将它们组合成一个名为 multiplied 的输出数组。

要使用 ResultSelector 字段,请更新您的状态机定义,如以下示例所示。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

更新的状态机返回统一的输出数组,如以下示例所示。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }