Amazon MSK에서 Lambda 사용 - AWS Lambda

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon MSK에서 Lambda 사용

참고

Lambda 함수 이외의 대상으로 데이터를 전송하거나 데이터를 전송하기 전에 데이터를 보강하려는 경우 Amazon EventBridge 파이프를 참조하세요.

Amazon Managed Streaming for Apache Kafka(Amazon MSK)는 Apache Kafka를 사용하여 스트리밍 데이터를 처리하는 애플리케이션의 구축 및 실행을 위해 사용할 수 있는 완전관리형 서비스입니다. Amazon MSK는 Kafka를 실행하는 클러스터의 설정, 크기 조정, 관리를 간소화합니다. 또한 Amazon MSK는 여러 Availability Zones에 맞게, 그리고 AWS Identity and Access Management(IAM)를 통한 보안을 위해 애플리케이션을 쉽게 구성할 수 있도록 도와줍니다. Amazon MSK는 Kafka의 여러 오픈 소스 버전을 지원합니다.

이벤트 소스로서 Amazon MSK는 Amazon Simple Queue Service(Amazon SQS) 또는 Amazon Kinesis를 사용하는 것과 유사하게 작동합니다. Lambda는 이벤트 소스의 새 메시지를 내부적으로 폴링한 다음 대상 Lambda 함수를 동기적으로 호출합니다. Lambda는 메시지를 배치 단위로 읽고 이를 함수에 이벤트 페이로드로 제공합니다. 최대 배치 크기를 구성할 수 있습니다. 기본값은 메시지 100건입니다. 자세한 내용은 일괄 처리 동작 단원을 참조하십시오.

참고

Lambda 함수의 최대 제한 시간은 일반적으로 15분이지만 Amazon MSK, 자체 관리형 Apache Kafka, Amazon DocumentDB, ActiveMQ 및 RabbitMQ용 Amazon MQ에 대한 이벤트 소스 매핑은 최대 제한 시간이 14분인 함수만 지원합니다. 이 제약 조건에 따라 이벤트 소스 매핑에서 함수 오류 및 재시도를 적절히 처리할 수 있습니다.

Lambda는 각 파티션에 대해 순차적으로 메시지를 읽습니다. 단일 Lambda 페이로드에는 여러 파티션의 메시지가 포함될 수 있습니다. Lambda는 각 배치를 처리한 후 해당 배치에 있는 메시지의 오프셋을 커밋합니다. 함수가 배치의 어떤 메시지에 대해 오류를 반환하면 Lambda는 처리가 성공하거나 메시지가 만료될 때까지 전체 메시지 배치를 다시 시도합니다.

주의

Lambda 이벤트 소스 매핑은 각 이벤트를 한 번 이상 처리하므로 일괄 처리가 중복될 수 있습니다. 중복 이벤트와 관련된 잠재적 문제를 방지하려면 함수 코드를 멱등성으로 만드는 것이 좋습니다. 자세한 내용은 AWS 지식 센터의 멱등성 Lambda 함수를 만들려면 어떻게 해야 합니까?를 참조하세요.

Amazon MSK를 이벤트 소스로 구성하는 방법의 예는 AWS 컴퓨팅 블로그에서 Using Amazon MSK as an event source for AWS Lambda를 참조하세요. 전체 자습서는 Amazon MSK Labs에서 Amazon MSK Lambda 통합을 참조하세요.

예제 이벤트

Lambda는 함수를 호출할 때 이벤트 파라미터의 메시지 배치를 보냅니다. 이벤트 페이로드에는 메시지 배열이 포함됩니다. 각 배열 항목에는 Amazon MSK 주제 및 파티션 식별자에 대한 세부 정보와 함께 타임스탬프 및 base64로 인코딩된 메시지가 포함됩니다.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK 클러스터 인증

Lambda는 Amazon MSK 클러스터에 액세스하고 레코드를 검색하고 다른 태스크를 수행할 수 있는 권한이 필요합니다. Amazon MSK는 MSK 클러스터에 대한 클라이언트 액세스를 제어하는 몇 가지 옵션을 지원합니다.

인증되지 않은 액세스

인터넷을 통해 클러스터에 액세스하는 클라이언트가 없는 경우 인증되지 않은 액세스를 사용할 수 있습니다.

SASL/SCRAM 인증

Amazon MSK는 전송 계층 보안(TLS) 암호화를 통해 Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism(SASL/SCRAM) 인증을 지원합니다. Lambda가 클러스터에 연결하려면 인증 자격 증명 (사용자 이름 및 암호)을 AWS Secrets Manager 비밀 정보에 저장합니다.

Secrets Manager 사용에 대한 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서AWS Secrets Manager를 사용한 사용자 이름 및 암호 인증을 참조하세요.

Amazon MSK는 SASL/PLAIN 인증을 지원하지 않습니다.

IAM 역할 기반 인증

IAM을 사용하여 MSK 클러스터에 연결하는 클라이언트의 자격 증명을 인증할 수 있습니다. IAM 인증이 MSK 클러스터에서 활성 상태이고 인증을 위한 암호를 제공하지 않으면 Lambda는 자동으로 IAM 인증을 사용하도록 기본 설정됩니다. 사용자 또는 역할 기반 정책을 생성하고 배포하려면 IAM 콘솔 또는 API를 사용합니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서IAM 액세스 제어를 참조하세요.

Lambda가 MSK 클러스터에 연결하고 레코드를 읽고 기타 필수 작업을 수행할 수 있도록 하려면 함수의 실행 역할에 다음 권한을 추가합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

이러한 권한의 범위를 특정 클러스터, 주제 및 그룹으로 설정할 수 있습니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Amazon MSK Kafka 작업을 참조하세요.

상호 TLS 인증

상호 TLS(mTLS)는 클라이언트와 서버 간의 양방향 인증을 제공합니다. 클라이언트는 서버가 클라이언트를 확인할 수 있도록 서버에 인증서를 보내고, 서버는 클라이언트가 서버를 확인할 수 있도록 클라이언트에 인증서를 보냅니다.

Amazon MSK의 경우 Lambda가 클라이언트 역할을 합니다. 클라이언트 인증서(Secrets Manager의 비밀 정보)를 구성하여 MSK 클러스터의 브로커로 Lambda를 인증합니다. 클라이언트 인증서는 서버의 신뢰 저장소에 있는 CA에서 서명해야 합니다. MSK 클러스터는 서버 인증서를 Lambda로 전송하여 Lambda로 브로커를 인증합니다. 서버 인증서는 AWS 신뢰 저장소에 있는 인증 기관(CA)에서 서명해야 합니다.

클라이언트 인증서를 생성하는 방법에 대한 지침은 Introducing mutual TLS authentication for Amazon MSK as an event source를 참조하세요.

Amazon MSK는 자체 서명된 서버 인증서를 지원하지 않습니다. Amazon MSK의 모든 브로커는 기본적으로 Lambda가 신뢰하는 Amazon Trust Services CA에서 서명한 퍼블릭 인증서를 사용하기 때문입니다.

Amazon MSK용 mTLS에 대한 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서상호 TLS 인증을 참조하세요.

mTLS 암호 구성

CLIENT_CERTIFICATE_TLS_AUTH 비밀 정보에 인증서 필드와 프라이빗 키 필드가 필요합니다. 암호화된 프라이빗 키의 경우 비밀 정보에 프라이빗 키 암호가 필요합니다. 인증서와 프라이빗 키는 모두 PEM 형식이어야 합니다.

참고

Lambda는 PBES1(PBES2가 아님) 프라이빗 키 암호화 알고리즘을 지원합니다.

인증서 필드에는 클라이언트 인증서부터 시작하여 중간 인증서가 이어지고 루트 인증서로 끝나는 인증서 목록이 포함되어야 합니다. 각 인증서는 다음 구조의 새 줄에서 시작해야 합니다.

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager는 최대 65,536바이트의 보안 정보를 지원하므로 긴 인증서 체인을 위한 충분한 공간입니다.

프라이빗 키는 다음 구조의 PKCS #8 형식이어야 합니다.

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

암호화된 프라이빗 키의 경우 다음 구조를 사용합니다.

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

다음 예제에서는 암호화된 프라이빗 키를 사용한 mTLS 인증용 비밀 정보 콘텐츠를 표시합니다. 암호화된 개인 키의 경우 비밀 정보에 프라이빗 키 암호를 포함합니다.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Lambda이 부트스트랩 브로커를 선택하는 방법

Lambda는 클러스터에서 사용할 수 있는 인증 방법 및 인증을 위한 암호 제공 여부를 기반으로 부트스트랩 브로커를 선택합니다. mTLS 또는 SASL/SCRAM에 대한 암호를 제공하면 Lambda가 자동으로 해당 인증 방법을 선택합니다. 암호를 제공하지 않으면 Lambda가 클러스터에서 활성화된 가장 강력한 인증 방법을 선택합니다. 다음은 Lambda가 가장 강력한 인증부터 가장 약한 인증까지 브로커를 선택하는 우선 순위입니다.

  • mTLS(MTL에 제공되는 암호)

  • SASL/SCRAM(SASL/SCRAM에 대해 제공되는 암호)

  • SASL IAM(암호가 제공되지 않았으며 IAM 인증이 활성화됨)

  • 인증되지 않은 TLS(암호가 제공되지 않고 IAM 인증이 활성화되지 않음)

  • 일반 텍스트(암호가 제공되지 않고 IAM 인증 및 인증되지 않은 TLS가 모두 활성화되지 않음)

참고

Lambda가 가장 안전한 브로커 유형에 연결할 수 없는 경우 Lambda는 다른 (약한) 브로커 유형에 연결을 시도하지 않습니다. Lambda가 더 약한 브로커 유형을 선택하게 하려면 클러스터에서 더 강력한 인증 방법을 모두 비활성화하십시오.

API 액세스 및 권한 관리

Amazon MSK 클러스터에 액세스하는 것 외에도 함수에는 다양한 Amazon MSK API 작업을 수행할 수 있는 권한이 필요합니다. 이러한 권한을 함수의 실행 역할에 추가합니다. 사용자가 Amazon MSK API 작업에 액세스해야 하는 경우 사용자 또는 역할에 대한 자격 증명 정책에 필요한 권한을 추가합니다.

실행 역할에 다음과 같은 권한을 각각 수동으로 추가할 수 있습니다. 또는 AWS 관리형 정책 AWSLambdaMSKExecutionRole을 실행 역할에 연결할 수도 있습니다. AWSLambdaMSKExecutionRole 정책에는 아래 나열된 모든 필수 API 작업 및 VPC 권한이 포함되어 있습니다.

필요한 Lambda 함수 실행 역할 권한

Amazon CloudWatch Logs의 로그 그룹에 로그를 생성하고 저장하려면 Lambda 함수의 실행 역할에 다음 권한이 있어야 합니다.

Lambda가 사용자를 대신하여 Amazon MSK 클러스터에 액세스하기 위해서는 Lambda 함수의 실행 역할에 다음 권한이 있어야 합니다.

kafka:DescribeClusterkafka:DescribeClusterV2 중 하나만 추가하면 됩니다. 프로비저닝된 MSK 클러스터의 경우 두 권한 모두 작동합니다. 서버리스 MSK 클러스터의 경우 kafka:DescribeClusterV2을 사용해야 합니다.

참고

Lambda는 결국 관련된 AWSLambdaMSKExecutionRole 관리 정책에서 kafka:DescribeCluster 권한을 제거할 계획입니다. 이 정책을 사용하면, 대신 kafka:DescribeClusterV2을 사용하기 위해서는 kafka:DescribeCluster을 이용하는 애플리케이션을 모두 마이그레이션해야 합니다.

VPC 권한

VPC 내의 사용자만 Amazon MSK 클러스터에 액세스할 수 있는 경우 Lambda 함수에는 Amazon VPC 리소스에 액세스할 수 있는 권한이 있어야 합니다. 이러한 리소스에는 VPC, 서브넷, 보안 그룹 및 네트워크 인터페이스가 있습니다. 리소스에 액세스하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다. 이러한 권한은 AWSLambdaMSKExecutionRole AWS 관리형 정책에 포함됩니다.

선택적 Lambda 함수 권한

Lambda 함수에 또한 다음 권한이 필요할 수 있습니다.

  • SASL/SCRAM 인증을 사용하는 경우 SCRAM 암호에 액세스하십시오.

  • Secrets Manager 비밀 정보를 설명합니다.

  • AWS Key Management Service(AWS KMS) 고객 관리형 키에 액세스합니다.

  • 실패한 간접 호출 기록을 대상으로 전송합니다.

Secrets Manager 및 AWS KMS 권한

Amazon MSK 브로커에 대해 구성하는 액세스 제어 유형에 따라 Lambda 함수에는 SCRAM 비밀(SASL/SCRAM 인증 이용 시) 또는 Secrets Manager 암호 정보에 액세스하여 AWS KMS 고객 관리형 키를 복호화할 수 있는 권한이 필요할 수도 있습니다. 리소스에 액세스하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

대상으로 레코드 전송

실패한 간접 호출의 레코드를 장애 시 대상으로 전송하려면 Lambda 함수에 이러한 레코드를 전송할 권한이 있어야 합니다. Kafka 이벤트 소스 매핑의 경우 Amazon SNS 주제, Amazon SQS 대기열 또는 Amazon S3 버킷을 대상으로 선택할 수 있습니다. 레코드를 SNS 주제로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

레코드를 SQS 대기열로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

레코드를 S3 버킷으로 전송하려면 함수의 실행 역할에 다음 권한이 주어져야 합니다.

또한 대상에 KMS 키를 구성한 경우 대상 유형에 따라 Lambda에는 다음과 같은 권한이 필요합니다.

  • S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:GenerateDataKey가 필요합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.

  • SQS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SQS 대기열 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

  • SNS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SNS 주제 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

실행 역할에 권한 추가

IAM 콘솔을 사용하여 AWS 관리형 정책 AWSLambdaMSKExecutionRole을 실행 역할에 추가하려면 다음 단계를 따릅니다.

AWS 관리형 정책을 추가하려면
  1. IAM 콘솔에서 정책 페이지를 엽니다.

  2. 검색 상자에 정책 이름(AWSLambdaMSKExecutionRole)을 입력합니다.

  3. 목록에서 정책을 선택한 다음 정책 작업(Policy actions), 연결(Attach)을 선택합니다.

  4. 정책 연결(Attach policy) 페이지의 목록에서 실행 역할을 선택한 다음 정책 연결을 선택합니다.

IAM 정책을 사용하여 사용자에게 액세스 권한 부여

기본적으로 사용자 및 역할은 Amazon MSK API 작업을 수행할 수 있는 권한이 없습니다. 조직 또는 계정의 사용자에게 액세스 권한을 부여하려면 자격 증명 기반 정책을 추가 혹은 업데이트할 수 있습니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Amazon MSK 자격 증명 기반 정책 예제를 참조하세요.

인증 및 권한 부여 오류

Amazon MSK 클러스터의 데이터를 사용하는 데 필요한 권한이 누락된 경우 Lambda는 LastProcessingResult 아래의 이벤트 소스 매핑에 다음 오류 메시지 중 하나를 표시합니다.

클러스터가 Lambda를 인증하지 못함

SASL/SCRAM 또는 mMTS의 경우 이 오류는 제공된 사용자에게 다음 필수 Kafka 액세스 제어 목록(ACL) 권한이 모두 있지는 않음을 나타냅니다.

  • DescribeConfigs 클러스터

  • 그룹 설명

  • 그룹 읽기

  • 주제 설명

  • Thread-Topic

IAM 액세스 제어의 경우 함수의 실행 역할에 그룹 또는 주제에 액세스하는 데 필요한 하나 이상의 권한이 없습니다. IAM 역할 기반 인증에서 필요한 권한 목록을 검토합니다.

필수 Kafka 클러스터 권한이 있는 Kafka ACL 또는 IAM 정책을 생성할 때 주제와 그룹을 리소스로 지정합니다. 주제 이름은 이벤트 소스 매핑의 주제와 일치해야 합니다. 그룹 이름은 이벤트 소스 매핑의 UUID와 일치해야 합니다.

실행 역할에 필요한 권한을 추가한 후 변경 사항이 적용되기까지 몇 분 정도 소요될 수 있습니다.

SASL 인증 실패

SASL/SCRAM의 경우 이 오류는 제공된 사용자 이름과 암호가 유효하지 않음을 나타냅니다.

IAM 액세스 제어의 경우 실행 역할에 MSK 클러스터에 대한 kafka-cluster:Connect 권한이 없습니다. 역할에 이 권한을 추가하고 클러스터의 Amazon 리소스 이름(ARN)을 리소스로 지정합니다.

이 오류가 간헐적으로 발생하는 경우가 있습니다. TCP 연결 수가 Amazon MSK 서비스 할당량을 초과하면 클러스터는 연결을 거부합니다. Lambda는 연결이 성공할 때까지 취소하고 다시 시도합니다. Lambda가 클러스터에 연결하고 레코드를 폴링하면 마지막 처리 결과가 OK로 변경됩니다.

서버가 Lambda를 인증하지 못함

이 오류는 Amazon MSK Kafka 브로커가 Lambda로 인증하지 못했음을 나타냅니다. 이 오류는 다음과 같은 이유로 발생할 수 있습니다.

  • mTLS 인증을 위한 클라이언트 인증서를 제공하지 않았습니다.

  • 클라이언트 인증서를 제공했지만 브로커가 mTLS를 사용하도록 구성되지 않았습니다.

  • 브로커가 클라이언트 인증서를 신뢰하지 않습니다.

제공된 인증서 또는 프라이빗 키가 잘못됨

이 오류는 Amazon MSK 소비자가 제공된 인증서 또는 프라이빗 키를 사용할 수 없음을 나타냅니다. 인증서와 키가 PEM 형식을 사용하고 프라이빗 키 암호화가 PBES1 알고리즘을 사용하는지 확인합니다.

네트워크 구성

Kafka 브로커에 대한 Amazon VPC 액세스를 구성하는 경우 Lambda는 Kafka 클러스터가위치한 Amazon VPC 리소스에 대한 액세스 권한이 있어야 합니다. Lambda에 대한 AWS PrivateLink VPC 엔드포인트와 AWS Security Token Service(AWS STS)를 배포하는 것이 좋습니다. 브로커가 인증을 사용하는 경우 Secrets Manager용 VPC 엔드포인트도 배포합니다. 장애 시 대상을 구성한 경우 대상 서비스를 위한 VPC 엔드포인트도 배포합니다.

또는 Kafka 클러스터와 연결된 VPC에 퍼블릭 서브넷당 하나의 NAT 게이트웨이가 포함되는지 확인합니다. 자세한 내용은 VPC 연결 함수의 인터넷 및 서비스 액세스 단원을 참조하십시오.

또한 VPC 엔드포인트를 사용하는 경우 프라이빗 DNS 이름을 활성화하도록 구성해야 합니다.

다음 규칙(최소)으로 Amazon VPC 보안 그룹을 구성합니다.

  • 인바운드 규칙 - 이벤트 소스에 지정된 보안 그룹에 대해 Amazon MSK 브로커 포트(일반 텍스트의 경우 9092, TLS의 경우 9094, SASL의 경우 9096, IAM의 경우 9098)의 모든 트래픽을 허용합니다.

  • 아웃바운드 규칙 - 모든 대상에 대해 포트 443의 모든 트래픽을 허용합니다. 이벤트 소스에 지정된 보안 그룹에 대해 Amazon MSK 브로커 포트(일반 텍스트의 경우 9092, TLS의 경우 9094, SASL의 경우 9096, IAM의 경우 9098)의 모든 트래픽을 허용합니다.

  • NAT 게이트웨이 대신 VPC 엔드포인트를 사용하는 경우 VPC 엔드포인트와 연결된 보안 그룹은 이벤트 소스의 보안 그룹에서 포트 443의 모든 인바운드 트래픽을 허용해야 합니다.

참고

Amazon VPC 구성은 Amazon MSK API를 통해 검색할 수 있습니다. create-event-source-mapping 명령을 사용하여 설정하는 동안 이 옵션을 구성할 필요가 없습니다.

네트워크 구성에 대한 자세한 내용은 AWS 컴퓨팅 블로그의 Setting up AWS Lambda with an Apache Kafka cluster within a VPC를 참조하세요.

Amazon MSK를 이벤트 소스로 추가

이벤트 소스 매핑을 생성하려면 Lambda 콘솔, AWS SDK 또는 AWS Command Line Interface(AWS CLI)를 사용해 Amazon MSK를 Lambda 함수 트리거로 추가합니다. Amazon MSK를 트리거로 추가하면 Lambda는 Lambda 함수의 VPC 설정이 아니라 Amazon MSK 클러스터의 VPC 설정을 가정합니다.

이 단원에서는 Lambda 콘솔 및 AWS CLI를 사용해 이벤트 소스 매핑을 생성하는 방법을 설명합니다.

필수 조건

  • Amazon MSK 클러스터와 Kafka 주제. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Amazon MSK 사용 시작하기를 참조하세요.

  • MSK 클러스터에서 사용하는 AWS 리소스에 액세스할 수 있는 권한이 있는 실행 역할입니다.

사용자 지정이 가능한 소비자 그룹 ID

Kafka를 이벤트 소스로 설정할 때 소비자 그룹 ID를 지정할 수 있습니다. 이 소비자 그룹 ID는 Lambda 함수에 가입하려는 Kafka 소비자 그룹의 기존 식별자입니다. 이 기능을 사용하여 진행 중인 모든 Kafka 레코드 처리 설정을 다른 소비자에서 Lambda로 원활하게 마이그레이션할 수 있습니다.

소비자 그룹 ID를 지정하고 해당 소비자 그룹 내에 다른 활성 폴러가 있는 경우 Kafka는 모든 소비자에게 메시지를 배포합니다. 즉, Lambda는 Kafka 주제에 대한 모든 메시지를 수신하지는 않습니다. Lambda가 주제에 있는 모든 메시지를 처리하도록 하려면 해당 소비자 그룹의 다른 모든 폴러를 끄십시오.

또한 소비자 그룹 ID를 지정했는데 Kafka가 동일한 ID를 가진 유효한 기존 소비자 그룹을 찾으면 Lambda는 이벤트 소스 매핑을 위한 StartingPosition 파라미터를 무시합니다. 대신 Lambda는 소비자 그룹의 커밋된 오프셋에 따라 레코드 처리를 시작합니다. 소비자 그룹 ID를 지정했는데 Kafka가 기존 소비자 그룹을 찾을 수 없는 경우, Lambda는 StartingPosition에서 지정된 대로 이벤트 소스를 구성합니다.

지정하는 소비자 그룹 ID는 모든 Kafka 이벤트 소스 중에서 고유해야 합니다. 지정된 소비자 그룹 ID로 Kafka 이벤트 소스 매핑을 생성한 후에는 이 값을 업데이트할 수 없습니다.

장애 시 대상

실패한 간접 호출 또는 Kafka 이벤트 소스의 과대 페이로드에 대한 기록을 유지하려면 함수에 장애 시 대상을 구성합니다. 간접 호출이 실패하면 Lambda는 간접 호출의 세부 정보가 포함된 JSON 레코드를 대상으로 전송합니다.

Amazon SNS 주제, Amazon SQS 대기열 또는 Amazon S3 버킷 중에서 대상으로 선택할 수 있습니다. SNS 주제 또는 SQS 대기열 대상의 경우, Lambda는 레코드 메타데이터를 대상으로 전송합니다. S3 버킷 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다.

Lambda가 선택한 목적지로 레코드를 성공적으로 전송하려면 함수의 실행 역할에 관련 권한이 포함되어야 합니다. 또한 표에는 각 대상 유형이 JSON 간접 호출 레코드를 수신하는 방법도 설명되어 있습니다.

대상 유형 다음 이벤트 소스에서 지원 필요한 권한 대상별 JSON 형식

Amazon SQS 대기열

  • Kinesis

  • DynamoDB

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드 메타데이터를 Message로 대상에 전달합니다.

Amazon SNS 주제

  • Kinesis

  • DynamoDB

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드 메타데이터를 Message로 대상에 전달합니다.

Amazon S3 버킷

  • 자체 관리형 Apache Kafka 및 관리형 Apache Kafka

Lambda는 간접 호출 레코드와 메타데이터와 함께 대상에 저장합니다.

작은 정보

가장 좋은 방법은 실행 역할에만 필요한 최소 권한을 포함하는 것입니다.

SNS 및 SQS 대상

다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 SQS 대기열 대상으로 전송하는 내용을 보여줍니다. recordsInfo 아래의 각 키에는 하이픈으로 구분된 Kafka 주제와 파티션이 모두 포함되어 있습니다. 예를 들어, "Topic-0" 키의 경우 Topic은 Kafka 주제이고 0은 파티션입니다. 각 주제 및 파티션에 대해 오프셋과 타임스탬프 데이터를 사용하여 원래의 간접 호출 레코드를 찾을 수 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 대상

S3 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다. 다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 S3 버킷 대상으로 전송하는 것을 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 payload 필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
작은 정보

대상 버킷에서 S3 버전 관리를 활성화하는 것이 좋습니다.

장애 시 대상 구성

이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스의 경우 이벤트 소스 매핑 간접 호출을 선택합니다.

  5. 이벤트 소스 매핑의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.

  6. 조건의 경우 실패 시를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.

  7. 대상 유형의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.

  8. Destination(대상)에서 리소스를 선택합니다.

  9. 저장을 선택합니다.

Lambda API를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음과 같은 CreateEventSourceMapping CLI 명령은 SQS 실패 시 목적지를 MyFunction에 추가합니다.

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

다음 UpdateEventSourceMapping CLI 명령은 uuid 입력과 연결된 Kafka 이벤트 소스에 S3 장애 시 대상을 추가합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

대상을 제거하려면 destination-config 파라미터의 인수로 빈 문자열을 제공합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon MSK 트리거 추가(콘솔)

다음 단계에 따라 Amazon MSK 클러스터 및 Kafka 주제를 Lambda 함수의 트리거로 추가합니다.

Lambda 함수에 Amazon MSK 트리거를 추가하려면(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. Lambda 함수의 이름을 선택합니다.

  3. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

  4. 트리거 구성에서 다음을 수행합니다.

    1. MSK 트리거 유형을 선택합니다.

    2. MSK 클러스터에 해당 클러스터를 선택합니다.

    3. 배치 크기(Batch size)에 단일 배치에서 검색할 최대 메시지 수를 입력합니다.

    4. Batch window에서 Lambda가 함수를 호출하기 전에 레코드를 수집하는 데 걸리는 최대 시간(초)을 입력합니다.

    5. 주제 이름에 Kafka 주제의 이름을 입력합니다.

    6. (선택 사항) Consumer group ID에서 가입할 Kafka 소비자 그룹의 ID를 입력합니다.

    7. (선택 사항) 시작 위치의 경우 최신 레코드에서 스트림 읽기를 시작하려면 최신을 선택하고, 사용 가능한 가장 빠른 레코드에서 시작하려면 수평 트리밍을 선택하고, 읽기를 시작할 타임스탬프를 지정하려면 타임스탬프를 선택합니다.

    8. (선택 사항) 인증(Authentication)에서 MSK 클러스터의 브로커로 인증하기 위한 비밀 키를 선택합니다.

    9. 테스트를 위해 트리거를 비활성화된 상태에서 생성하려면(권장됨) 트리거 사용을 선택 해제합니다. 또는 트리거를 즉시 사용하려면 트리거 사용을 선택합니다.

  5. 트리거를 생성하려면 추가를 선택합니다.

Amazon MSK 트리거 추가(AWS CLI)

다음 예제 AWS CLI 명령을 사용하여 Lambda 함수에 대한 Amazon MSK 트리거를 생성하고 확인합니다.

AWS CLI를 사용하여 트리거 생성

예 — IAM 인증을 사용하는 클러스터의 이벤트 소스 매핑 생성

다음 예제에서는 create-event-source-mapping AWS CLI 명령을 사용해 my-kafka-function이라는 Lambda 함수를 AWSKafkaTopic이라는 Kafka 주제에 매핑합니다. 주제의 시작 위치를 LATEST으로 설정합니다. 클러스터가 IAM 역할 기반 인증을 사용하는 경우 SourceAccessConfiguration 객체가 필요하지 않습니다. 예제

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
예 — SASL/SCRAM 인증을 사용하는 클러스터의 이벤트 소스 매핑 생성

클러스터가 SASL/SCRAM 인증을 사용하는 경우 SourceAccessConfiguration 객체를 포함하며 이는 SASL_SCRAM_512_AUTH 및 Secrets Manager 비밀 ARN을 지정합니다.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
예 — mTLS 인증을 사용하는 클러스터의 이벤트 소스 매핑 생성

클러스터가 mTLS을 사용하는 경우 SourceAccessConfiguration 객체를 포함하며 이는 CLIENT_CERTIFICATE_TLS_AUTH 및 Secrets Manager 비밀 ARN을 지정합니다.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

자세한 내용은 CreateEventSourceMapping API 참조 문서를 참조하세요.

AWS CLI를 사용하여 상태 확인

다음 예제에서는 get-event-source-mapping AWS CLI 명령을 사용해 생성한 이벤트 소스 매핑의 상태를 설명합니다.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

계정 간 이벤트 소스 매핑 생성

다중 VPC 프라이빗 연결을 사용하여 Lambda 함수를 다른 AWS 계정의 클러스터에 프로비저닝된 MSK 클러스터에 연결할 수 있습니다. 다중 VPC 연결은 AWS PrivateLink를 사용하며, 이는 모든 트래픽을 AWS 네트워크 내에 유지합니다.

참고

서버리스 MSK 클러스터에는 계정 간 이벤트 소스 매핑을 생성할 수 없습니다.

계정 간 이벤트 소스 매핑을 생성하려면 먼저 MSK 클러스터의 다중 VPC 연결을 구성해야 합니다. 이벤트 소스 매핑을 생성할 때는 다음 예에서 표시된 것처럼 클러스터 ARN 대신 관리형 VPC 연결 ARN을 사용합니다. CreateEventSourceMapping 작업도 MSK 클러스터가 사용하는 인증 유형에 따라 달라집니다.

예 — IAM 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성

클러스터가 IAM 역할 기반 인증을 사용하는 경우 SourceAccessConfiguration 객체가 필요하지 않습니다. 예제

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
예 — SASL/SCRAM 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성

클러스터가 SASL/SCRAM 인증을 사용하는 경우 SourceAccessConfiguration 객체를 포함하며 이는 SASL_SCRAM_512_AUTH 및 Secrets Manager 비밀 ARN을 지정합니다.

SASL/SCRAM 인증을 통한 계정 간 Amazon MSK 이벤트 소스 매핑에 암호를 사용하는 방법에는 두 가지가 있습니다.

  • Lambda 함수 계정에서 시크릿을 생성하고 클러스터 암호와 동기화합니다. 로테이션을 생성하여 두 암호가 동기화된 상태를 유지합니다. 이 옵션을 사용하면 함수 계정에서 시크릿을 제어할 수 있습니다.

  • MSK 클러스터와 연결된 암호를 사용합니다. 이 암호는 Lambda 함수 계정에 대한 교차 계정 액세스를 허용해야 합니다. 자세한 내용은 다른 계정에 있는 사용자의 AWS Secrets Manager 암호에 대한 권한을 참조하세요.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
예 — mTLS 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성

클러스터가 mTLS을 사용하는 경우 SourceAccessConfiguration 객체를 포함하며 이는 CLIENT_CERTIFICATE_TLS_AUTH 및 Secrets Manager 비밀 ARN을 지정합니다. 암호는 클러스터 계정 또는 Lambda 함수 계정에 저장할 수 있습니다.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Amazon MSK 이벤트 소스의 Auto Scaling

Amazon MSK 이벤트 소스를 처음 생성하면 Lambda는 Kafka 주제의 모든 파티션을 처리할 한 명의 소비자를 할당합니다. 각 소비자는 증가한 워크로드를 처리하기 위해 여러 프로세서를 병렬로 실행합니다. 그뿐 아니라 Lambda는 워크로드에 따라 소비자 수를 자동으로 늘리거나 줄입니다. 각 파티션에서 메시지 순서를 유지하기 위해 최대 소비자 수는 주제의 파티션당 하나의 소비자입니다.

Lambda는 1분 간격으로 주제의 모든 파티션의 소비자 오프셋 지연을 평가합니다. 지연이 너무 높으면 파티션은 Lambda가 메시지를 처리할 수 있는 것보다 더 빠르게 메시지를 수신합니다. 필요에 따라 Lambda는 주제에 소비자를 추가하거나 제거합니다. 소비자를 추가 또는 제거하는 크기 조정 프로세스는 평가 후 3분 이내에 진행됩니다.

대상 Lambda 함수가 제한되면 Lambda는 소비자 수를 줄입니다. 이 동작은 소비자가 검색하고 함수에 보낼 수 있는 메시지 수를 줄임으로써 함수의 워크로드를 줄입니다.

Kafka 주제의 처리량을 모니터링하려면 Lambda에서 함수가 레코드를 처리하는 동안 내보내는 오프셋 지연 지표를 확인합니다.

얼마나 많은 함수 호출이 병렬로 발생하는지 확인하기 위해 함수에 대한 동시성 지표를 확인할 수도 있습니다.

폴링 및 스트리밍 시작 위치

이벤트 소스 매핑 생성 및 업데이트 중 스트림 폴링은 최종적으로 일관됩니다.

  • 이벤트 소스 매핑 생성 중 스트림에서 이벤트 폴링을 시작하는 데 몇 분 정도 걸릴 수 있습니다.

  • 이벤트 소스 매핑 업데이트 중 스트림에서 이벤트 폴링을 중지했다가 다시 시작하는 데 몇 분 정도 걸릴 수 있습니다.

이 동작은 스트림의 시작 위치로 LATEST를 지정하면 이벤트 소스 매핑이 생성 또는 업데이트 중에 이벤트를 놓칠 수 있음을 의미합니다. 누락된 이벤트가 없도록 하기 위해서는 스트림 시작 위치를 TRIM_HORIZON 또는 AT_TIMESTAMP로 지정하세요.

Amazon CloudWatch 지표

Lambda는 함수가 레코드를 처리하는 동안 OffsetLag 지표를 내보냅니다. 이 지표의 값은 Kafka 이벤트 소스 주제에 작성된 마지막 레코드와 함수의 소비자 그룹에서 처리한 마지막 레코드 간의 오프셋 차이입니다. 레코드가 추가되는 시점과 소비자 그룹이 이를 처리하는 시점 사이의 지연 시간을 추정하는 데 OffsetLag를 사용할 수 있습니다.

OffsetLag의 증가 추세는 함수의 소비자 그룹 폴러에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 Lambda 함수 지표 작업 단원을 참조하십시오.

Amazon MSK 구성 파라미터

모든 Lambda 이벤트 소스 유형은 동일한 CreateEventSourceMappingUpdateEventSourceMapping API 작업을 공유합니다. 그러나 일부 파라미터만 Amazon MSK에 적용됩니다.

Amazon MSK에 적용되는 이벤트 소스 파라미터
파라미터 필수 기본값 참고

AmazonManagedKafkaEventSourceConfig

N

기본적으로 고유한 값으로 설정되는 소비자 그룹 ID 필드를 포함합니다.

생성 시에만 설정할 수 있음

BatchSize

N

100

최대값: 10,000

활성

N

활성

EventSourceArn

Y

생성 시에만 설정할 수 있음

FunctionName

Y

FilterCriteria

N

Lambda 이벤트 필터링

MaximumBatchingWindowInSeconds

N

500ms

일괄 처리 동작

SourceAccessConfigurations

N

자격 증명 없음

이벤트 소스에 대한 CLIENT_CERTIFICATE_TLS_AUTH(MutualTLS) 또는 SASL/SCRAM

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, 또는 LATEST

생성 시에만 설정할 수 있음

StartingPositionTimestamp

N

StartingPosition이 AT_TIMESTAMP로 설정된 경우에만 필요합니다.

주제

Y

Kafka 주제 이름

생성 시에만 설정할 수 있음