Lambda 関数で個々のデータ項目を処理する - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda 関数で個々のデータ項目を処理する

このチュートリアルでは、分散マップ状態ItemBatcher フィールドを使用して、Lambda 関数を使用してバッチに存在する個々のアイテムを反復処理します。分散マップ状態は、4 つの子ワークフローの実行を開始します。これらの子ワークフローはそれぞれインラインマップステートを実行します。インラインマップステートでは、反復のたびに Lambda 関数を呼び出し、バッチから 1 つの項目を関数に渡します。次に、Lambda 関数は項目を処理し、結果を返します。

整数の配列を乗算するステートマシンを作成します。入力として指定した整数配列が [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] で、乗算係数が 7 であるとします。この場合、これらの整数に係数 7 を掛けた結果の配列は [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] になります。

ステップ 1: ステートマシンを作成する

このステップでは、ステップ 2 で作成する Lambda 関数のそれぞれの呼び出しに、項目のバッチから 1 つの項目を渡すステートマシンのワークフロープロトタイプを作成します。

  • 次の定義を使用して、Step Functions コンソールを使用してステートマシンを作成します。ステートマシンの作成については、チュートリアル「分散マップ状態の使用を開始する」の「ステップ 1: ワークフロープロトタイプを作成する」を参照してください。

    このステートマシンでは、入力として 10 個の整数の配列を入力として受け取り、これらの配列項目をバッチで子ワークフロー実行に渡す分散マップ状態を定義します。子ワークフローを実行するたびに、入力として 3 つの項目のバッチが受け取り、インラインマップステートを実行します。インラインマップステートでは、反復のたびに Lambda 関数を呼び出し、バッチから 1 つの項目を関数に渡します。次に、この関数は項目に 7 の係数を掛けて、その結果を返します。

    子ワークフロー実行のそれぞれの出力は、渡された各項目の乗算結果を含む JSON 配列です。

    重要

    次のコードの Lambda 関数の Amazon リソースネーム (ARN) は、ステップ 2 で作成する関数の ARN に置き換えてください。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

ステップ 2 : Lambda 関数を作成する

このステップでは、バッチから渡される各項目を処理する Lambda 関数を作成します。

重要

Lambda 関数がステートマシン AWS リージョン と同じ にあることを確認します。

Lambda 関数を作成するには
  1. Lambda コンソールを使用して、ProcessSingleItem という名前の Python 3.9 Lambda 関数を作成します。Lambda 関数の作成の詳細については、「分散マップ状態の使用開始」チュートリアルの「ステップ 4: Lambda 関数を設定する」を参照してください。 分散マップを使用した大規模 CSV データのコピー

  2. Lambda 関数用の次のコードをコピーし、Lambda 関数の [コードソース] セクションに貼り付けます。

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. Lambda 関数を作成したら、ページの右上隅に表示されている関数の ARN をコピーします。ARN をコピーするには、 
                  icon to copy the Lambda function's Amazon Resource Name
                をクリックします。ARN の例を次に示します。ここで、function-name は Lambda 関数の名前 (この場合は ProcessSingleItem) です。

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    ステップ 1 で作成したステートマシンに関数 ARN を指定する必要があります。

  4. [デプロイ] を選択して、変更をデプロイします。

ステップ 3: ステートマシンを実行する

ステートマシンを実行すると、分散マップ状態では、4 つの子ワークフロー実行が開始されます。それぞれの実行では 3 つの項目が処理され、1 つの実行では 1 つの項目が処理されます。

次の例は、子ワークフロー実行内の ProcessSingleItem 関数呼び出しの 1 つに渡されるデータを示しています。

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

この入力の場合、次の例では Lambda 関数によって返される出力を示しています。

{ "statusCode": 200, "multiplied": 7 }

次の例は、子ワークフローを実行の 1 つに対する出力 JSON 配列を示しています。

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

ステートマシンは、4 つの子ワークフロー実行のための 4 つの配列を含む、次の出力を返します。これらの配列には、個々の入力項目の乗算結果が含まれます。

最後に、ステートマシンの出力は、4 つの子ワークフロー実行に返された乗算結果をすべて組み合わせた、multiplied という名前の配列です。

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

子ワークフローの実行によって返されたすべての乗算結果を 1 つの出力配列にまとめるには、ResultSelector フィールドを使用できます。分散マップ状態内でこのフィールドを定義すると、すべての結果を検索し、それらを multiplied という名前の 1 つの出力配列に結合できます。

ResultSelector フィールドを使用するには、次の例に示すようにステートマシン定義を更新します。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

更新されたステートマシンは、次の例に示すように統合された出力配列を返します。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }