使用 Lambda 函數處理個別資料項目 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 函數處理個別資料項目

在本教學課程中,您可以使用「分散式地圖」狀態ItemBatcher欄位,使用 Lambda 函數對批次中存在的個別項目進行迭代。「分散式對應」狀態會啟動四個子工作流程執行。這些子工作流程中的每一個都會執行內嵌對映狀態。對於其每次迭代,內聯映射狀態調用 Lambda 函數,並將單個項目從批處理傳遞給函數。Lambda 函數接著會處理項目並傳回結果。

您將創建一個狀態機,該狀態機器對整數數組執行乘法。假設您提供的整數數組作為輸入是[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],乘法因子為7。然後,將這些整數乘以 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] 7 的因子之後形成的結果數組。

步驟 1:建立狀態機

在此步驟中,您會建立狀態機器的工作流程原型,將單一項目從批次項目傳遞至您將在步驟 2 中建立的 Lambda 函數的每次叫用。

  • 使用下列定義可使用 Step Functions 主控台建立狀態機器。如需有關建立狀態機器的資訊,請參閱〈使用分散式地圖狀態入門〉自學課程步驟 1:建立工作流程原型中的〈〉。

    在此狀態機器中,您可以定義接受 10 個整數的陣列作為輸入的分散式地圖狀態,並將這些陣列項目批次傳遞給子工作流程執行。每個子工作流程執行都會接收三個項目的批次作為輸入,並執行內嵌對應狀態內聯地圖狀態的每一次迭代調用 Lambda 函數,並將項目從批處理傳遞給函數。然後,此函數會將項目乘以係數,7並傳回結果。

    每個子工作流程執行的輸出都是 JSON 陣列,其中包含每個傳遞項目的乘法結果。

    重要

    請務必將下列程式碼中的 Lambda 函數的 Amazon 資源名稱 (ARN) 取代為您將在步驟 2 中建立之函數的 ARN。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

步驟 2:建立 Lambda 函數

在此步驟中,您會建立 Lambda 函數來處理從批次傳遞的每個項目。

重要

確保您的 Lambda 函數與狀態機 AWS 區域 相同。

建立 Lambda 函數
  1. 使用 L ambda 主控台建立名為的 Python 3.9 Lambda 函數ProcessSingleItem。如需建立 Lambda 函數的相關資訊,請參閱〈使用分散式地圖狀態入門〉教學課程中的步驟 4:設定 Lambda 函數

  2. 複製 Lambda 函數的下列程式碼,並將其貼到 Lambda 函數的程式碼原始碼區段中。

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. 建立 Lambda 函數之後,請複製頁面右上角顯示的函數 ARN。若要複製 ARN,請按一下。 icon to copy the Lambda function's Amazon Resource Name 以下是 ARN 範例,其中function-name是 Lambda 函數的名稱 (在本例中為ProcessSingleItem):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    您需要在步驟 1 中創建的狀態機中提供函數 ARN。

  4. 選擇部署以部署變更。

步驟 3:運行狀態機

當您執行狀態機器時,分散式地圖狀態會啟動四個子工作流程執行,其中每個執行會處理三個項目,而一個執行則處理單一項目。

下列範例顯示傳遞至子工作流程執行內其中一個ProcessSingleItem函數叫用的資料。

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

有了這個輸入,下列範例會顯示 Lambda 函數傳回的輸出。

{ "statusCode": 200, "multiplied": 7 }

下列範例顯示其中一個子工作流程執行的輸出 JSON 陣列。

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

狀態機會傳回下列輸出,其中包含四個子工作流程執行的四個陣列。這些陣列包含個別輸入項目的乘法結果。

最後,狀態機輸出是一個名為的陣列,結合multiplied了針對四個子工作流程執行傳回的所有乘法結果。

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

若要將子工作流程執行傳回的所有乘法結果合併為單一輸出陣列,您可以使用ResultSelector欄位。在「分散式地圖」狀態中定義此欄位以尋找所有結果、擷取個別結果,然後將它們合併為一個名為的單一輸出陣列multiplied

若要使用ResultSelector欄位,請更新狀態機定義,如下列範例所示。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

更新後的狀態機會傳回合併的輸出陣列,如下列範例所示。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }