Using AWS Lambda with Amazon DynamoDB - AWS Lambda

Using AWS Lambda with Amazon DynamoDB

AWS Lambda 함수를 사용하여 Amazon DynamoDB 데이터 스트림의 레코드를 처리할 수 있습니다. DynamoDB 스트림를 사용하여 DynamoDB가 업데이트될 때마다 추가 작업을 수행하는 Lambda 함수를 트리거할 수 있습니다.

Lambda는 스트림에서 레코드를 읽고 스트림 레코드를 포함하는 이벤트와 동기적으로 함수를 호출합니다. Lambda는 레코드를 배치(batch)로 읽고 함수를 호출하여 배치의 레코드를 처리합니다.

예 DynamoDB 스트림 레코드 이벤트

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda는 초당 4회의 기본 속도로 레코드에 대해 DynamoDB 스트림의 샤드를 폴링합니다. 레코드를 사용할 수 있으면 Lambda가 함수를 호출하고 결과를 기다립니다. 처리가 성공하면 Lambda가 레코드를 더 받을 때까지 폴링을 재개합니다.

기본적으로, Lambda는 스트림에서 레코드가 사용 가능한 즉시 함수를 호출합니다. Lambda가 스트림에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 배치 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치를 수집하거나 배치 기간이 만료될 때까지 스트림에서 레코드를 계속 읽습니다.

함수가 오류를 반환하면 Lambda는 처리가 성공할 때까지 또는 데이터가 만료될 때까지 배치(batch)를 재시도합니다. 샤드 중지를 방지하기 위해 보다 작은 배치 크기로 재시도하거나, 재시도 횟수를 제한하거나, 너무 오래된 레코드를 폐기하도록 이벤트 소스 매핑을 구성할 수 있습니다. 폐기된 이벤트를 유지하기 위해 실패한 배치에 대한 세부 정보를 SQS 대기열 또는 SNS 주제로 보내도록 이벤트 소스 매핑을 구성할 수 있습니다.

또한 각 샤드의 여러 배치를 병렬로 처리하여 동시성을 증가할 수 있습니다. Lambda에서는 동시에 각 샤드에서 최대 10개의 배치를 처리할 수 있습니다. 샤드당 동시 배치 수를 증가하는 경우 Lambda에서는 파티션-키 수준에서의 유효한 처리를 계속 보장합니다.

실행 역할 권한

Lambda는 DynamoDB 스트림과 관련된 리소스를 관리하기 위해 다음 권한이 필요합니다. 이러한 권한을 함수의 실행 역할에 추가합니다.

AWSLambdaDynamoDBExecutionRole 관리형 정책에는 이러한 권한이 포함되어 있습니다. 자세한 내용은 AWS Lambda 실행 역할 단원을 참조하십시오.

실패한 배치의 레코드를 대기열 또는 주제로 보내려면 함수에 추가 권한이 필요합니다. 다음과 같이 각 대상 서비스에는 서로 다른 권한이 필요합니다.

이벤트 소스로서 스트림 구성

이벤트 소스 매핑을 생성하여 Lambda가 스트림의 레코드를 Lambda 함수로 전송하도록 지시합니다. 여러 이벤트 소스 매핑을 생성하여 여러 Lambda 함수로 동일한 데이터를 처리하거나, 단일 함수로 여러 스트림의 항목을 처리할 수 있습니다.

함수가 Lambda 콘솔의 DynamoDB 스트림에서 읽도록 구성하려면 DynamoDB 트리거를 생성합니다.

트리거를 생성하려면

  1. Lambda 콘솔의 함수 페이지(Functions page)를 엽니다.

  2. 함수를 선택합니다.

  3. Designer에서 트리거 추가를 선택합니다.

  4. 트리거 유형을 선택합니다.

  5. 필요한 옵션을 구성하고 추가를 선택합니다.

Lambda은 DynamoDB 이벤트 소스에 대해 다음과 같은 옵션을 지원합니다.

이벤트 소스 옵션

  • DynamoDB 테이블 – 다음에서 레코드를 읽기 위한 DynamoDB 테이블입니다.

  • 배치 크기 – 각 배치에서 함수로 전송되는 레코드 수(최대 1,000개)입니다. Lambda은 한 번의 호출로 배치의 모든 레코드를 함수에 전달합니다. 단, 이벤트의 총 크기가 동기식 호출(6 MB)에 대한 페이로드 한도를 초과하지 않아야 합니다.

  • 배치 기간 – 함수를 호출하기 전에 레코드를 수집할 최대 기간(단위: 초)를 지정합니다.

  • 시작 위치 – 새 레코드 또는 기존의 모든 레코드만 처리합니다.

    • 최신 – 스트림에 추가된 새 레코드를 처리합니다.

    • 수평 트리밍 – 스트림의 모든 레코드를 처리합니다.

    기존 레코드 처리 후 함수는 캐치업되고 새 레코드를 계속 처리합니다.

  • On-failure destination(실패 시 대상) – 처리할 수 없는 레코드에 대한 SQS 대기열 또는 SNS 주제입니다. 레코드 배치가 너무 오래되었거나 모든 재시도를 다 사용했기 때문에 Lambda에서 해당 배치를 폐기하는 경우 해당 배치에 대한 세부 정보를 대기열 또는 주제로 보냅니다.

  • Retry attempts(재시도) – 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 이는 배치가 함수에 도달하지 않은 제한 또는 서비스 오류에 적용되지 않습니다.

  • Maximum age of record(최대 레코드 사용 기간) – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다.

  • Split batch on error(오류 시 배치 분할) – 함수에서 오류를 반환하면 재시도하기 전에 배치를 두 개로 분할합니다.

  • Concurrent batches per shard(샤드당 동시 배치) – 동일한 샤드의 여러 배치를 동시에 처리합니다.

  • 활성화 – 이벤트 소스 매핑을 활성화하려면 true로 설정합니다. 레코드 처리를 중지하려면 false로 설정합니다. Lambda는 마지막으로 처리된 레코드를 추적하여 매핑이 다시 활성화되면 해당 지점부터 처리를 다시 시작합니다.

참고

DynamoDB 트리거의 일부로 Lambda에서 호출한 GetRecords API 호출에 대해서는 요금이 부과되지 않습니다.

나중에 이벤트 소스 구성을 관리하기 위해 디자이너에서 트리거를 선택합니다.

이벤트 소스 매핑 API

AWS CLI 또는 AWS SDK로 이벤트 소스 매핑을 관리하려면 다음 API 작업을 사용하십시오.

다음 예제는 AWS CLI를 사용하여 Amazon 리소스 이름(ARN)으로 지정된 DynamoDB 스트림에 my-function이라는 함수를 매핑합니다. 배치 크기는 500입니다.

$ aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525 { "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

배치가 처리되는 방법을 사용자 지정하고 처리할 수 없는 레코드를 폐기할 때를 지정하는 추가 옵션을 구성합니다. 다음 예제는 두 번의 재시도 후 또는 레코드의 시간이 1시간을 넘는 경우 실패 레코드를 SQS 대기열로 보내는 이벤트 소스 매핑을 업데이트합니다.

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. get-event-source-mapping 명령을 사용하여 현재 상태를 봅니다.

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

여러 배치를 동시에 처리하려면 --parallelization-factor 옵션을 사용합니다.

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

오류 처리

DynamoDB 스트림에서 레코드를 읽는 이벤트 소스 매핑은 동기식으로 함수를 호출하고 오류 시 재시도합니다. 함수가 제한되거나 Lambda 서비스에서 함수 호출 없이 오류를 반환하는 경우 레코드가 만료되거나 이벤트 소스 매핑에 구성된 최대 사용 기간을 초과할 때까지 Lambda에서 재시도합니다.

함수가 오류를 반환하는 레코드를 받으면 Lambda에서 배치 레코드가 만료되거나, 최대 기간을 초과하거나, 구성된 재시도 할당량에 도달할 때까지 재시도합니다. 함수 오류의 경우에도 실패한 배치를 두 배치로 분할하도록 이벤트 소스 매핑을 구성할 수 있습니다. 보다 작은 배치로 재시도하면 잘못된 레코드가 격리되고 시간 초과 문제가 해결됩니다. 배치를 분할하는 작업은 재시도 할당량에 포함되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 one day 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

폐기된 배치의 레코드를 유지하려면 실패한 이벤트 대상을 구성합니다. Lambda는 배치의 세부 정보와 함께 문서를 대상 대기열 또는 주제에 보냅니다.

실패한 이벤트 레코드의 대상을 구성하려면

  1. Lambda 콘솔의 함수 페이지(Functions page)를 엽니다.

  2. 함수를 선택합니다.

  3. 디자이너에서 대상 추가를 선택합니다.

  4. 소스에서 Stream invocation(스트림 호출)을 선택합니다.

  5. Stream(스트림)에서 함수에 매핑되는 스트림을 선택합니다.

  6. Destination type(대상 유형)에서 호출 레코드를 수신하는 리소스 유형을 선택합니다.

  7. Destination(대상)에서 리소스를 선택합니다.

  8. 저장을 선택합니다.

다음 예제에서는 DynamoDB 스트림의 호출 레코드를 보여 줍니다.

예 호출 레코드

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

Amazon CloudWatch 지표

Lambda는 해당 함수가 한 배치(batch)의 레코드 처리를 완료하면 IteratorAge 측정치를 생성합니다. 이 측정치는 처리가 완료되었을 때 배치의 마지막 레코드가 얼마나 오래되었는지를 나타냅니다. 함수가 새 이벤트를 처리하는 경우 반복기 수명을 사용하여 레코드가 추가된 후 함수에서 레코드를 처리할 때까지의 지연 시간을 추정할 수 있습니다.

반복기 수명이 증가하는 추세이면 함수에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 AWS Lambda 함수 지표 작업 단원을 참조하십시오.