使用 Step Functions 函數中的 Lambda 函數處理批次資料 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Step Functions 函數中的 Lambda 函數處理批次資料

在本教學課程中,您會使用分散式地圖狀態ItemBatcher (地圖)欄位來處理 Lambda 函數內的整批項目。每個批次最多包含三個項目。「分散式對應」狀態會啟動四個子工作流程執行,其中每個執行都會處理三個項目,而一個執行則會處理單一項目。每個子工作流程執行都會叫用 Lambda 函數,該函數會重複執行批次中存在的個別項目。

您將創建一個對整數數組執行乘法的狀態機。假設您提供的整數數組作為輸入是[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],乘法因子為7。然後,將這些整數乘以 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] 7 的因子之後形成的結果數組。

步驟 1:建立狀態機

在此步驟中,您會建立狀態機器的工作流程原型,將整批資料傳遞至您將在步驟 2 中建立的 Lambda 函數。

  • 使用下列定義可使用 Step Functions 主控台建立狀態機器。如需有關建立狀態機器的資訊,請參閱〈使用分散式地圖狀態入門〉自學課程步驟 1:建立工作流程原型中的〈〉。

    在這個狀態機器中,您可以定義一個分散式地圖狀態,該狀態接受 10 個整數作為輸入的陣列,並將此陣列分批傳遞給 Lambda 函數3。Lambda 函數迭代存在於批處理中的各個項目,並返回一個名為multiplied的輸出數組。輸出陣列包含對輸入陣列中傳遞的項目執行乘法的結果。

    重要

    請務必將下列程式碼中的 Lambda 函數的 Amazon 資源名稱 (ARN) 取ARN代為您將在步驟 2 中建立的函數。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

步驟 2:建立 Lambda 函數

在此步驟中,您會建立 Lambda 函數來處理批次中傳遞的所有項目。

重要

確保您的 Lambda 函數處於相同的狀態 AWS 區域 作為你的狀態機。

建立 Lambda 函數
  1. 使用 L ambda 主控台建立名為的 Python Lambda 函數ProcessEntireBatch。如需建立 Lambda 函數的相關資訊,請參閱〈使用分散式地圖狀態入門〉教學課程中的步驟 4:設定 Lambda 函數

  2. 複製 Lambda 函數的下列程式碼,並將其貼到 Lambda 函數的程式碼來源區段中。

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. 建立 Lambda 函數後,複製頁面右上角ARN顯示的函數。下面是一個例子ARN,其中 功能名 是 Lambda 函數的名稱 (在本例中為ProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    您需要在步驟 1 ARN 中建立的狀態機中提供函數。

  4. 選擇部署以部署變更。

步驟 3:運行狀態機

當您執行狀態機器時,分散式地圖狀態會啟動四個子工作流程執行,其中每個執行會處理三個項目,而一個執行則處理單一項目。

下列範例顯示其中一個子工作流程執行傳遞至ProcessEntireBatch函數的資料。

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

根據此輸入,下列範例會顯示由 Lambda 函數傳回multiplied的名為的輸出陣列。

{ "statusCode": 200, "multiplied": [7, 14, 21] }

狀態機器會傳回下列輸出,其中包含四個以四個子工作流程執行命名multiplied的陣列。這些陣列包含個別輸入項目的乘法結果。

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

要將返回的所有數組項合併到單個輸出數組中,可以使用該ResultSelector字段。在「分佈式地圖」狀態中定義此字段以查找所有數multiplied組,提取這些數組中的所有項目,然後將它們合併為單個輸出數組。

若要使用ResultSelector欄位,請更新狀態機定義,如下列範例所示。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

更新後的狀態機會傳回合併的輸出陣列,如下列範例所示。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }