Amazon Kinesis에서 AWS Lambda 사용 - AWS Lambda

Amazon Kinesis에서 AWS Lambda 사용

AWS Lambda 함수를 사용하여 Amazon Kinesis 데이터 스트림의 레코드를 처리할 수 있습니다.

Kinesis 데이터 스트림은 샤드의 집합입니다. 샤드마다 데이터 레코드 시퀀스가 포함되어 있습니다. 소비자는 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션입니다. Lambda 함수를 공유 처리량 소비자(표준 반복기)에 매핑하거나 향상된 팬 아웃 기능이 있는 전용 처리량 소비자에 매핑할 수 있습니다.

표준 반복기의 경우 Lambda는 HTTP 프로토콜을 사용하여 Kinesis 스트림의 각 샤드를 폴링합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

지연 시간을 최소화하고 읽기 처리량을 최대화하기 위해 향상된 팬아웃으로 데이터 스트림 소비자를 생성할 수 있습니다. 스트림 소비자는 각 샤드에 대해 전용 연결을 설정하므로 스트림에서 읽는 다른 애플리케이션에 영향을 주지 않습니다. 전용 처리량은 많은 애플리케이션에서 동일한 데이터를 읽거나, 큰 레코드를 갖는 스트림을 재처리하는 경우에 유용합니다. Kinesis는 HTTP/2를 통해 레코드를 Lambda로 푸시합니다.

Kinesis 데이터 스트림에 대한 자세한 내용은 Amazon Kinesis Data Streams에서 데이터 읽기를 참조하세요.

Lambda는 데이터 스트림에서 레코드를 읽고 스트림 레코드를 포함한 이벤트와 동기적으로 함수를 호출합니다. Lambda는 배치의 레코드를 읽고 함수를 호출하여 배치의 레코드를 처리합니다. 각 배치에는 단일 샤드/데이터 스트림의 레코드가 포함됩니다.

예 Kinesis 레코드 이벤트

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

기본적으로, Lambda는 레코드가 사용 가능하게 되는 즉시 함수를 호출합니다. Lambda가 이벤트 소스에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 일괄 처리 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치가 수집되거나, 일괄 처리 기간이 만료되거나, 배치가 페이로드 한도인 6MB에 도달할 때까지 이벤트 소스에서 레코드를 계속 읽습니다. 자세한 정보는 일괄 처리 동작을 참조하십시오.

함수가 오류를 반환하면 Lambda는 처리가 성공할 때까지 또는 데이터가 만료될 때까지 배치(batch)를 재시도합니다. 샤드 중지를 방지하기 위해 보다 작은 배치 크기로 재시도하거나, 재시도 횟수를 제한하거나, 너무 오래된 레코드를 폐기하도록 이벤트 소스 매핑을 구성할 수 있습니다. 폐기된 이벤트를 유지하기 위해 실패한 배치에 대한 세부 정보를 SQS 대기열 또는 SNS 주제로 보내도록 이벤트 소스 매핑을 구성할 수 있습니다.

또한 각 샤드의 여러 배치를 병렬로 처리하여 동시성을 높일 수 있습니다. Lambda는 각 샤드에서 최대 10개의 배치를 동시에 처리할 수 있습니다. 샤드당 동시 배치 수를 높이는 경우에도 Lambda는 공유 수준에서의 순차 처리를 계속 보장합니다.

Kinesis 또는 DynamoDB 데이터 스트림의 한 샤드와 하나 이상의 Lambda 호출을 동시에 처리하도록 ParallelizationFactor 설정을 구성합니다. Lambda가 병렬화 계수를 통해 샤드에서 폴링하는 동시 배치의 수는 1(기본값)부터 10까지 지정할 수 있습니다. 예를 들어 ParallelizationFactor를 2로 설정하는 경우 최대 100개의 Kinesis 데이터 샤드를 처리하기 위한 200개의 동시 Lambda 호출을 보유할 수 있습니다. 이는 데이터 볼륨이 일시적이고 IteratorAge가 높을 때 처리량을 확장하는 데 도움을 줍니다. Kinesis 집계를 사용하는 경우에는 병렬화 계수가 작동하지 않습니다. 자세한 내용은 Kinesis 및 DynamoDB 이벤트 소스에 대한 새 AWS Lambda 크기 조정 제어를 참조하세요. 또한 전체 자습서는 AWS의 서버리스 데이터 처리워크숍을 참조하십시오.

데이터 스트림과 함수 구성

Lambda 함수는 데이터 스트림의 소비자 애플리케이션입니다. 이 함수는 각 샤드에서 한 번에 한 개의 레코드 배치를 처리합니다. Lambda 함수를 데이터 스트림(표준 반복자)이나 스트림의 소비자(향상된 팬아웃)에게 매핑할 수 있습니다.

표준 반복자의 경우 Lambda는 초당 1회의 속도로 Kinesis 스트림의 각 샤드에서 레코드를 폴링합니다. 더 많은 레코드를 사용할 수 있는 경우 Lambda는 함수가 스트림을 따라잡을 때까지 배치 처리를 유지합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

지연 시간을 최소화하고 읽기 처리량을 최대화하려면 향상된 팬아웃으로 데이터 스트림 소비자를 생성하세요. 향상된 팬아웃 소비자는 각 샤드에 대해 전용 연결을 설정하므로 스트림에서 읽는 다른 애플리케이션에 영향을 주지 않습니다. 스트림 소비자는 HTTP/2를 사용하여 수명이 긴 연결을 통해 레코드를 Lambda에 푸시하고 요청 헤더를 압축함으로써 지연 시간을 최소화합니다. Kinesis RegisterStreamConsumer API를 사용하여 스트림 소비자를 생성할 수 있습니다.

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

다음 결과가 표시됩니다:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

함수가 레코드를 처리하는 속도를 높이려면 데이터 스트림에 샤드를 추가합니다. Lambda는 각 샤드의 레코드를 순서대로 처리합니다. 함수가 오류를 반환하면 샤드는 추가 레코드 처리를 중지합니다. 샤드가 많을수록 한 번에 더 많은 배치가 처리되므로 동시 실행에 대한 오류의 영향이 줄어듭니다.

함수가 총 동시 배치 수를 처리하기 위해 확장할 수 없는 경우 함수에 대한 할당량 증가를 요청하거나 동시성을 예약합니다.

실행 역할 권한

Lambda는 Kinesis 데이터 스트림과 관련된 리소스를 관리하기 위해 다음 권한이 필요합니다. 이러한 권한을 함수의 실행 역할에 추가합니다.

AWSLambdaKinesisExecutionRole 관리형 정책에는 이러한 권한이 포함되어 있습니다. 자세한 정보는 Lambda 실행 역할을 참조하십시오.

실패한 배치의 레코드를 SQS 대기열 또는 SNS 주제로 보내려면 함수에 추가 권한이 필요합니다. 다음과 같이 각 대상 서비스에는 서로 다른 권한이 필요합니다.

이벤트 소스로서 스트림 구성

이벤트 소스 매핑을 생성하여 Lambda가 데이터 스트림의 레코드를 Lambda 함수로 전송하도록 지시합니다. 여러 이벤트 소스 매핑을 생성하여 여러 Lambda 함수로 동일한 데이터를 처리하거나, 단일 함수로 여러 데이터 스트림의 항목을 처리할 수 있습니다. 여러 데이터 스트림에서 항목을 처리할 때 각 배치에는 단일 샤드/스트림의 레코드만 포함됩니다.

Lambda 콘솔의 Kinesis에서 읽도록 함수를 구성하려면 Kinesis 트리거를 생성합니다.

트리거를 생성하려면

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수의 이름을 선택합니다.

  3. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

  4. 트리거 유형을 선택합니다.

  5. 필요한 옵션을 구성한 다음 추가를 선택합니다.

Lambda는 Kinesis 이벤트 소스에 대해 다음과 같은 옵션을 지원합니다.

이벤트 소스 옵션

  • Kinesis 스트림 – 레코드를 읽을 Kinesis 스트림.

  • 소비자(선택 사항) - 전용 연결을 통해 스트림에서 읽으려면 스트림 소비자를 사용합니다.

  • 배치 크기(Batch size) - 각 배치에서 함수에 보낼 레코드 수입니다(최대 10,000개). Lambda는 한 번의 호출로 배치의 모든 레코드를 함수에 전달합니다. 단, 이벤트의 총 크기가 동기식 호출에 대한 페이로드 한도(6MB)를 초과하지 않아야 합니다.

  • 배치 기간(Batch window) - 함수를 호출하기 전에 레코드를 수집할 최대 기간(단위: 초)를 지정합니다.

  • 시작 위치 - 새 레코드, 기존의 모든 레코드 또는 특정 날짜 이후에 생성된 레코드만 처리합니다.

    • 최신 – 스트림에 추가된 새 레코드를 처리합니다.

    • 수평 트리밍 – 스트림의 모든 레코드를 처리합니다.

    • 타임스탬프 – 특정 시간에 시작하는 레코드를 시작합니다.

    기존 레코드 처리 후 함수는 캐치업되고 새 레코드를 계속 처리합니다.

  • On-failure destination(실패 시 대상) – 처리할 수 없는 레코드에 대한 SQS 대기열 또는 SNS 주제입니다. 너무 오래되었거나 모든 재시도를 다 사용한 레코드 배치를 폐기할 때, Lambda는 해당 배치에 대한 세부 정보를 대기열 또는 주제로 보냅니다.

  • Retry attempts(재시도) - 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 이는 배치가 함수에 도달하지 않은 제한 또는 서비스 오류에 적용되지 않습니다.

  • Maximum age of record(최대 레코드 사용 기간) – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다.

  • Split batch on error(오류 시 배치 분할) – 함수에서 오류를 반환하면 재시도하기 전에 배치를 두 개로 분할합니다. 원래 배치 크기 설정은 변경되지 않습니다.

  • Concurrent batches per shard(샤드당 동시 배치) – 동일한 샤드의 여러 배치를 동시에 처리합니다.

  • 활성화 – 이벤트 소스 매핑을 활성화하려면 true로 설정합니다. 레코드 처리를 중지하려면 false로 설정합니다. Lambda는 마지막으로 처리된 레코드를 추적하여 다시 활성화되면 해당 지점부터 처리를 다시 시작합니다.

참고

Kinesis는 각 샤드에 대해 요금을 부과하고 향상된 팬아웃의 경우 스트림에서 읽은 데이터에 대해 요금을 부과합니다. 요금 세부 정보는 Amazon Kinesis 요금을 참조하세요.

나중에 이벤트 소스 구성을 관리하기 위해 디자이너에서 트리거를 선택합니다.

이벤트 소스 매핑 API

AWS Command Line Interface(AWS CLI) 또는 AWS SDK를 사용하여 이벤트 소스를 관리하려면 다음 API 작업을 사용할 수 있습니다.

AWS CLI를 사용하여 이벤트 소스 매핑을 생성하려면 create-event-source-mapping 명령을 사용합니다. 다음 예제는 AWS CLI를 사용하여 my-function이라는 함수를 Kinesis 데이터 스트림에 매핑합니다. 데이터 스트림은 Amazon 리소스 이름(ARN)으로 지정되고, 배치 크기는 500이며 Unix 시간의 타임스탬프부터 시작합니다.

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

다음 결과가 표시됩니다:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

소비자를 사용하려면 스트림의 ARN 대신 소비자의 ARN을 지정합니다.

배치가 처리되는 방법을 사용자 지정하고 처리할 수 없는 레코드를 폐기할 때를 지정하는 추가 옵션을 구성합니다. 다음 예제는 두 번의 재시도 후 또는 레코드의 시간이 1시간을 넘는 경우 실패 레코드를 SQS 대기열로 보내는 이벤트 소스 매핑을 업데이트합니다.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

다음 출력이 나타나야 합니다.

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. get-event-source-mapping 명령을 사용하여 현재 상태를 봅니다.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

다음 출력이 나타나야 합니다.

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

여러 배치를 동시에 처리하려면 --parallelization-factor 옵션을 사용합니다.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

오류 처리

Kinesis 스트림에서 레코드를 읽는 이벤트 소스 매핑은 동기식으로 함수를 호출하고 오류 시 재시도합니다. Lambda에서 함수가 제한되거나 함수 호출 없이 오류를 반환하는 경우, 레코드가 만료되거나 이벤트 소스 매핑에서 구성한 최대 사용 기간을 초과할 때까지 Lambda에서 재시도합니다.

함수가 오류를 반환하는 레코드를 받으면 Lambda에서 배치 레코드가 만료되거나, 최대 기간을 초과하거나, 구성된 재시도 할당량에 도달할 때까지 재시도합니다. 함수 오류의 경우에도 실패한 배치를 두 배치로 분할하도록 이벤트 소스 매핑을 구성할 수 있습니다. 보다 작은 배치로 재시도하면 잘못된 레코드가 격리되고 시간 초과 문제가 해결됩니다. 배치를 분할하는 작업은 재시도 할당량 횟수에 포함되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 1주 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

폐기된 배치의 레코드를 보존하려면 실패한 이벤트 대상을 구성합니다. Lambda는 배치의 세부 정보와 함께 문서를 대상 대기열 또는 주제에 보냅니다.

실패한 이벤트 레코드의 대상을 구성하려면

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스에서 Stream invocation(스트림 호출)을 선택합니다.

  5. Stream(스트림)에서 함수에 매핑되는 스트림을 선택합니다.

  6. Destination type(대상 유형)에서 호출 레코드를 수신하는 리소스 유형을 선택합니다.

  7. Destination(대상)에서 리소스를 선택합니다.

  8. 저장(Save)을 선택합니다.

다음 예제에서는 Kinesis 스트림의 호출 레코드를 보여 줍니다.

예 호출 레코드

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

Amazon CloudWatch 측정치

Lambda는 해당 함수가 한 배치(batch)의 레코드 처리를 완료하면 IteratorAge 지표를 생성합니다. 이 측정치는 처리가 완료되었을 때 배치의 마지막 레코드가 얼마나 오래되었는지를 나타냅니다. 함수가 새 이벤트를 처리하는 경우 반복기 수명을 사용하여 레코드가 추가된 후 함수에서 레코드를 처리할 때까지의 지연 시간을 추정할 수 있습니다.

반복기 수명이 증가하는 추세이면 함수에 문제가 있음을 나타낼 수 있습니다. 자세한 정보는 Lambda 함수 지표 작업을 참조하십시오.

시간 범위

Lambda 함수는 연속 스트림 처리 애플리케이션을 실행할 수 있습니다. 스트림은 애플리케이션을 통해 연속적으로 흐르는 무한 데이터를 나타냅니다. 이 지속적으로 업데이트되는 입력의 정보를 분석하기 위해 시간의 항으로 정의된 윈도우를 사용하여 포함된 레코드를 바인딩할 수 있습니다.

텀블링 기간은 일정한 간격으로 시작되고 끝나는 고유한 시간대입니다. 기본적으로 Lambda 호출은 상태 비저장입니다. 즉, 외부 데이터베이스 없이는 여러 연속 호출에서 데이터를 처리하는 데 사용할 수 없습니다. 그러나 텀블링 기간을 활성화하면 호출 간에 상태를 유지할 수 있습니다. 이 상태에는 현재 윈도우에 대해 이전에 처리된 메시지의 집계 결과가 포함됩니다. 상태는 샤드당 최대 1MB가 될 수 있습니다. 이 크기를 초과하면 Lambda가 윈도우를 조기 종료합니다.

스트림의 각 레코드는 특정 윈도우에 속합니다. Lambda는 각 레코드를 한 번 이상 처리하지만 각 레코드가 한 번만 처리된다고 보장하지는 않습니다. 드물게 오류 처리와 같이 일부 레코드가 두 번 이상 처리될 수 있습니다. 레코드는 항상 처음부터 순서대로 처리됩니다. 레코드가 두 번 이상 처리되는 경우 레코드가 비순차적으로 처리될 수 있습니다.

집계 및 처리

집계 및 해당 집계의 최종 결과 처리를 위해 사용자 관리형 함수가 호출됩니다. Lambda는 윈도우 내에서 수신한 모든 레코드를 집계합니다. 이러한 레코드를 각각 별도의 호출인 여러 배치에서 받을 수 있습니다. 각 호출은 상태를 수신합니다. 따라서 텀블링 기간을 사용할 때 Lambda 함수 응답에는 state 속성을 포함해야 합니다. 응답에 state 속성이 포함되지 않은 경우 Lambda는 이 호출이 실패한 것으로 간주합니다. 이 조건을 충족하기 위해 함수는 다음 JSON 모양의 TimeWindowEventResponse 객체를 반환할 수 있습니다.

TimeWindowEventResponse

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
참고

Java 함수의 경우 Map<String, String>을 사용하여 상태를 나타내는 것이 좋습니다.

윈도우 끝에서 isFinalInvokeForWindow 플래그는 이것이 최종 상태이며 처리 준비가 되었음을 나타내기 위해 true로 설정됩니다. 처리 후 윈도우가 완료되고 최종 호출이 완료된 다음 상태가 삭제됩니다.

윈도우 끝에서 Lambda가 집계 결과에 대한 작업을 위해 최종 처리를 사용합니다. 최종 처리는 동기식으로 호출됩니다. 성공적인 호출 후 함수가 시퀀스 번호를 검사하고 스트림 처리가 계속됩니다. 호출이 실패하면 Lambda 함수는 호출이 성공할 때까지 추가 처리를 일시 중단합니다.

예 KinesisTimeWindowEvent

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

구성

이벤트 소스 매핑을 생성하거나 업데이트할 때 텀블링 윈도우를 구성할 수 있습니다. 텀블링 윈도우를 구성하려면 윈도우를 초 단위로 지정합니다. 다음 예제 AWS Command Line Interface(AWS CLI) 명령은 텀블링 윈도우가 120초인 스트리밍 이벤트 소스 매핑을 생성합니다. 집계 및 처리를 위해 정의된 Lambda 함수는 tumbling-window-example-function입니다.

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda는 레코드가 스트림에 삽입된 시간을 기준으로 텀블링 윈도우 경계를 결정합니다. 모든 레코드에는 Lambda가 경계 결정에서 사용하는 대략적인 타임스탬프가 있습니다.

텀블링 윈도우 집계는 리샤딩을 지원하지 않습니다. 샤드가 끝나면 Lambda는 윈도우가 닫힌 것으로 간주하며 자식 샤드는 새 상태로 고유한 윈도우를 시작합니다.

텀블링 윈도우는 기존 재시도 정책 maxRetryAttemptsmaxRecordAge를 완벽하게 지원합니다.

예 Handler.py - 집계 및 처리

다음 Python 함수는 최종 상태를 집계한 다음 처리하는 방법을 보여줍니다.

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

배치 항목 실패 보고

이벤트 소스에서 스트리밍 데이터를 사용하고 처리할 때 기본적으로 Lambda는 배치가 완전히 성공한 경우에만 배치의 가장 높은 시퀀스 번호로 체크포인트를 수행합니다. Lambda는 다른 모든 결과를 완전한 실패로 처리하고 재시도 제한까지 배치 처리를 재시도합니다. 스트림에서 배치를 처리하는 동안 부분적인 성공을 허용하려면 ReportBatchItemFailures를 설정합니다. 부분적인 성공을 허용하면 레코드에 대한 재시도 횟수를 줄이는 데 도움이 되지만 성공한 레코드의 재시도 가능성을 완전히 막지는 못합니다.

ReportBatchItemFailures를 활성화하려면 ReportBatchItemFailures 목록에 열거형 값 FunctionResponseTypes를 포함시킵니다. 이 목록은 함수에 대해 활성화된 응답 유형을 나타냅니다. 이벤트 소스 매핑을 생성하거나 업데이트할 때 이 목록을 구성할 수 있습니다.

보고서 구문

배치 항목 실패에 대한 보고를 구성할 때 StreamsEventResponse 클래스는 배치 항목 실패 목록과 함께 반환됩니다. StreamsEventResponse 객체를 사용하여 배치에서 첫 번째 실패한 레코드의 시퀀스 번호를 반환할 수 있습니다. 올바른 응답 구문을 사용하여 고유한 사용자 지정 클래스를 생성할 수도 있습니다. 다음 JSON 구조는 필요한 응답 구문을 보여줍니다.

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }
참고

batchItemFailures 어레이에 여러 항목이 포함되어 있으면 Lambda는 시퀀스 번호가 가장 낮은 레코드를 체크포인트로 사용합니다. 그런 다음 Lambda는 해당 체크포인트에서 시작하여 모든 레코드를 다시 시도합니다.

성공 및 실패 조건

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.

  • 비어 있는 batchItemFailure 목록

  • null batchItemFailure 목록

  • 비어 있는 EventResponse

  • null EventResponse

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.

  • 빈 문자열 itemIdentifier

  • null itemIdentifier

  • 키 이름이 잘못된 itemIdentifier

Lambda는 재시도 전략에 따라 실패를 재시도합니다.

배치 이등분

호출이 실패하고 BisectBatchOnFunctionError가 활성화되어 있으면 ReportBatchItemFailures 설정에 관계 없이 배치가 이등분됩니다.

부분적 배치 성공 응답이 수신되고 BisectBatchOnFunctionErrorReportBatchItemFailures가 모두 활성화되면 배치가 반환된 시퀀스 번호에서 이등분되고 Lambda는 나머지 레코드만 재시도합니다.

Java

예 Handler.java - 새 StreamsEventResponse() 반환

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

예 Handler.py - batchItemFailures[] 반환

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

Amazon Kinesis 구성 파라미터

모든 Lambda 이벤트 소스 유형은 동일한 CreateEventSourceMappingUpdateEventSourceMapping API 작업을 공유합니다. 그러나 일부 파라미터만 Kinesis에 적용됩니다.

Kinesis에 적용되는 이벤트 소스 파라미터
파라미터 필수 기본값 참고

BatchSize

N

100

최대값: 10,000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

폐기된 레코드의 Amazon SQS 대기열 또는 Amazon SNS 주제 대상

활성

N

true

EventSourceArn

Y

데이터 스트림 또는 스트림 소비자의 ARN

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1은 무한을 의미: Lambda는 레코드를 버리지 않습니다

최솟값: -1

최대값: 604,800

MaximumRetryAttempts

N

-1

-1(무한): 레코드가 만료될 때까지 실패한 레코드가 다시 시도됩니다

최솟값: -1

최대값: 10,000

ParallelizationFactor

N

1

최댓값: 10

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, 또는 LATEST

StartingPositionTimestamp

N

StartingPosition이 AT_TIMESTAMP로 설정된 경우에만 유효합니다. 읽기를 시작하는 시간(Unix 시간 초)

TumblingWindowInSeconds

N

최솟값: 0

최댓값: 900