本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Step Functions 函數中的 Lambda 函數處理批次資料
在本教學課程中,您會使用分散式地圖狀態的ItemBatcher (地圖)欄位來處理 Lambda 函數內的整批項目。每個批次最多包含三個項目。「分散式對應」狀態會啟動四個子工作流程執行,其中每個執行都會處理三個項目,而一個執行則會處理單一項目。每個子工作流程執行都會叫用 Lambda 函數,該函數會重複執行批次中存在的個別項目。
您將創建一個對整數數組執行乘法的狀態機。假設您提供的整數數組作為輸入是[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,乘法因子為7
。然後,將這些整數乘以 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
7 的因子之後形成的結果數組。
步驟 1:建立狀態機
在此步驟中,您會建立狀態機器的工作流程原型,將整批資料傳遞至您將在步驟 2 中建立的 Lambda 函數。
-
使用下列定義可使用 Step Functions 主控台
建立狀態機器。如需有關建立狀態機器的資訊,請參閱〈使用分散式地圖狀態入門〉自學課程步驟 1:建立工作流程原型中的〈〉。 在這個狀態機器中,您可以定義一個分散式地圖狀態,該狀態接受 10 個整數作為輸入的陣列,並將此陣列分批傳遞給 Lambda 函數
3
。Lambda 函數迭代存在於批處理中的各個項目,並返回一個名為multiplied
的輸出數組。輸出陣列包含對輸入陣列中傳遞的項目執行乘法的結果。重要
請務必將下列程式碼中的 Lambda 函數的 Amazon 資源名稱 (ARN) 取ARN代為您將在步驟 2 中建立的函數。
{ "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:
functionName
" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }
步驟 2:建立 Lambda 函數
在此步驟中,您會建立 Lambda 函數來處理批次中傳遞的所有項目。
重要
確保您的 Lambda 函數處於相同的狀態 AWS 區域 作為你的狀態機。
建立 Lambda 函數
-
使用 L ambda 主控台
建立名為的 Python Lambda 函數 ProcessEntireBatch
。如需建立 Lambda 函數的相關資訊,請參閱〈使用分散式地圖狀態入門〉教學課程中的步驟 4:設定 Lambda 函數。 -
複製 Lambda 函數的下列程式碼,並將其貼到 Lambda 函數的程式碼來源區段中。
import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
-
建立 Lambda 函數後,複製頁面右上角ARN顯示的函數。下面是一個例子ARN,其中
是 Lambda 函數的名稱 (在本例中為功能名
ProcessEntireBatch
):arn:aws:lambda:us-east-1:123456789012:function:
function-name
您需要在步驟 1 ARN 中建立的狀態機中提供函數。
-
選擇部署以部署變更。
步驟 3:運行狀態機
當您執行狀態機器時,分散式地圖狀態會啟動四個子工作流程執行,其中每個執行會處理三個項目,而一個執行則處理單一項目。
下列範例顯示其中一個子工作流程執行傳遞至ProcessEntireBatch函數的資料。
{
"BatchInput": {
"MyMultiplicationFactor": 7
},
"Items": [1, 2, 3]
}
根據此輸入,下列範例會顯示由 Lambda 函數傳回multiplied
的名為的輸出陣列。
{
"statusCode": 200,
"multiplied": [7, 14, 21]
}
狀態機器會傳回下列輸出,其中包含四個以四個子工作流程執行命名multiplied
的陣列。這些陣列包含個別輸入項目的乘法結果。
[
{
"statusCode": 200,
"multiplied": [7, 14, 21]
},
{
"statusCode": 200,
"multiplied": [28, 35, 42]
},
{
"statusCode": 200,
"multiplied": [49, 56, 63]
},
{
"statusCode": 200,
"multiplied": [70]
}
]
要將返回的所有數組項合併到單個輸出數組中,可以使用該ResultSelector字段。在「分佈式地圖」狀態中定義此字段以查找所有數multiplied
組,提取這些數組中的所有項目,然後將它們合併為單個輸出數組。
若要使用ResultSelector
欄位,請更新狀態機定義,如下列範例所示。
{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }
更新後的狀態機會傳回合併的輸出陣列,如下列範例所示。
{
"multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
}