Lambda 함수를 사용하여 개별 데이터 항목 처리 - AWS Step Functions

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Lambda 함수를 사용하여 개별 데이터 항목 처리

이 자습서에서는 Distributed Map 상태ItemBatcher 필드를 사용하여 Lambda 함수를 통해 배치로 있는 개별 항목을 반복합니다. Distributed Map 상태는 하위 워크플로 실행 4개를 시작합니다. 이러한 각 하위 워크플로는 Inline Map 상태를 실행합니다. 반복마다 Inline Map 상태는 Lambda 함수를 간접적으로 호출하고 배치의 단일 항목을 함수로 전달합니다. 그러면 Lambda 함수에서 항목을 처리하고 결과를 반환합니다.

정수 배열에서 곱셈을 수행하는 상태 시스템을 만듭니다. 입력으로 제공하는 정수 배열은 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]이고 곱셈 인수는 7입니다. 그러면 이러한 정수에 7의 인수를 곱한 후 생성된 결과 배열은 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]입니다.

1단계: 상태 시스템 만들기

이 단계에서는 배치의 단일 항목을 2단계에서 만들 Lambda 함수의 각 간접 호출에 전달하는 상태 시스템의 워크플로 프로토타입을 만듭니다.

  • 다음 정의를 사용하여 Step Functions 콘솔로 상태 시스템을 만듭니다. 상태 시스템을 만드는 방법은 Distributed Map 상태 사용 시작하기 자습서의 1단계: 워크플로 프로토타입 만들기를 참조하세요.

    이 상태 시스템에서 정수 10개 배열을 입력으로 수락하고 이러한 배열 항목을 배치로 하위 워크플로 실행에 전달하는 Distributed Map 상태를 정의합니다. 각 하위 워크플로 실행은 항목 3개 배치를 입력으로 수신하고 Inline Map 상태를 실행합니다. Inline Map 상태가 반복될 때마다 Lambda 함수가 간접적으로 호출되고 배치의 항목이 함수로 전달됩니다. 그런 다음 이 함수는 항목에 승수 7을 곱하고 결과를 반환합니다.

    각 하위 워크플로 실행 출력은 전달된 각 항목에 대한 곱셈 결과가 포함된 JSON 배열입니다.

    중요

    다음 코드에 있는 Lambda 함수의 Amazon 리소스 이름(ARN)을 2단계에서 만들 함수의 ARN으로 교체해야 합니다.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

2단계: Lambda 함수 만들기

이 단계에서는 배치에서 전달된 각 항목을 처리하는 Lambda 함수를 만듭니다.

중요

Lambda 함수가 상태 머신과 AWS 리전 동일한 위치에 있는지 확인하십시오.

Lambda 함수를 생성하려면
  1. Lambda 콘솔을 사용하여 ProcessSingleItem라는 Python 3.9 Lambda 함수를 만듭니다. Lambda 함수를 만드는 방법은 Distributed Map 상태 사용 시작하기 자습서의 4단계: Lambda 함수 구성을 참조하세요.

  2. Lambda 함수의 다음 코드를 복사하여 Lambda 함수의 코드 소스 섹션에 붙여넣습니다.

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. Lambda 함수를 만든 후 페이지 오른쪽 상단에 표시된 함수의 ARN을 복사합니다. ARN을 복사하려면 icon to copy the Lambda function's Amazon Resource Name 를 클릭합니다. 다음은 예제 ARN입니다. 여기서 function-name은 Lambda 함수 이름입니다(이 경우 ProcessSingleItem).

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    1단계에서 만든 상태 시스템에 함수 ARN을 제공해야 합니다.

  4. 배포를 선택하여 변경 사항을 배포합니다.

3단계: 상태 시스템 실행

상태 시스템을 실행하면 Distributed Map 상태에서 하위 워크플로 실행 4개를 시작합니다. 여기서 각 실행은 항목 3개를 처리하고 실행 하나는 단일 항목을 처리합니다.

다음 예제에서는 하위 워크플로 실행 내의 ProcessSingleItem 함수 간접 호출 중 하나에 전달된 데이터를 보여줍니다.

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

이 입력이 제공되면 다음 예제에서는 Lambda 함수에서 반환한 출력을 보여줍니다.

{ "statusCode": 200, "multiplied": 7 }

다음 예제에서는 하위 워크플로 실행 중 하나의 출력 JSON 배열을 보여줍니다.

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

상태 시스템은 하위 워크플로 실행 4개에 대한 배열 4개가 포함된 다음 출력을 반환합니다. 이러한 배열에는 개별 입력 항목의 곱셈 결과가 포함됩니다.

마지막으로, 상태 시스템 출력은 하위 워크플로 실행 4개에 대해 반환된 모든 곱셈 결과를 합친 multiplied 배열입니다.

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

하위 워크플로 실행에서 반환한 모든 곱셈 결과를 단일 출력 배열로 결합하려면 ResultSelector 필드를 사용하면 됩니다. Distributed Map 상태 내에서 이 필드를 정의하여 모든 결과를 찾고 개별 결과를 추출한 다음 multiplied 단일 출력 배열로 결합합니다.

ResultSelector 필드를 사용하려면 다음 예제와 같이 상태 시스템 정의를 업데이트합니다.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

업데이트된 상태 시스템에서 다음 예제와 같이 통합된 출력 배열을 반환합니다.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }