자체 관리형 Apache Kafka에서 Lambda 사용 - AWS Lambda

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

자체 관리형 Apache Kafka에서 Lambda 사용

참고

Lambda 함수 이외의 대상으로 데이터를 전송하거나 데이터를 전송하기 전에 데이터를 보강하려는 경우 Amazon EventBridge 파이프를 참조하세요.

Lambda는 이벤트 소스Apache Kafka를 지원합니다. Apache Kafka는 데이터 파이프라인 및 스트리밍 분석과 같은 워크로드를 지원하는 오픈 소스 이벤트 스트리밍 플랫폼입니다.

AWS 관리형 Kafka 서비스인 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 자체 관리형 Kafka 클러스터를 사용할 수 있습니다. 자세한 내용은 Amazon MSK에서 Lambda 사용에 대한 자세한 내용은 Amazon MSK에서 Lambda 사용을 참조하세요.

이 주제에서는 자체 관리형 Kafka 클러스터에서 Lambda를 사용하는 방법을 설명합니다. AWS 용어에서 자체 관리형 클러스터는 AWS가 아닌 다른 서비스에 호스팅된 Kafka 클러스터를 포함합니다. 예를 들어, Confluent Cloud와 같은 클라우드 공급업체를 통해 Kafka 클러스터를 호스팅할 수 있습니다.

이벤트 소스로서 Apache Kafka는 Amazon Simple Queue Service(Amazon SQS) 또는 Amazon Kinesis를 사용하는 것과 유사하게 작동합니다. Lambda는 이벤트 소스의 새 메시지를 내부적으로 폴링한 다음 대상 Lambda 함수를 동기적으로 호출합니다. Lambda는 메시지를 배치 단위로 읽고 이를 함수에 이벤트 페이로드로 제공합니다. 최대 배치 크기는 구성 가능합니다. (기본값은 100개의 메시지입니다.)

Kafka 기반 이벤트 소스의 경우 Lambda는 일괄 처리 기간 및 배치 크기와 같은 처리 제어 파라미터를 지원합니다. 자세한 정보는 일괄 처리 동작 섹션을 참조하세요.

자체 관리형 Kafka를 이벤트 소스로 사용하는 방법의 예는 AWS 컴퓨팅 블로그에서 Using self-hosted Apache Kafka as an event source for AWS Lambda를 참조하세요.

예제 이벤트

Lambda는 Lambda 함수를 호출할 때 이벤트 파라미터의 메시지 배치를 보냅니다. 이벤트 페이로드에는 메시지 배열이 포함됩니다. 각 배열 항목에는 Kafka 주제 및 Kafka 파티션 식별자에 대한 세부 정보와 함께 타임스탬프 및 base64로 인코딩된 메시지가 포함됩니다.

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Kafka 클러스터 인증

Lambda는 자체 관리형 Apache Kafka 클러스터를 통해 인증하는 여러 가지 방법을 지원합니다. 지원되는 인증 방법 중 하나를 사용하도록 Kafka 클러스터를 구성해야 합니다. Kafka 보안에 대한 자세한 내용은 Kafka 설명서의 보안 섹션을 참조하세요.

VPC 액세스

VPC 내의 Kafka 사용자만 Kafka 브로커에 액세스하는 경우 Amazon Virtual Private Cloud(Amazon VPC) 액세스에 대해 Kafka 이벤트 소스를 구성해야 합니다.

SASL/SCRAM 인증

Lambda는 전송 계층 보안(TLS) 암호화(SASL_SSL)를 통해 Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism(SASL/SCRAM) 인증을 지원합니다. Lambda는 암호화된 자격 증명을 전송하여 클러스터에서 인증합니다. Lambda는 일반 텍스트(SASL_PLAINTEXT)가 포함된 SASL/SCRAM을 지원하지 않습니다. SASL/SCRAM 인증에 관한 자세한 내용은 RFC 5802를 참조하세요.

Lambda는 SASL/PLAIN 인증도 지원합니다. 이 메커니즘은 일반 텍스트 보안 인증을 사용하므로, 서버에 연결할 때 TLS 암호화를 사용하여 보안 인증 정보를 보호해야 합니다.

SASL 인증의 경우 로그인 자격 증명을 AWS Secrets Manager에 보안 암호로 저장합니다. Secrets Manager 사용에 대한 자세한 내용은 AWS Secrets Manager사용 설명서자습서: 비밀 정보 생성 및 검색을 참조하세요.

중요

인증에 Secrets Manager를 사용하려면 Lambda 함수와 동일한 AWS 리전에 보안 암호를 저장해야 합니다.

상호 TLS 인증

상호 TLS(mTLS)는 클라이언트와 서버 간의 양방향 인증을 제공합니다. 클라이언트는 서버가 클라이언트를 확인할 수 있도록 서버에 인증서를 보내고, 서버는 클라이언트가 서버를 확인할 수 있도록 클라이언트에 인증서를 보냅니다.

자체 관리형 Apache Kafka에서 Lambda는 클라이언트 역할을 수행합니다. 클라이언트 인증서(Secrets Manager의 비밀 정보)를 구성하여 Kafka 브로커로 Lambda를 인증합니다. 클라이언트 인증서는 서버의 신뢰 저장소에 있는 CA에서 서명해야 합니다.

Kafka 클러스터는 서버 인증서를 Lambda로 전송하여 Lambda로 Kafka 브로커를 인증합니다. 서버 인증서는 퍼블릭 CA 인증서 또는 프라이빗 CA/자체 서명 인증서일 수 있습니다. 퍼블릭 CA 인증서는 Lambda 신뢰 저장소에 있는 인증 기관(CA)에서 서명해야 합니다. 프라이빗 CA/자체 서명 인증서의 경우 서버 루트 CA 인증서(Secrets Manager의 비밀 정보로)를 구성합니다. Lambda는 루트 인증서를 사용하여 Kafka 브로커를 확인합니다.

mTLS에 대한 자세한 내용은 이벤트 소스로 Amazon MSK에 대한 상호 TLS 인증 도입을 참조하세요.

클라이언트 인증서 비밀 정보 구성

CLIENT_CERTIFICATE_TLS_AUTH 비밀 정보에 인증서 필드와 프라이빗 키 필드가 필요합니다. 암호화된 프라이빗 키의 경우 비밀 정보에 프라이빗 키 암호가 필요합니다. 인증서와 프라이빗 키는 모두 PEM 형식이어야 합니다.

참고

Lambda는 PBES1(PBES2가 아님) 프라이빗 키 암호화 알고리즘을 지원합니다.

인증서 필드에는 클라이언트 인증서부터 시작하여 중간 인증서가 이어지고 루트 인증서로 끝나는 인증서 목록이 포함되어야 합니다. 각 인증서는 다음 구조의 새 줄에서 시작해야 합니다.

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager는 최대 65,536바이트의 보안 정보를 지원하므로 긴 인증서 체인을 위한 충분한 공간입니다.

프라이빗 키는 다음 구조의 PKCS #8 형식이어야 합니다.

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

암호화된 프라이빗 키의 경우 다음 구조를 사용합니다.

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

다음 예제에서는 암호화된 프라이빗 키를 사용한 mTLS 인증용 비밀 정보 콘텐츠를 표시합니다. 암호화된 프라이빗 키의 경우 비밀 정보에 프라이빗 키 암호를 포함합니다.

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

서버 루트 CA 인증서 비밀 정보 구성

Kafka 브로커가 프라이빗 CA에서 서명한 인증서로 TLS 암호화를 사용하는 경우 이 비밀 정보를 생성합니다. VPC, SASL/SCRAM, SASL/PLAIN 또는 mTLS 인증에 TLS 암호화를 사용할 수 있습니다.

서버 루트 CA 인증서 비밀 정보에는 PEM 형식의 Kafka 브로커의 루트 CA 인증서가 포함된 필드가 필요합니다. 다음 예제는 비밀 정보의 구조를 보여줍니다.

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

API 액세스 및 권한 관리

자체 관리형 Kafka 클러스터에 액세스하는 것 외에도 Lambda 함수에는 다양한 API 작업을 수행할 수 있는 권한이 필요합니다. 함수의 실행 역할에 이러한 권한을 추가합니다. 사용자가 API 작업에 액세스해야 하는 경우 AWS Identity and Access Management(IAM) 사용자 또는 역할에 자격 증명 정책에 필요한 권한을 추가합니다.

필요한 Lambda 함수 권한

Amazon CloudWatch Logs의 로그 그룹에 로그를 생성하고 저장하려면 Lambda 함수의 실행 역할에 다음 권한이 있어야 합니다.

선택적 Lambda 함수 권한

Lambda 함수에 또한 다음 권한이 필요할 수 있습니다.

  • Secrets Manager 비밀 정보를 설명합니다.

  • AWS Key Management Service(AWS KMS) 고객 관리형 키에 액세스합니다.

  • Amazon VPC에 액세스합니다.

  • 실패한 간접 호출 기록을 대상으로 전송합니다.

Secrets Manager 및 AWS KMS 권한

Kafka 브로커에 대해 구성하는 액세스 제어 유형에 따라 Lambda 함수에는 Secrets Manager 비밀 정보에 액세스하거나 AWS KMS 고객 관리형 키를 복호화할 수 있는 권한이 필요할 수 있습니다. 리소스에 액세스하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

VPC 권한

VPC 내의 사용자만 자체 관리형 Apache Kafka 클러스터에 액세스할 수 있는 경우 Lambda 함수에 Amazon VPC 리소스에 액세스할 수 있는 권한이 있어야 합니다. 이러한 리소스에는 VPC, 서브넷, 보안 그룹 및 네트워크 인터페이스가 있습니다. 리소스에 액세스하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

대상으로 레코드 전송

실패한 간접 호출의 레코드를 장애 시 대상으로 전송하려면 Lambda 함수에 이러한 레코드를 전송할 권한이 있어야 합니다. Kafka 이벤트 소스 매핑의 경우 Amazon SNS 주제, Amazon SQS 대기열 또는 Amazon S3 버킷을 대상으로 선택할 수 있습니다. 레코드를 SNS 주제로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

레코드를 SQS 대기열로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

레코드를 S3 버킷으로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

또한 대상에 KMS 키를 구성한 경우 대상 유형에 따라 Lambda에는 다음과 같은 권한이 필요합니다.

  • S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:GenerateDataKey가 필요합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.

  • SQS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SQS 대기열 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

  • SNS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SNS 주제 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

실행 역할에 권한 추가

자체 관리형 Apache Kafka 클러스터가 사용하는 다른 AWS 서비스에 액세스하기 위해 Lambda는 함수의 실행 역할에 정의된 권한 정책을 사용합니다.

기본적으로 Lambda는 자체 관리형 Apache Kafka 클러스터에 대한 필수 또는 선택적 작업을 수행할 수 없습니다. IAM 신뢰 정책에서 이러한 작업을 생성하고 정의한 다음 정책을 실행 역할에 연결해야 합니다. 이 예제에서는 Lambda가 Amazon VPC 리소스에 액세스하도록 허용하는 정책을 생성하는 방법을 보여줍니다.

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

IAM 콘솔에서 JSON 정책 문서를 생성하는 방법에 대한 자세한 내용은 IAM 사용 설명서JSON 탭에서 정책 생성을 참조하세요.

IAM 정책을 사용하여 사용자에게 액세스 권한 부여

기본적으로 사용자 및 역할에는 이벤트 소스 API 작업을 수행할 수 있는 권한이 없습니다. 조직 또는 계정의 사용자에게 액세스 권한을 부여하려면 자격 증명 기반 정책을 생성 혹은 업데이트합니다. 자세한 내용은 IAM 사용 설명서정책을 사용하여 AWS 리소스에 대한 액세스 제어를 참조하세요.

인증 및 권한 부여 오류

Kafka 클러스터의 데이터를 사용하는 데 필요한 권한이 누락된 경우 Lambda는 LastProcessingResult 아래의 이벤트 소스 매핑에 다음 오류 메시지 중 하나를 표시합니다.

클러스터가 Lambda를 인증하지 못함

SASL/SCRAM 또는 mMTS의 경우 이 오류는 제공된 사용자에게 다음 필수 Kafka 액세스 제어 목록(ACL) 권한이 모두 있지는 않음을 나타냅니다.

  • DescribeConfigs 클러스터

  • 그룹 설명

  • 그룹 읽기

  • 주제 설명

  • Thread-Topic

필수 kafka-cluster 권한으로 Kafka ACL을 생성할 때 주제와 그룹을 리소스로 지정합니다. 주제 이름은 이벤트 소스 매핑의 주제와 일치해야 합니다. 그룹 이름은 이벤트 소스 매핑의 UUID와 일치해야 합니다.

실행 역할에 필요한 권한을 추가한 후 변경 사항이 적용되기까지 몇 분 정도 소요될 수 있습니다.

SASL 인증 실패

SASL/SCRAM 또는 SASL/PLAIN의 경우 이 오류는 제공된 로그인 자격 증명이 유효하지 않음을 나타냅니다.

서버가 Lambda를 인증하지 못함

이 오류는 Kafka 브로커가 Lambda를 인증하지 못했음을 나타냅니다. 이 오류는 다음과 같은 이유로 발생할 수 있습니다.

  • mTLS 인증을 위한 클라이언트 인증서를 제공하지 않았습니다.

  • 클라이언트 인증서를 제공했지만 Kafka 브로커가 mTLS 인증을 사용하도록 구성되지 않았습니다.

  • Kafka 브로커가 클라이언트 인증서를 신뢰하지 않습니다.

Lambda가 서버를 인증하지 못함

이 오류는 Lambda가 Kafka 브로커를 인증하지 못했음을 나타냅니다. 이 오류는 다음과 같은 이유로 발생할 수 있습니다.

  • Kafka 브로커는 자체 서명된 인증서 또는 사설 CA를 사용하지만 서버 루트 CA 인증서를 제공하지 않았습니다.

  • 서버 루트 CA 인증서가 브로커 인증서에 서명한 루트 CA와 일치하지 않습니다.

  • 브로커의 인증서에 브로커의 DNS 이름 또는 IP 주소가 주체 대체 이름으로 포함되어 있지 않기 때문에 호스트 이름 검증에 실패했습니다.

제공된 인증서 또는 프라이빗 키가 잘못됨

이 오류는 Kafka 소비자가 제공된 인증서 또는 프라이빗 키를 사용할 수 없음을 나타냅니다. 인증서와 키가 PEM 형식을 사용하고 프라이빗 키 암호화가 PBES1 알고리즘을 사용하는지 확인합니다.

네트워크 구성

Kafka 브로커에 대한 Amazon VPC 액세스를 구성하는 경우 Lambda는 Kafka 클러스터가위치한 Amazon VPC 리소스에 대한 액세스 권한이 있어야 합니다. Lambda에 대한 AWS PrivateLink VPC 엔드포인트와 AWS Security Token Service(AWS STS)를 배포하는 것이 좋습니다. 브로커가 인증을 사용하는 경우 Secrets Manager용 VPC 엔드포인트도 배포합니다. 장애 시 대상을 구성한 경우 대상 서비스를 위한 VPC 엔드포인트도 배포합니다.

또는 Kafka 클러스터와 연결된 VPC에 퍼블릭 서브넷당 하나의 NAT 게이트웨이가 포함되는지 확인합니다. 자세한 내용은 VPC 연결 함수의 인터넷 및 서비스 액세스 단원을 참조하십시오.

또한 VPC 엔드포인트를 사용하는 경우 프라이빗 DNS 이름을 활성화하도록 구성해야 합니다.

다음 규칙(최소)으로 Amazon VPC 보안 그룹을 구성합니다.

  • 인바운드 규칙 – 이벤트 소스에 대해 지정된 보안 그룹에 대해 Kafka 브로커 포트의 모든 트래픽을 허용합니다. Kafka는 기본적으로 포트 9092를 사용합니다.

  • 아웃바운드 규칙 - 모든 대상에 대해 포트 443의 모든 트래픽을 허용합니다. 이벤트 소스에 대해 지정된 보안 그룹에 대해 Kafka 브로커 포트의 모든 트래픽을 허용합니다. Kafka는 기본적으로 포트 9092를 사용합니다.

  • NAT 게이트웨이 대신 VPC 엔드포인트를 사용하는 경우 VPC 엔드포인트와 연결된 보안 그룹은 이벤트 소스의 보안 그룹에서 포트 443의 모든 인바운드 트래픽을 허용해야 합니다.

네트워크 구성에 대한 자세한 내용은 AWS 컴퓨팅 블로그의 Setting up AWS Lambda with an Apache Kafka cluster within a VPC를 참조하세요.

Kafka 클러스터를 이벤트 소스로 추가

이벤트 소스 매핑을 생성하려면 Lambda 콘솔, AWS SDK 또는 AWS Command Line Interface(AWS CLI)를 사용해 Kafka를 Lambda 함수 트리거로 추가합니다.

이 단원에서는 Lambda 콘솔 및 AWS CLI를 사용해 이벤트 소스 매핑을 생성하는 방법을 설명합니다.

참고

자체 관리형 Apache Kafka의 이벤트 소스 매핑을 업데이트, 비활성화 또는 삭제하면 최대 15분이 지나야 변경 사항이 적용됩니다. 이 기간이 경과하기 전에 이벤트 소스 매핑은 계속해서 이벤트를 처리하고 이전 설정을 사용하여 함수를 호출할 수 있습니다. 이는 콘솔에 표시된 이벤트 소스 매핑의 상태가 변경 사항이 적용된 것으로 표시되는 경우에도 마찬가지입니다.

필수 조건

  • 자체 관리형 Apache Kafka 클러스터. Lambda는 Apache Kafka 버전 0.10.1.0 이상을 지원합니다.

  • 자체 관리형 Kafka 클러스터에서 사용하는 AWS 리소스에 액세스할 수 있는 권한이 있는 실행 역할입니다.

사용자 지정이 가능한 소비자 그룹 ID

Kafka를 이벤트 소스로 설정할 때 소비자 그룹 ID를 지정할 수 있습니다. 이 소비자 그룹 ID는 Lambda 함수에 가입하려는 Kafka 소비자 그룹의 기존 식별자입니다. 이 기능을 사용하여 진행 중인 모든 Kafka 레코드 처리 설정을 다른 소비자에서 Lambda로 원활하게 마이그레이션할 수 있습니다.

소비자 그룹 ID를 지정하고 해당 소비자 그룹 내에 다른 활성 폴러가 있는 경우 Kafka는 모든 소비자에게 메시지를 배포합니다. 즉, Lambda는 Kafka 주제에 대한 모든 메시지를 수신하지는 않습니다. Lambda가 주제에 있는 모든 메시지를 처리하도록 하려면 해당 소비자 그룹의 다른 모든 폴러를 끄십시오.

또한 소비자 그룹 ID를 지정했는데 Kafka가 동일한 ID를 가진 유효한 기존 소비자 그룹을 찾으면 Lambda는 이벤트 소스 매핑을 위한 StartingPosition 파라미터를 무시합니다. 대신 Lambda는 소비자 그룹의 커밋된 오프셋에 따라 레코드 처리를 시작합니다. 소비자 그룹 ID를 지정했는데 Kafka가 기존 소비자 그룹을 찾을 수 없는 경우, Lambda는 StartingPosition에서 지정된 대로 이벤트 소스를 구성합니다.

지정하는 소비자 그룹 ID는 모든 Kafka 이벤트 소스 중에서 고유해야 합니다. 지정된 소비자 그룹 ID로 Kafka 이벤트 소스 매핑을 생성한 후에는 이 값을 업데이트할 수 없습니다.

장애 시 대상

실패한 간접 호출 또는 Kafka 이벤트 소스의 과대 페이로드에 대한 기록을 유지하려면 함수에 장애 시 대상을 구성합니다. 간접 호출이 실패하면 Lambda는 간접 호출의 세부 정보가 포함된 JSON 레코드를 대상으로 전송합니다.

Amazon SNS 주제, Amazon SQS 대기열 또는 Amazon S3 버킷 중에서 대상으로 선택할 수 있습니다. SNS 주제 또는 SQS 대기열 대상의 경우, Lambda는 레코드 메타데이터를 대상으로 전송합니다. S3 버킷 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다.

Lambda가 선택한 목적지로 레코드를 성공적으로 전송하려면 함수의 실행 역할에 관련 권한이 포함되어야 합니다. 또한 표에는 각 대상 유형이 JSON 간접 호출 레코드를 수신하는 방법도 설명되어 있습니다.

대상 유형 다음 이벤트 소스에서 지원 필요한 권한 대상별 JSON 형식

Amazon SQS 대기열

  • Kinesis

  • DynamoDB

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드 메타데이터를 Message로 대상에 전달합니다.

Amazon SNS 주제

  • Kinesis

  • DynamoDB

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드 메타데이터를 Message로 대상에 전달합니다.

Amazon S3 버킷

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드와 메타데이터와 함께 대상에 저장합니다.

작은 정보

가장 좋은 방법은 실행 역할에만 필요한 최소 권한을 포함하는 것입니다.

SNS 및 SQS 대상

다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 SQS 대기열 대상으로 전송하는 내용을 보여줍니다. recordsInfo 아래의 각 키에는 하이픈으로 구분된 Kafka 주제와 파티션이 모두 포함되어 있습니다. 예를 들어, "Topic-0" 키의 경우 Topic은 Kafka 주제이고 0은 파티션입니다. 각 주제 및 파티션에 대해 오프셋과 타임스탬프 데이터를 사용하여 원래의 간접 호출 레코드를 찾을 수 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 대상

S3 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다. 다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 S3 버킷 대상으로 전송하는 것을 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 payload 필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
작은 정보

대상 버킷에서 S3 버전 관리를 활성화하는 것이 좋습니다.

장애 시 대상 구성

이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스의 경우 이벤트 소스 매핑 간접 호출을 선택합니다.

  5. 이벤트 소스 매핑의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.

  6. 조건의 경우 실패 시를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.

  7. 대상 유형의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.

  8. Destination(대상)에서 리소스를 선택합니다.

  9. 저장을 선택합니다.

Lambda API를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음과 같은 CreateEventSourceMapping CLI 명령은 SQS 실패 시 목적지를 MyFunction에 추가합니다.

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

다음 UpdateEventSourceMapping CLI 명령은 uuid 입력과 연결된 Kafka 이벤트 소스에 S3 장애 시 대상을 추가합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

대상을 제거하려면 destination-config 파라미터의 인수로 빈 문자열을 제공합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

자체 관리형 Kafka 클러스터 추가(콘솔)

다음 단계에 따라 자체 관리형 Apache Kafka 클러스터 및 Kafka 주제를 Lambda 함수의 트리거로 추가합니다.

Lambda 함수에 Apache Kafka 트리거를 추가하려면(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. Lambda 함수의 이름을 선택합니다.

  3. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

  4. 트리거 구성에서 다음을 수행합니다.

    1. Apache Kafka 트리거 유형을 선택합니다.

    2. Bootstrap 서버에는 클러스터의 Kafka 브로커 호스트 및 포트 페어 주소를 입력한 다음 추가를 선택합니다. 클러스터의 각 Kafka 브로커에 대해 이를 반복합니다.

    3. 주제 이름에는 클러스터에 레코드를 저장하는 데 사용되는 Kafka 주제의 이름을 입력합니다.

    4. (선택 사항) 배치 크기(Batch size)에 단일 배치에서 검색할 최대 레코드 수를 입력합니다.

    5. Batch window에서 Lambda가 함수를 호출하기 전에 레코드를 수집하는 데 걸리는 최대 시간(초)을 입력합니다.

    6. (선택 사항) Consumer group ID에서 가입할 Kafka 소비자 그룹의 ID를 입력합니다.

    7. (선택 사항) 시작 위치의 경우 최신 레코드에서 스트림 읽기를 시작하려면 최신을 선택하고, 사용 가능한 가장 빠른 레코드에서 시작하려면 수평 트리밍을 선택하고, 읽기를 시작할 타임스탬프를 지정하려면 타임스탬프를 선택합니다.

    8. (선택 사항) VPC에서 Kafka 클러스터용 Amazon VPC를 선택합니다. 그런 다음 VPC 서브넷(VPC subnets)VPC 보안 그룹(VPC security groups)을 선택합니다.

      VPC 내의 사용자만 브로커에 액세스하는 경우 이 설정이 필요합니다.

    9. (선택 사항) 인증(Authentication)에서 추가(Add)를 선택한 후 다음을 수행합니다.

      1. 클러스터에 있는 Kafka 브로커의 액세스 또는 인증 프로토콜을 선택합니다.

        • Kafka 브로커가 SASL/PLAIN 인증을 사용하는 경우 BASIC_AUTH를 선택합니다.

        • 브로커가 SASL/SCRAM 인증을 사용하는 경우 SASL_SCRAM 프로토콜 중 하나를 선택합니다.

        • mTLS 인증을 구성하는 경우 CLIENT_CERTIFICATE_TLS_AUTH 프로토콜을 선택합니다.

      2. SASL/SCRAM 또는 mTLS 인증의 경우 Kafka 클러스터에 대한 자격 증명이 포함된 Secrets Manager 비밀 키를 선택합니다.

    10. (선택 사항) 암호화(Encryption)에서 Kafka 브로커가 프라이빗 CA에서 서명한 인증서를 사용하는 경우 Kafka 브로커가 TLS 암호화에 사용하는 루트 CA 인증서가 포함된 Secrets Manager 암호를 선택합니다.

      이 설정은 SASL/SCRAM 또는 SASL/PLANE에 대한 TLS 암호화 및 mTLS 인증에 적용됩니다.

    11. 테스트를 위해 트리거를 비활성화된 상태에서 생성하려면(권장됨) 트리거 사용을 선택 해제합니다. 또는 트리거를 즉시 사용하려면 트리거 사용을 선택합니다.

  5. 트리거를 생성하려면 추가를 선택합니다.

자체 관리형 Kafka 클러스터 추가(AWS CLI)

다음 예제 AWS CLI 명령을 사용하여 Lambda 함수에 대한 자체 관리형 Apache Kafka 트리거를 생성하고 확인합니다.

SASL/SCRAM 사용

Kafka 사용자가 인터넷을 통해 Kafka 브로커에 액세스한다면 SASL/SCRAM 인증용으로 생성한 Secrets Manager 비밀 정보를 지정합니다. 다음 예제에서는 create-event-source-mapping AWS CLI 명령을 사용해 my-kafka-function이라는 Lambda 함수를 AWSKafkaTopic이라는 Kafka 주제에 매핑합니다.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

VPC 사용

VPC 내의 Kafka 사용자만 Kafka 브로커에 액세스한다면 VPC, 서브넷 및 VPC 보안 그룹을 지정해야 합니다. 다음 예제에서는 create-event-source-mapping AWS CLI 명령을 사용해 my-kafka-function이라는 Lambda 함수를 AWSKafkaTopic이라는 Kafka 주제에 매핑합니다.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

AWS CLI를 사용하여 상태 확인

다음 예제에서는 get-event-source-mapping AWS CLI 명령을 사용해 생성한 이벤트 소스 매핑의 상태를 설명합니다.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Kafka 클러스터를 이벤트 소스로 사용

Apache Kafka 클러스터를 Lambda 함수의 트리거로 추가하면 클러스터가 이벤트 소스로 사용됩니다.

Lambda는 사용자가 지정한 StartingPosition을 기반으로 CreateEventSourceMapping 요청에서 Topics로 지정한 Kafka 주제에서 이벤트 데이터를 읽습니다. 성공적인 처리 후, Kafka 토픽은 Kafka 클러스터에 커밋됩니다.

StartingPositionLATEST로 지정하면 Lambda는 주제에 속한 각 파티션의 최신 메시지에서 읽기를 시작합니다. 트리거 구성 후 Lambda가 메시지를 읽기 시작하기까지 약간의 지연이 발생할 수 있으므로 Lambda는 이 기간 중에 생성된 메시지를 읽지 않습니다.

Lambda는 지정한 하나 이상의 Kafka 주제 파티션의 레코드를 처리하고 JSON 페이로드를 함수로 보냅니다. 사용 가능한 레코드가 더 있는 경우 Lambda는 함수가 주제를 따라잡을 때까지 CreateEventSourceMapping 요청에서 지정한 BatchSize 값을 기반으로 배치로 레코드를 계속 처리합니다.

함수가 배치의 어떤 메시지에 대해 오류를 반환하면 Lambda는 처리가 성공하거나 메시지가 만료될 때까지 전체 메시지 배치를 다시 시도합니다. 모든 재시도에 실패한 레코드를 실패 시 대상으로 전송하여 나중에 처리하도록 할 수 있습니다.

참고

Lambda 함수의 최대 제한 시간은 일반적으로 15분이지만 Amazon MSK, 자체 관리형 Apache Kafka, Amazon DocumentDB, ActiveMQ 및 RabbitMQ용 Amazon MQ에 대한 이벤트 소스 매핑은 최대 제한 시간이 14분인 함수만 지원합니다. 이 제약 조건에 따라 이벤트 소스 매핑에서 함수 오류 및 재시도를 적절히 처리할 수 있습니다.

폴링 및 스트리밍 시작 위치

이벤트 소스 매핑 생성 및 업데이트 중 스트림 폴링은 최종적으로 일관됩니다.

  • 이벤트 소스 매핑 생성 중 스트림에서 이벤트 폴링을 시작하는 데 몇 분 정도 걸릴 수 있습니다.

  • 이벤트 소스 매핑 업데이트 중 스트림에서 이벤트 폴링을 중지했다가 다시 시작하는 데 몇 분 정도 걸릴 수 있습니다.

이 동작은 스트림의 시작 위치로 LATEST를 지정하면 이벤트 소스 매핑이 생성 또는 업데이트 중에 이벤트를 놓칠 수 있음을 의미합니다. 누락된 이벤트가 없도록 하기 위해서는 스트림 시작 위치를 TRIM_HORIZON 또는 AT_TIMESTAMP로 지정하세요.

Kafka 이벤트 소스의 Auto Scaling

Apache Kafka 이벤트 소스를 처음 생성하면 Lambda는 한 명의 소비자를 할당하여 Kafka 주제의 모든 파티션을 처리합니다. 각 소비자는 증가한 워크로드를 처리하기 위해 여러 프로세서를 병렬로 실행합니다. 그뿐 아니라 Lambda는 워크로드에 따라 소비자 수를 자동으로 늘리거나 줄입니다. 각 파티션에서 메시지 순서를 유지하기 위해 최대 소비자 수는 주제의 파티션당 하나의 소비자입니다.

Lambda는 1분 간격으로 주제의 모든 파티션의 소비자 오프셋 지연을 평가합니다. 지연이 너무 높으면 파티션은 Lambda가 메시지를 처리할 수 있는 것보다 더 빠르게 메시지를 수신합니다. 필요에 따라 Lambda는 주제에 소비자를 추가하거나 제거합니다. 소비자를 추가 또는 제거하는 크기 조정 프로세스는 평가 후 3분 이내에 진행됩니다.

대상 Lambda 함수가 오버로드되면 Lambda는 소비자 수를 줄입니다. 이 동작은 소비자가 검색하고 함수에 보낼 수 있는 메시지 수를 줄임으로써 함수의 워크로드를 줄입니다.

Kafka 주제의 처리량을 모니터링하려면 consumer_lagconsumer_offset 같은 Apache Kafka 소비자 지표를 확인합니다. 얼마나 많은 함수 호출이 병렬로 발생하는지 확인하기 위해 함수에 대한 동시성 지표를 확인할 수도 있습니다.

이벤트 소스 API 작업

Lambda 콘솔, AWS SDK 또는 AWS CLI를 사용하여 Kafka 클러스터를 Lambda 함수의 이벤트 소스로 추가하면 Lambda는 API를 사용하여 요청을 처리합니다.

AWS Command Line Interface(AWS CLI) 또는 AWS SDK를 사용하여 이벤트 소스를 관리하려면 다음 API 작업을 사용할 수 있습니다.

이벤트 소스 매핑 오류

Lambda 함수의 이벤트 소스로 Apache Kafka 클러스터를 추가한 경우 함수에 오류가 발생하면 Kafka 소비자가 레코드 처리를 중지합니다. 토픽 파티션의 소비자는 레코드를 구독하고, 읽고, 처리하는 소비자입니다. 다른 Kafka 소비자는 동일한 오류가 발생하지 않는 한 레코드 처리를 계속할 수 있습니다.

중지된 소비자의 원인을 확인하려면 StateTransitionReason 응답의 EventSourceMapping 필드를 확인하세요. 다음 목록에는 발생할 수 있는 이벤트 소스 오류가 나왔습니다.

ESM_CONFIG_NOT_VALID

이벤트 소스 매핑 구성이 잘못되었습니다.

EVENT_SOURCE_AUTHN_ERROR

Lambda가 이벤트 소스를 인증하지 못했습니다.

EVENT_SOURCE_AUTHZ_ERROR

Lambda에 이벤트 소스에 액세스하는 데 필요한 권한이 없습니다.

FUNCTION_CONFIG_NOT_VALID

함수의 구성이 유효하지 않습니다.

참고

Lambda 이벤트 레코드가 허용되는 크기 제한인 6MB를 초과하면 처리되지 않을 수 있습니다.

Amazon CloudWatch 지표

Lambda는 함수가 레코드를 처리하는 동안 OffsetLag 지표를 내보냅니다. 이 지표의 값은 Kafka 이벤트 소스 주제에 작성된 마지막 레코드와 함수의 소비자 그룹에서 처리한 마지막 레코드 간의 오프셋 차이입니다. 레코드가 추가되는 시점과 소비자 그룹이 이를 처리하는 시점 사이의 지연 시간을 추정하는 데 OffsetLag를 사용할 수 있습니다.

OffsetLag의 증가 추세는 함수의 소비자 그룹 폴러에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 Lambda 함수 지표 작업 단원을 참조하십시오.

자체 관리형 Apache Kafka 구성 파라미터

모든 Lambda 이벤트 소스 유형은 동일한 CreateEventSourceMappingUpdateEventSourceMapping API 작업을 공유합니다. 그러나 파라마터 중 일부는 Apache Kafka에 적용됩니다.

자체 관리형 Apache Kafka에 적용되는 이벤트 소스 파라미터
파라미터 필수 기본값 참고

BatchSize

N

100

최대값: 10,000

활성

N

활성

FunctionName

Y

FilterCriteria

N

Lambda 이벤트 필터링

MaximumBatchingWindowInSeconds

N

500ms

일괄 처리 동작

SelfManagedEventSource

Y

Kafka 브로커의 목록. 생성 시에만 설정할 수 있음

SelfManagedKafkaEventSourceConfig

N

기본적으로 고유한 값으로 설정되는 소비자 그룹 ID 필드를 포함합니다.

생성 시에만 설정할 수 있음

SourceAccessConfigurations

N

자격 증명 없음

클러스터에 대한 VPC 정보 또는 인증 자격 증명

SASL_PLAIN의 경우 BASIC_AUTH로 설정

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, 또는 LATEST

생성 시에만 설정할 수 있음

StartingPositionTimestamp

N

StartingPosition이 AT_TIMESTAMP로 설정된 경우에만 필요합니다.

주제

Y

주제 이름

생성 시에만 설정할 수 있음