AWS Lambda를 Amazon Kinesis와 함께 사용 - AWS Lambda

AWS Lambda를 Amazon Kinesis와 함께 사용

AWS Lambda 함수를 사용하여 Amazon Kinesis 데이터 스트림의 레코드를 처리할 수 있습니다.

Kinesis 데이터 스트림은 일련의 샤드입니다. 샤드마다 데이터 레코드 시퀀스가 포함되어 있습니다. 소비자는 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션입니다. Lambda 함수를 공유 처리량 소비자(표준 반복기)에 매핑하거나 향상된 팬 아웃 기능이 있는 전용 처리량 소비자에 매핑할 수 있습니다.

표준 반복기의 경우 Lambda는 HTTP 프로토콜을 사용하여 Kinesis 스트림의 각 샤드를 폴링합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

지연 시간을 최소화하고 읽기 처리량을 최대화하기 위해 향상된 팬아웃으로 데이터 스트림 소비자를 생성할 수 있습니다. 스트림 소비자는 각 샤드에 대해 전용 연결을 설정하므로 스트림에서 읽는 다른 애플리케이션에 영향을 주지 않습니다. 전용 처리량은 많은 애플리케이션에서 동일한 데이터를 읽거나, 큰 레코드를 갖는 스트림을 재처리하는 경우에 유용합니다. Kinesis는 HTTP/2를 통해 Lambda에 레코드를 푸시합니다.

Kinesis 데이터 스트림에 대한 자세한 내용은 Amazon Kinesis Data Streams에서 데이터 읽기를 참조하세요.

참고

HTTP/2 스트림 소비자는 오류 처리를 사용할 수 없습니다.

Lambda는 데이터 스트림에서 레코드를 읽고 스트림 레코드를 포함하는 이벤트와 동기적으로 함수를 호출합니다. Lambda는 레코드를 배치(batch)로 읽고 함수를 호출하여 배치의 레코드를 처리합니다.

예 Kinesis 레코드 이벤트

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

기본적으로, Lambda는 스트림에서 레코드가 사용 가능한 즉시 함수를 호출합니다. Lambda가 스트림에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 배치 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치를 수집하거나 배치 기간이 만료될 때까지 스트림에서 레코드를 계속 읽습니다.

함수가 오류를 반환하면 Lambda는 처리가 성공할 때까지 또는 데이터가 만료될 때까지 배치(batch)를 재시도합니다. 샤드 중지를 방지하기 위해 보다 작은 배치 크기로 재시도하거나, 재시도 횟수를 제한하거나, 너무 오래된 레코드를 폐기하도록 이벤트 소스 매핑을 구성할 수 있습니다. 폐기된 이벤트를 유지하기 위해 실패한 배치에 대한 세부 정보를 SQS 대기열 또는 SNS 주제로 보내도록 이벤트 소스 매핑을 구성할 수 있습니다.

또한 각 샤드의 여러 배치를 병렬로 처리하여 동시성을 증가할 수 있습니다. Lambda에서는 동시에 각 샤드에서 최대 10개의 배치를 처리할 수 있습니다. 샤드당 동시 배치 수를 증가하는 경우 Lambda에서는 파티션-키 수준에서의 유효한 처리를 계속 보장합니다.

데이터 스트림과 함수 구성

Lambda 함수는 데이터 스트림의 소비자 애플리케이션입니다. 이 함수는 각 샤드에서 한 번에 한 개의 레코드 배치를 처리합니다. Lambda 함수를 데이터 스트림나(표준 반복자)이나 스트림의 소비자(향상된 팬아웃)에게 매핑할 수 있습니다.

표준 반복자의 경우 Lambda는 초당 1회의 속도로 Kinesis 스트림의 각 샤드에서 레코드를 폴링합니다. 더 많은 레코드를 사용할 수 있는 경우 Lambda은 함수가 스트림을 따라 잡을 때까지 배치 처리를 유지합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

지연 시간을 최소화하고 읽기 처리량을 최대화하려면 향상된 팬아웃으로 데이터 스트림 소비자를 생성하세요. 향상된 팬아웃 소비자는 각 샤드에 대해 전용 연결을 설정하므로 스트림에서 읽는 다른 애플리케이션에 영향을 주지 않습니다. 스트림 소비자는 HTTP/2를 사용하여 수명이 긴 연결을 통해 레코드를 Lambda에 푸시하고 요청 헤더를 압축함으로써 지연 시간을 최소화합니다. Kinesis RegisterStreamConsumer API를 사용하여 스트림 소비자를 생성할 수 있습니다.

$ aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

함수가 레코드를 처리하는 속도를 높이려면 데이터 스트림에 샤드를 추가합니다. Lambda는 각 샤드의 레코드를 순서대로 처리합니다. 함수가 오류를 반환하면 샤드는 추가 레코드 처리를 중지합니다. 샤드가 많을수록 한 번에 더 많은 배치가 처리되므로 동시 실행에 대한 오류의 영향이 줄어듭니다.

함수가 총 동시 배치 수를 처리하기 위해 확장할 수 없는 경우 함수에 대한 할당량 증가를 요청하거나 동시성을 예약합니다.

실행 역할 권한

Lambda는 Kinesis 데이터 스트림과 관련된 리소스를 관리하기 위해 다음 권한이 필요합니다. 이러한 권한을 함수의 실행 역할에 추가합니다.

AWSLambdaKinesisExecutionRole 관리형 정책에는 이러한 권한이 포함되어 있습니다. 자세한 내용은 AWS Lambda 실행 역할 단원을 참조하십시오.

실패한 배치의 레코드를 대기열 또는 주제로 보내려면 함수에 추가 권한이 필요합니다. 다음과 같이 각 대상 서비스에는 서로 다른 권한이 필요합니다.

이벤트 소스로서 스트림 구성

이벤트 소스 매핑을 생성하여 Lambda가 데이터 스트림의 레코드를 Lambda 함수로 전송하도록 지시합니다. 여러 이벤트 소스 매핑을 생성하여 여러 Lambda 함수로 동일한 데이터를 처리하거나, 단일 함수로 여러 데이터 스트림의 항목을 처리할 수 있습니다.

함수가 Lambda 콘솔의 Kinesis에서 읽도록 구성하려면 Kinesis 트리거를 생성합니다.

트리거를 생성하려면

  1. Lambda 콘솔 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. Designer에서 트리거 추가를 선택합니다.

  4. 트리거 유형을 선택합니다.

  5. 필요한 옵션을 구성하고 추가를 선택합니다.

Lambda는 Kinesis 이벤트 소스에 대해 다음과 같은 옵션을 지원합니다.

이벤트 소스 옵션

  • Kinesis 스트림 – 레코드를 읽을 Kinesis 스트림.

  • 소비자(선택 사항) – 전용 연결을 통해 스트림에서 읽으려면 스트림 소비자를 사용합니다.

  • 배치 크기 – 각 배치에서 함수로 전송되는 레코드 수(최대 10,000개)입니다. Lambda은 한 번의 호출로 배치의 모든 레코드를 함수에 전달합니다. 단, 이벤트의 총 크기가 동기식 호출(6 MB)에 대한 페이로드 한도를 초과하지 않아야 합니다.

  • 배치 기간 – 함수를 호출하기 전에 레코드를 수집할 최대 기간(단위: 초)를 지정합니다.

  • 시작 위치 – 새 레코드, 기존의 모든 레코드 또는 특정 날짜 이후에 생성된 레코드만 처리합니다.

    • 최신 – 스트림에 추가된 새 레코드를 처리합니다.

    • 수평 트리밍 – 스트림의 모든 레코드를 처리합니다.

    • 타임스탬프 – 특정 시간에 시작하는 레코드를 시작합니다.

    기존 레코드 처리 후 함수는 캐치업되고 새 레코드를 계속 처리합니다.

  • On-failure destination(실패 시 대상) – 처리할 수 없는 레코드에 대한 SQS 대기열 또는 SNS 주제입니다. 레코드 배치가 너무 오래되었거나 모든 재시도를 다 사용했기 때문에 Lambda에서 해당 배치를 폐기하는 경우 해당 배치에 대한 세부 정보를 대기열 또는 주제로 보냅니다.

  • Retry attempts(재시도) – 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 이는 배치가 함수에 도달하지 않은 제한 또는 서비스 오류에 적용되지 않습니다.

  • Maximum age of record(최대 레코드 사용 기간) – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다.

  • Split batch on error(오류 시 배치 분할) – 함수에서 오류를 반환하면 재시도하기 전에 배치를 두 개로 분할합니다.

  • Concurrent batches per shard(샤드당 동시 배치) – 동일한 샤드의 여러 배치를 동시에 처리합니다.

  • 활성화 – 이벤트 소스 매핑을 활성화하려면 true로 설정합니다. 레코드 처리를 중지하려면 false로 설정합니다. Lambda는 마지막으로 처리된 레코드를 추적하여 다시 활성화되면 해당 지점부터 처리를 다시 시작합니다.

참고

Kinesis는 각 샤드에 대해 요금을 부과하고 향상된 팬아웃의 경우 스트림에서 읽은 데이터에 대해 요금을 부과합니다. 요금에 대한 자세한 내용은 Amazon Kinesis 요금을 참조하십시오.

나중에 이벤트 소스 구성을 관리하기 위해 디자이너에서 트리거를 선택합니다.

이벤트 소스 매핑 API

AWS CLI 또는 AWS SDK로 이벤트 소스 매핑을 관리하려면 다음 API 작업을 사용합니다.

AWS CLI를 사용하여 이벤트 소스 매핑을 생성하려면 create-event-source-mapping 명령을 사용합니다. 다음 예제는 AWS CLI를 사용하여 my-function 함수를 Kinesis 데이터 스트림에 매핑합니다. 데이터 스트림은 Amazon 리소스 이름(ARN)으로 지정되고, 배치 크기는 500이며 Unix 시간의 타임스탬프부터 시작합니다.

$ aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

소비자를 사용하려면 스트림의 ARN 대신 소비자의 ARN을 지정합니다.

배치가 처리되는 방법을 사용자 지정하고 처리할 수 없는 레코드를 폐기할 때를 지정하는 추가 옵션을 구성합니다. 다음 예제는 두 번의 재시도 후 또는 레코드의 시간이 1시간을 넘는 경우 실패 레코드를 SQS 대기열로 보내는 이벤트 소스 매핑을 업데이트합니다.

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. get-event-source-mapping 명령을 사용하여 현재 상태를 봅니다.

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

여러 배치를 동시에 처리하려면 --parallelization-factor 옵션을 사용합니다.

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

오류 처리

Kinesis 스트림에서 레코드를 읽는 이벤트 소스 매핑은 동기식으로 함수를 호출하고 오류 시 재시도합니다. 함수가 제한되거나 Lambda 서비스에서 함수 호출 없이 오류를 반환하는 경우 레코드가 만료되거나 이벤트 소스 매핑에 구성된 최대 사용 기간을 초과할 때까지 Lambda에서 재시도합니다.

함수가 오류를 반환하는 레코드를 받으면 Lambda에서 배치 레코드가 만료되거나, 최대 기간을 초과하거나, 구성된 재시도 할당량에 도달할 때까지 재시도합니다. 함수 오류의 경우에도 실패한 배치를 두 배치로 분할하도록 이벤트 소스 매핑을 구성할 수 있습니다. 보다 작은 배치로 재시도하면 잘못된 레코드가 격리되고 시간 초과 문제가 해결됩니다. 배치를 분할하는 작업은 재시도 할당량에 포함되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 one week 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

폐기된 배치의 레코드를 유지하려면 실패한 이벤트 대상을 구성합니다. Lambda는 배치의 세부 정보와 함께 문서를 대상 대기열 또는 주제에 보냅니다.

실패한 이벤트 레코드의 대상을 구성하려면

  1. Lambda 콘솔 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 디자이너에서 대상 추가를 선택합니다.

  4. 소스에서 Stream invocation(스트림 호출)을 선택합니다.

  5. Stream(스트림)에서 함수에 매핑되는 스트림을 선택합니다.

  6. Destination type(대상 유형)에서 호출 레코드를 수신하는 리소스 유형을 선택합니다.

  7. Destination(대상)에서 리소스를 선택합니다.

  8. 저장을 선택합니다.

다음 예제에서는 Kinesis 스트림의 호출 레코드를 보여 줍니다.

예 호출 레코드

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

Amazon CloudWatch 지표

Lambda는 해당 함수가 한 배치(batch)의 레코드 처리를 완료하면 IteratorAge 측정치를 생성합니다. 이 측정치는 처리가 완료되었을 때 배치의 마지막 레코드가 얼마나 오래되었는지를 나타냅니다. 함수가 새 이벤트를 처리하는 경우 반복기 수명을 사용하여 레코드가 추가된 후 함수에서 레코드를 처리할 때까지의 지연 시간을 추정할 수 있습니다.

반복기 수명이 증가하는 추세이면 함수에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 AWS Lambda 함수 지표 작업 단원을 참조하십시오.