Amazon DynamoDB에서 AWS Lambda 사용 - AWS Lambda

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon DynamoDB에서 AWS Lambda 사용

AWS Lambda 함수를 사용하여 Amazon DynamoDB 데이터 스트림의 레코드를 처리할 수 있습니다. DynamoDB Streams를 사용하여 DynamoDB 테이블이 업데이트될 때마다 추가 작업을 수행하는 Lambda 함수를 트리거할 수 있습니다.

Lambda는 스트림에서 레코드를 읽고 스트림 레코드를 포함한 이벤트와 동기적으로 함수를 호출합니다. Lambda는 배치의 레코드를 읽고 함수를 호출하여 배치의 레코드를 처리합니다.

예제 이벤트

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

폴링 및 배치 처리 스트림

Lambda는 초당 4회의 기본 속도로 레코드에 대해 DynamoDB 스트림의 샤드를 폴링합니다. 레코드를 사용할 수 있으면 Lambda가 함수를 호출하고 결과를 기다립니다. 처리가 성공하면 Lambda가 레코드를 더 받을 때까지 폴링을 재개합니다.

기본적으로, Lambda는 레코드가 사용 가능하게 되는 즉시 함수를 호출합니다. Lambda가 이벤트 소스에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 일괄 처리 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치가 수집되거나, 일괄 처리 기간이 만료되거나, 배치가 페이로드 한도인 6MB에 도달할 때까지 이벤트 소스에서 레코드를 계속 읽습니다. 자세한 설명은 일괄 처리 동작 섹션을 참조하세요.

주의

Lambda 이벤트 소스 매핑은 각 이벤트를 한 번 이상 처리하므로 일괄 처리가 중복될 수 있습니다. 중복 이벤트와 관련된 잠재적 문제를 방지하려면 함수 코드를 idempotent로 만드는 것이 좋습니다. 자세히 알아보려면 지식 센터에서 Lambda 함수를 동등하게 만드는 방법을 참조하십시오. AWS

함수가 오류를 반환하면 Lambda는 처리가 성공할 때까지 또는 데이터가 만료될 때까지 배치(batch)를 재시도합니다. 샤드 중지를 방지하기 위해 보다 작은 배치 크기로 재시도하거나, 재시도 횟수를 제한하거나, 너무 오래된 레코드를 폐기하도록 이벤트 소스 매핑을 구성할 수 있습니다. 폐기된 이벤트를 유지하기 위해 실패한 배치에 대한 세부 정보를 표준 SQS 대기열 또는 표준 SNS 주제로 보내도록 이벤트 소스 매핑을 구성할 수 있습니다.

동시성을 높이려면 또한 각 샤드의 여러 배치를 병렬로 처리할 수도 있습니다. Lambda는 각 샤드에서 최대 10개의 배치를 동시에 처리할 수 있습니다. 샤드당 동시 배치 수를 높이는 경우에도 Lambda는 파티션-키 수준에서의 순차 처리를 계속 보장합니다.

Kinesis 또는 DynamoDB 데이터 스트림의 한 샤드와 하나 이상의 Lambda 호출을 동시에 처리하도록 ParallelizationFactor 설정을 구성합니다. Lambda가 병렬화 계수를 통해 샤드에서 폴링하는 동시 배치의 수는 1(기본값)부터 10까지 지정할 수 있습니다. 예를 들어 ParallelizationFactor를 2로 설정하는 경우 최대 100개의 Kinesis 데이터 샤드를 처리하기 위한 200번의 동시 Lambda 간접 호출을 보유할 수 있습니다(실제로 ConcurrentExecutions 지표의 값은 다를 수 있음). 이는 데이터 볼륨이 일시적이고 IteratorAge가 높을 때 처리량을 확장하는 데 도움을 줍니다.

Kinesis 어그리게이션과 ParallelizationFactor 함께 사용할 수도 있습니다. 이벤트 소스 매핑의 동작은 향상된 팬아웃을 사용하는지 여부에 따라 달라집니다.

  • 향상된 팬아웃이 없는 경우: 집계된 이벤트 내의 모든 이벤트는 동일한 파티션 키를 가져야 합니다. 파티션 키도 집계된 이벤트의 파티션 키와 일치해야 합니다. 집계된 이벤트 내의 이벤트의 파티션 키가 서로 다른 경우, Lambda는 파티션 키를 기준으로 이벤트를 순서대로 처리한다고 보장할 수 없습니다.

  • 향상된 팬아웃 기능: 먼저 Lambda는 집계된 이벤트를 개별 이벤트로 디코딩합니다. 집계된 이벤트는 포함된 이벤트와 다른 파티션 키를 가질 수 있습니다. 하지만 파티션 키에 해당하지 않는 이벤트는 삭제되고 손실됩니다. Lambda는 이러한 이벤트를 처리하지 않으며 구성된 장애 목적지로 전송하지도 않습니다.

폴링 및 스트리밍 시작 위치

이벤트 소스 매핑 생성 및 업데이트 중 스트림 폴링은 최종적으로 일관됩니다.

  • 이벤트 소스 매핑 생성 중 스트림에서 이벤트 폴링을 시작하는 데 몇 분 정도 걸릴 수 있습니다.

  • 이벤트 소스 매핑 업데이트 중 스트림에서 이벤트 폴링을 중지했다가 다시 시작하는 데 몇 분 정도 걸릴 수 있습니다.

이 동작은 스트림의 시작 위치로 LATEST를 지정하면 이벤트 소스 매핑이 생성 또는 업데이트 중에 이벤트를 놓칠 수 있음을 의미합니다. 누락된 이벤트가 없도록 스트림 시작 위치를 TRIM_HORIZON으로 지정하세요.

DynamoDB Streams 내 샤드의 동시 리더

글로벌 테이블이 아닌 단일 리전 테이블의 경우 최대 2개의 Lambda 함수가 동시에 동일 DynamoDB Streams 샤드에서 읽기 작업을 수행하도록 설계할 수 있습니다. 이 제한을 초과하면 요청 병목이 발생할 수 있습니다. 글로벌 테이블의 경우 요청 제한을 피하기 위해 동시 함수 수를 1로 제한하는 것이 좋습니다.

실행 역할 권한

AWSLambdaDynamoDBExecutionRoleAWS관리형 정책에는 Lambda가 DynamoDB 스트림에서 읽는 데 필요한 권한이 포함됩니다. 이 관리형 정책을 함수의 실행 역할에 추가하십시오.

실패한 배치의 레코드를 표준 SQS 대기열 또는 표준 SNS 주제로 보내려면 함수에 추가 권한이 필요합니다. 다음과 같이 각 대상 서비스에는 서로 다른 권한이 필요합니다.

권한 추가 및 이벤트 소스 매핑 생성

이벤트 소스 매핑을 생성하여 Lambda가 스트림의 레코드를 Lambda 함수로 전송하도록 지시합니다. 여러 이벤트 소스 매핑을 생성하여 여러 Lambda 함수로 동일한 데이터를 처리하거나, 단일 함수로 여러 스트림의 항목을 처리할 수 있습니다.

함수를 DynamoDB 스트림에서 읽도록 구성하려면 AWSLambdaDynamoDBExecutionRoleAWS관리형 정책을 실행 역할에 연결한 다음 DynamoDB 트리거를 생성하십시오.

권한을 추가하고 트리거를 생성하려면
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수의 이름을 선택합니다.

  3. 구성(Configuration) 탭을 선택한 다음, 권한(Permissions)을 선택합니다.

  4. 역할 이름에서 실행 역할 링크를 선택합니다. 이 링크는 IAM 콘솔에서 역할을 엽니다.

    
              실행 역할 링크
  5. 권한 추가를 선택하고 정책 연결을 선택합니다.

    
              IAM 콘솔에 정책 연결
  6. 검색 필드에 AWSLambdaDynamoDBExecutionRole를 입력합니다. 이 정책을 실행 역할에 추가하십시오. 이는 함수가 DynamoDB 스트림에서 읽는 데 필요한 권한을 포함하는 AWS 관리형 정책입니다. 이 정책에 대한 자세한 내용은 AWS관리형 정책 참조를 참조하십시오 AWSLambdaDynamoDBExecutionRole.

  7. Lambda 콘솔에서 함수로 돌아가십시오. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

    
              Lambda 콘솔의 함수 개요 섹션
  8. 트리거 유형을 선택합니다.

  9. 필요한 옵션을 구성한 다음 추가를 선택합니다.

Lambda는 DynamoDB 이벤트 소스에 대해 다음 옵션을 지원합니다.

이벤트 소스 옵션
  • DynamoDB 테이블 – 레코드를 읽을 DynamoDB 테이블입니다.

  • 배치 크기(Batch size) - 각 배치에서 함수에 보낼 레코드 수입니다(최대 10,000개). Lambda는 한 번의 호출로 배치의 모든 레코드를 함수에 전달합니다. 단, 이벤트의 총 크기가 동기식 호출에 대한 페이로드 한도(6MB)를 초과하지 않아야 합니다.

  • 배치 기간(Batch window) - 함수를 호출하기 전에 레코드를 수집할 최대 기간(단위: 초)를 지정합니다.

  • 시작 위치 – 새 레코드만, 또는 기존의 모든 레코드를 처리합니다.

    • 최신 – 스트림에 추가된 새 레코드를 처리합니다.

    • 수평 트리밍 – 스트림의 모든 레코드를 처리합니다.

    기존 레코드 처리 후 함수는 캐치업되고 새 레코드를 계속 처리합니다.

  • On-failure destination(실패 시 대상) – 처리할 수 없는 레코드에 대한 표준 SQS 대기열 또는 표준 SNS 주제입니다. 너무 오래되었거나 모든 재시도를 다 사용한 레코드 배치를 폐기할 때, Lambda는 해당 배치에 대한 세부 정보를 대기열 또는 주제로 보냅니다.

  • Retry attempts(재시도) - 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 이는 배치가 함수에 도달하지 않은 제한 또는 서비스 오류에 적용되지 않습니다.

  • Maximum age of record(최대 레코드 사용 기간) – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다.

  • Split batch on error(오류 시 배치 분할) – 함수에서 오류를 반환하면 재시도하기 전에 배치를 두 개로 분할합니다. 원래 배치 크기 설정은 변경되지 않습니다.

  • Concurrent batches per shard(샤드당 동시 배치) – 동일한 샤드의 여러 배치를 동시에 처리합니다.

  • 활성화 – 이벤트 소스 매핑을 활성화하려면 true로 설정합니다. 레코드 처리를 중지하려면 false로 설정합니다. Lambda는 마지막으로 처리된 레코드를 추적하여 매핑이 다시 활성화되면 해당 지점부터 처리를 다시 시작합니다.

참고

DynamoDB 트리거의 일부로 Lambda에서 호출한 GetRecords API 호출에는 요금이 부과되지 않습니다.

나중에 이벤트 소스 구성을 관리하기 위해 디자이너에서 트리거를 선택합니다.

이벤트 소스 매핑 API

AWS Command Line Interface(AWS CLI) 또는 AWS SDK를 사용하여 이벤트 소스를 관리하려면 다음 API 작업을 사용할 수 있습니다.

다음 예제는 AWS CLI를 사용하여 배치 크기가 500인 Amazon 리소스 이름(ARN)이 지정하는 DynamoDB 스트림에 my-function이라는 함수를 매핑합니다.

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525

다음 결과가 표시됩니다.

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

배치가 처리되는 방법을 사용자 지정하고 처리할 수 없는 레코드를 폐기할 때를 지정하는 추가 옵션을 구성합니다. 다음 예제는 두 번의 재시도 후 또는 레코드의 시간이 1시간을 넘는 경우 실패 레코드를 표준 SQS 대기열로 보내는 이벤트 소스 매핑을 업데이트합니다.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

다음 출력이 나타나야 합니다.

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. get-event-source-mapping 명령을 사용하여 현재 상태를 봅니다.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

다음 출력이 나타나야 합니다.

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

여러 배치를 동시에 처리하려면 --parallelization-factor 옵션을 사용합니다.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

오류 처리

DynamoDB 스트림에서 레코드를 읽는 이벤트 소스 매핑은 동기식으로 함수를 호출하고 오류 시 재시도합니다. Lambda에서 함수가 제한되거나 함수 호출 없이 오류를 반환하는 경우, 레코드가 만료되거나 이벤트 소스 매핑에서 구성한 최대 사용 기간을 초과할 때까지 Lambda에서 재시도합니다.

함수가 오류를 반환하는 레코드를 받으면 Lambda에서 배치 레코드가 만료되거나, 최대 기간을 초과하거나, 구성된 재시도 할당량에 도달할 때까지 재시도합니다. 함수 오류의 경우에도 실패한 배치를 두 배치로 분할하도록 이벤트 소스 매핑을 구성할 수 있습니다. 보다 작은 배치로 재시도하면 잘못된 레코드가 격리되고 시간 초과 문제가 해결됩니다. 배치를 분할하는 작업은 재시도 할당량 횟수에 포함되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 1일 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

폐기된 배치의 레코드를 보존하려면 실패한 이벤트 대상을 구성합니다. Lambda는 배치의 세부 정보와 함께 문서를 대상 대기열 또는 주제에 보냅니다.

실패한 이벤트 레코드의 대상을 구성하려면
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스에서 Stream invocation(스트림 호출)을 선택합니다.

  5. Stream(스트림)에서 함수에 매핑되는 스트림을 선택합니다.

  6. Destination type(대상 유형)에서 호출 레코드를 수신하는 리소스 유형을 선택합니다.

  7. Destination(대상)에서 리소스를 선택합니다.

  8. 저장을 선택합니다.

다음 예제에서는 DynamoDB 스트림의 호출 레코드를 보여 줍니다.

예 호출 레코드
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

아마존 CloudWatch 메트릭스

Lambda는 해당 함수가 한 배치(batch)의 레코드 처리를 완료하면 IteratorAge 지표를 생성합니다. 이 측정치는 처리가 완료되었을 때 배치의 마지막 레코드가 얼마나 오래되었는지를 나타냅니다. 함수가 새 이벤트를 처리하는 경우 반복기 수명을 사용하여 레코드가 추가된 후 함수에서 레코드를 처리할 때까지의 지연 시간을 추정할 수 있습니다.

반복기 수명이 증가하는 추세이면 함수에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 Lambda 함수 지표 작업 섹션을 참조하세요.

시간 범위

Lambda 함수는 연속 스트림 처리 애플리케이션을 실행할 수 있습니다. 스트림은 애플리케이션을 통해 연속적으로 흐르는 무한 데이터를 나타냅니다. 이 지속적으로 업데이트되는 입력의 정보를 분석하기 위해 시간의 항으로 정의된 윈도우를 사용하여 포함된 레코드를 바인딩할 수 있습니다.

텀블링 기간은 일정한 간격으로 시작되고 끝나는 고유한 시간대입니다. 기본적으로 Lambda 호출은 상태 비저장입니다. 즉, 외부 데이터베이스 없이는 여러 연속 호출에서 데이터를 처리하는 데 사용할 수 없습니다. 그러나 텀블링 기간을 활성화하면 호출 간에 상태를 유지할 수 있습니다. 이 상태에는 현재 윈도우에 대해 이전에 처리된 메시지의 집계 결과가 포함됩니다. 상태는 샤드당 최대 1MB가 될 수 있습니다. 이 크기를 초과하면 Lambda가 윈도우를 조기 종료합니다.

스트림의 각 레코드는 특정 윈도우에 속합니다. Lambda는 각 레코드를 한 번 이상 처리하지만 각 레코드가 한 번만 처리된다고 보장하지는 않습니다. 드물게 오류 처리와 같이 일부 레코드가 두 번 이상 처리될 수 있습니다. 레코드는 항상 처음부터 순서대로 처리됩니다. 레코드가 두 번 이상 처리되는 경우 레코드가 비순차적으로 처리될 수 있습니다.

집계 및 처리

집계 및 해당 집계의 최종 결과 처리를 위해 사용자 관리형 함수가 호출됩니다. Lambda는 윈도우 내에서 수신한 모든 레코드를 집계합니다. 이러한 레코드를 각각 별도의 호출인 여러 배치에서 받을 수 있습니다. 각 호출은 상태를 수신합니다. 따라서 텀블링 기간을 사용할 때 Lambda 함수 응답에는 state 속성을 포함해야 합니다. 응답에 state 속성이 포함되지 않은 경우 Lambda는 이 호출이 실패한 것으로 간주합니다. 이 조건을 충족하기 위해 함수는 다음 JSON 모양의 TimeWindowEventResponse 객체를 반환할 수 있습니다.

TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
참고

Java 함수의 경우 Map<String, String>을 사용하여 상태를 나타내는 것이 좋습니다.

윈도우 끝에서 isFinalInvokeForWindow 플래그는 이것이 최종 상태이며 처리 준비가 되었음을 나타내기 위해 true로 설정됩니다. 처리 후 윈도우가 완료되고 최종 호출이 완료된 다음 상태가 삭제됩니다.

윈도우 끝에서 Lambda가 집계 결과에 대한 작업을 위해 최종 처리를 사용합니다. 최종 처리는 동기식으로 호출됩니다. 성공적인 호출 후 함수가 시퀀스 번호를 검사하고 스트림 처리가 계속됩니다. 호출이 실패하면 Lambda 함수는 호출이 성공할 때까지 추가 처리를 일시 중단합니다.

예 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

구성

이벤트 소스 매핑을 생성하거나 업데이트할 때 텀블링 윈도우를 구성할 수 있습니다. 텀블링 윈도우를 구성하려면 윈도우를 초 단위로 지정합니다. 다음 예제 AWS Command Line Interface(AWS CLI) 명령은 텀블링 윈도우가 120초인 스트리밍 이벤트 소스 매핑을 생성합니다. 집계 및 처리를 위해 정의된 Lambda 함수는 tumbling-window-example-function입니다.

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda는 레코드가 스트림에 삽입된 시간을 기준으로 텀블링 윈도우 경계를 결정합니다. 모든 레코드에는 Lambda가 경계 결정에서 사용하는 대략적인 타임스탬프가 있습니다.

텀블링 윈도우 집계는 리샤딩을 지원하지 않습니다. 샤드가 끝나면 Lambda는 윈도우가 닫힌 것으로 간주하며 자식 샤드는 새 상태로 고유한 윈도우를 시작합니다.

텀블링 윈도우는 기존 재시도 정책 maxRetryAttemptsmaxRecordAge를 완벽하게 지원합니다.

예 Handler.py - 집계 및 처리

다음 Python 함수는 최종 상태를 집계한 다음 처리하는 방법을 보여줍니다.

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

배치 항목 실패 보고

이벤트 소스에서 스트리밍 데이터를 사용하고 처리할 때 기본적으로 Lambda는 배치가 완전히 성공한 경우에만 배치의 가장 높은 시퀀스 번호로 체크포인트를 수행합니다. Lambda는 다른 모든 결과를 완전한 실패로 처리하고 재시도 제한까지 배치 처리를 재시도합니다. 스트림에서 배치를 처리하는 동안 부분적인 성공을 허용하려면 ReportBatchItemFailures를 설정합니다. 부분적인 성공을 허용하면 레코드에 대한 재시도 횟수를 줄이는 데 도움이 되지만 성공한 레코드의 재시도 가능성을 완전히 막지는 못합니다.

ReportBatchItemFailures를 활성화하려면 ReportBatchItemFailures 목록에 열거형 값 FunctionResponseTypes를 포함시킵니다. 이 목록은 함수에 대해 활성화된 응답 유형을 나타냅니다. 이벤트 소스 매핑을 생성하거나 업데이트할 때 이 목록을 구성할 수 있습니다.

보고서 구문

배치 항목 실패에 대한 보고를 구성할 때 StreamsEventResponse 클래스는 배치 항목 실패 목록과 함께 반환됩니다. StreamsEventResponse 객체를 사용하여 배치에서 첫 번째 실패한 레코드의 시퀀스 번호를 반환할 수 있습니다. 올바른 응답 구문을 사용하여 고유한 사용자 지정 클래스를 생성할 수도 있습니다. 다음 JSON 구조는 필요한 응답 구문을 보여줍니다.

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
참고

batchItemFailures 어레이에 여러 항목이 포함되어 있으면 Lambda는 시퀀스 번호가 가장 낮은 레코드를 체크포인트로 사용합니다. 그런 다음 Lambda는 해당 체크포인트에서 시작하여 모든 레코드를 다시 시도합니다.

성공 및 실패 조건

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.

  • 비어 있는 batchItemFailure 목록

  • null batchItemFailure 목록

  • 비어 있는 EventResponse

  • null EventResponse

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.

  • 빈 문자열 itemIdentifier

  • null itemIdentifier

  • 키 이름이 잘못된 itemIdentifier

Lambda는 재시도 전략에 따라 실패를 재시도합니다.

배치 이등분

호출이 실패하고 BisectBatchOnFunctionError가 활성화되어 있으면 ReportBatchItemFailures 설정에 관계 없이 배치가 이등분됩니다.

부분적 배치 성공 응답이 수신되고 BisectBatchOnFunctionErrorReportBatchItemFailures가 모두 활성화되면 배치가 반환된 시퀀스 번호에서 이등분되고 Lambda는 나머지 레코드만 재시도합니다.

Java
예 Handler.java — 새 항목 반환 StreamsEventResponse ()
import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
Python
예 Handler.py — 반품 batchItemFailures []
def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

Amazon DynamoDB Streams 구성 파라미터

모든 Lambda 이벤트 소스 유형은 CreateEventSourceMapping동일한 UpdateEventSourceMappingAPI 작업을 공유합니다. 그러나 일부 파라미터만 DynamoDB Streams에 적용됩니다.

DynamoDB Streams 에 적용되는 이벤트 소스 파라미터
파라미터 필수 기본값 참고

BatchSize

N

100

최대값: 10,000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

폐기된 레코드의 표준 Amazon SQS 대기열 또는 표준 Amazon SNS 주제 대상

활성

N

true

EventSourceArn

Y

데이터 스트림 또는 스트림 소비자의 ARN

FilterCriteria

N

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1은 무한을 의미합니다. 실패한 레코드는 레코드가 만료될 때까지 재시도됩니다. DynamoDB 스트림의 데이터 보존 한도는 24시간입니다.

최솟값: -1

최대값: 604,800

MaximumRetryAttempts

N

-1

-1(무한): 레코드가 만료될 때까지 실패한 레코드가 다시 시도됩니다

최솟값: 0

최대값: 10,000

ParallelizationFactor

N

1

최댓값: 10

StartingPosition

Y

TRIM_HIZON 또는 LATEST

TumblingWindowInSeconds

N

최솟값: 0

최댓값: 900