セルフマネージド型の Apache Kafka で Lambda を使用する - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

セルフマネージド型の Apache Kafka で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Lambdaは、Apache Kafkaイベントソースソースとしてサポートしています。Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。

ユーザーは、AWS マネージドの Kafka サービス、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、またはセルフマネージドの Kafka クラスターを使用できます。Amazon MSK で Lambda を使用する方法の詳細については、Amazon MSK で Lambda を使用する を参照してください。

このトピックでは、セルフマネージド型の Kafka クラスターで Lambda を使用する方法を説明します。AWS の用語集では、セルフマネージド型クラスターには、非 AWS のホストされた Kafka クラスターが含まれています。たとえば、お使いの Kafka クラスターを、Confluent Cloud などのクラウドプロバイダーでホストすることが可能です。

イベントソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、バッチの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Kafka ベースのイベントソースの場合、Lambda はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。詳しくは、「バッチ処理動作」を参照してください。

セルフマネージド型 Kafka をイベントソースとして使用する方法の例については、AWS Compute Blog の Using self-hosted Apache Kafka as an event source for AWS Lambda を参照してください。

イベントの例

Lambda は、Lambda 関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Kafka トピックと Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Kafka クラスター認証

Lambda は、セルフマネージド型 Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Kafka クラスターを設定しておいてください。Kafka セキュリティの詳細については、Kafka ドキュメントの「Security」(セキュリティ) セクションを参照してください。

VPC アクセス

VPC 内の Kafka ユーザーのみが Kafka ブローカーにアクセスする場合は、Amazon Virtual Private Cloud (Amazon VPC) アクセス用に Kafka イベントソースを設定する必要があります。

SASL/SCRAM 認証

Lambda は、Transport Layer Security (TLS) 暗号化 (SASL_SSL) を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda は、暗号化された認証情報を送信してクラスターで認証します。Lambda は plaintext の SASL/PLAIN (SASL_PLAINTEXT) をサポートしません。SASL/SCRAM 認証の詳細については、「RFC 5802」を参照してください。

Lambda は SASL/PLAIN 認証もサポートします。このメカニズムはクリアテキスト認証情報を使用するので、この認証情報が保護されることを確実にするためにも、サーバーへの接続には TLS 暗号化を使用する必要があります。

SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Manager に保存します。Secrets Manager の使用に関する詳細については、「AWS Secrets Manager ユーザーガイド」の「チュートリアル: シークレットの作成と取得」を参照してください。

重要

認証に Secrets Manager を使用するには、シークレットを Lambda 関数と同じ AWS リージョンに保存する必要があります。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka では、Lambda がクライアントとして機能します。Kafka ブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。

Kafka クラスターは、Lambda で Kafka ブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、Lambda トラストストア内の認証局 (CA) によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。Lambda はルート証明書を使用して Kafka ブローカーを検証します。

mTLS の詳細については、「Introducing mutual TLS authentication for Amazon MSK as an event source」(イベントソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

クライアント証明書シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

サーバルート CA 証明書シークレットの設定

このシークレットは、Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

API アクセスと許可の管理

セルフマネージド Kafka クラスターへのアクセスに加えて、Lambda 関数にはさまざまな API アクションを実行するための許可が必要です。これらの許可は、関数の実行ロールに追加します。ユーザーが API アクションのいずれかにアクセスする必要がある場合は、AWS Identity and Access Management (IAM) ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

Lambda 関数に必要なアクセス許可

Amazon CloudWatch Logs のロググループでログを作成して保存するには、Lambda 関数の実行ロールに以下の許可が必要です。

Lambda 関数のオプションのアクセス許可

Lambda 関数には、以下を実行する許可も必要になる場合があります。

  • Secrets Manager シークレットを記述する。

  • AWS Key Management Service (AWS KMS) カスタマー管理のキーにアクセスする。

  • Amazon VPC にアクセスする。

  • 失敗した呼び出しのレコードを送信先に送信します。

Secrets Manager と AWS KMS 許可

Kafka ブローカーに設定しているアクセスコントロールのタイプに応じて、Lambda 関数には Secrets Manager シークレットにアクセスするための許可、または AWS KMS カスタマーマネージドキーを復号化するための許可が必要になる場合があります。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

VPC アクセス許可

セルフマネージド Apache Kafka クラスターにアクセスできるのが VPC 内のユーザーのみである場合、Lambda 関数には Amazon VPC リソースにアクセスするための許可が必要です。これらのリソースには、VPC、サブネット、セキュリティグループ、ネットワークインターフェイスが含まれます。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

レコードを送信先に送信する

失敗した呼び出しのレコードを失敗時の送信先に送信する場合、Lambda 関数にこれらのレコードを送信する権限が必要です。Kafka イベントソースマッピングでは、Amazon SNS トピック、Amazon SQS キュー、または Amazon S3 バケットのいずれかを、送信先として選択できます。レコードを SNS トピックに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

レコードを SQS キューに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

レコードを S3 バケットに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

さらに、送信先に KMS キーを設定した場合、Lambda には送信先のタイプに応じて以下のアクセス許可が必要です。

  • S3 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:GenerateDataKey が必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。

  • SQS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt および kms:GenerateDataKey が必要です。KMS キーと SQS キューの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。

  • SNS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decryptkms:GenerateDataKey が必要です。KMS キーと SNS トピックの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。

実行ロールへのアクセス許可の追加

セルフマネージド型 Apache Kafka クラスターが使用するその他の AWS サービスにアクセスするために、Lambda は、関数の実行ロールで定義されたアクセス許可ポリシーを使用します。

デフォルトでは、Lambda は、セルフマネージド型 Apache Kafka クラスターに対して、必須のまたはオプションのアクションを実行することはできません。これらのアクションを IAM 信頼ポリシーで作成および定義し、そのポリシーを実行ロールにアタッチする必要があります。この例では、Lambda に Amazon VPC リソースへのアクセスを許可する、ポリシーの作成方法を紹介します。

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

IAM コンソールで JSON ポリシードキュメントを作成する方法については、IAM ユーザーガイド[JSON] タブでのポリシーの作成を参照してください。

IAM ポリシーを使用したユーザーアクセスの許可

デフォルトでは、ユーザーおよびロールにはイベントソースの API オペレーションを実行するアクセス許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを作成または更新します。詳細については、IAM ユーザーガイドポリシーを使用した AWS リソースへのアクセスのコントロールを参照してください。

認証と認可のエラー

Kafka クラスターからのデータを消費するために必要な許可のいずれかが欠落している場合、Lambda は [LastProcessingResult] のイベントソースマッピングに以下のエラーメッセージのいずれかを表示します。

クラスターが Lambda の認可に失敗した

SALS/SCRAM または mTLS の場合、このエラーは、指定されたユーザーが以下の必要とされる Kafka アクセスコントロールリスト (ACL) 許可のすべてを持っていないことを示します。

  • DescribeConfigs クラスター

  • グループを記述する

  • グループを読み取る

  • トピックを記述する

  • トピックを読み取る

必要な kafka-cluster 許可を使用して Kafka ACL を作成するときは、リソースとしてトピックとグループを指定します。トピック名は、イベントソースマッピングのトピックと一致する必要があります。グループ名は、イベントソースマッピングの UUID と一致する必要があります。

必要な許可を実行ロールに追加した後は、変更が有効になるまで数分間かかる場合があります。

SASL 認証に失敗した

SASL/SCRAM または SASL/PLAIN の場合、このエラーは指定されたサインイン認証情報が無効であることを示します。

Server failed to authenticate Lambda (サーバーが Lambda の認証に失敗しました)

このエラーは、Kafka ブローカーが Lambda の認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • mTLS 認証用のクライアント証明書を提供していない。

  • クライアント証明書を提供したが、Kafka ブローカーが mTLS 認証を使用するように設定されていない。

  • クライアント証明書が Kafka ブローカーに信頼されていない。

Lambda failed to authenticate server (Lambda がサーバーの認証に失敗しました)

このエラーは、Lambda が Kafka ブローカーの認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • Kafka ブローカーは自己署名証明書またはプライベート CA を使用するが、サーバールート CA 証明書を提供しなかった。

  • サーバールート CA 証明書が、ブローカーの証明書に署名したルート CA と一致しない。

  • ブローカーの証明書にサブジェクトの別名としてブローカーの DNS 名または IP アドレスが含まれていないため、ホスト名の検証が失敗した。

Provided certificate or private key is invalid (提供された証明書またはプライベートキーが無効です)

このエラーは、Kafka コンシューマーが提供された証明書またはプライベートキーを使用できなかったことを示します。証明書とキーが PEM 形式を使用しており、プライベートキーの暗号化が PBES1 アルゴリズムを使用していることを確認してください。

ネットワーク構成

Kafka ブローカーへの Amazon VPC アクセスを設定する場合、Lambda は Kafka クラスターが存在する Amazon VPC リソースにアクセスできる必要があります。Lambda と AWS Security Token Service (AWS STS) に AWS PrivateLink VPC endpoints をデプロイすることが推奨されます。ブローカーが認証を使用する場合は、Secrets Manager 用の VPC エンドポイントもデプロイします。失敗時の送信先を設定した場合は、送信先サービスの VPC エンドポイントもデプロイします。

または、Kafka クラスターに関連付けられた VPC に、パブリックサブネットごとに 1 つの NAT ゲートウェイが含まれていることを確認します。詳細については、「VPC に接続した関数のインターネットアクセスとサービスアクセス」を参照してください。

VPC エンドポイントを使用する場合は、プライベート DNS 名を有効にするように設定する必要もあります。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – イベントソース用に指定されたセキュリティグループに対して Kafka ブローカーポート上のすべてのトラフィックを許可します。Kafka は、デフォルトでポート 9092 を使用します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。イベントソース用に指定されたセキュリティグループに対して Kafka ブローカーポート上のすべてのトラフィックを許可します。Kafka は、デフォルトでポート 9092 を使用します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用している場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

ネットワークの設定方法の詳細については、AWS Compute Blog の Setting up AWS Lambda with an Apache Kafka cluster within a VPC を参照してください。

Kafka クラスターをイベントソースとして追加する

イベントソースマッピングを作成するには、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して、Kafka クラスターを Lambda 関数のトリガーとして追加します。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

前提条件

  • セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.1.0 以降をサポートしています。

  • セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

カスタマイズ可能なコンシューマーグループ ID

Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

障害発生時の送信先

Kafka イベントソースからの失敗した呼び出しやサイズが大きすぎるペイロードの記録を保持するには、障害発生時の送信先を関数に設定します。呼び出しが失敗すると、Lambda は呼び出しの詳細を含む JSON レコードを送信先に送ります。

Amazon SNS トピック、Amazon SQS キュー、または Amazon S3 バケットのいずれかを送信先として選択できます。SNS トピックまたは SQS キューの送信先の場合、Lambda はレコードメタデータを送信先に送信します。S3 バケットの送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。

Lambda がレコードを選択した送信先に正常に送信するには、関数の実行ロールに関連するアクセス許可が含まれていることを確認してください。この表では、各送信先タイプで JSON 呼び出しレコードを受け取る方法についても説明しています。

送信先タイプ サポートされるイベントソース 必要なアクセス許可 宛先固有の JSON 形式

Amazon SQS キュー

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon SNS トピック

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon S3 バケット

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードをメタデータと共に送信先に保存します。

ヒント

ベストプラクティスとして、実行ロールにのみ必要な最小限のアクセス許可を含めます。

SNS と SQS の送信先

以下の例は、Kafka イベントソース呼び出しが失敗した場合に Lambda が SNS トピックまたは SQS キューの送信先に送信する内容を示しています。recordsInfo の各キーには、Kafka トピックとパーティションの両方がハイフンで区切られて含まれています。例えば、キー "Topic-0" の場合、Topic は Kafka トピック、0 はパーティションです。各トピックとパーティションについて、オフセットとタイムスタンプデータを使用して元の呼び出しレコードを検索できます。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 の送信先

S3 の送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。以下の例は、Kafka イベントソース呼び出しが失敗した場合に、Lambda が S3 バケットの送信先に送信することを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
ヒント

送信先バケットで S3 バージョニングを有効にすることをお勧めします。

障害発生時の送信先の設定

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [ソース] には、[イベントソースマッピング呼び出し] を選択します。

  5. [イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。

  6. [条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

  7. [送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

  8. [送信先] で、リソースを選択します。

  9. [Save] を選択します。

Lambda API を使用して障害発生時の送信先を設定することもできます。例えば、以下の CreateEventSourceMapping CLI コマンドは、SQS 障害発生時の送信先を MyFunction に追加します。

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下の UpdateEventSourceMapping CLI コマンドは、S3 障害発生時の送信先を、入力 uuid に関連付けられた Kafka イベントソースに追加します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

送信先を削除するには、destination-config パラメータの引数として空の文字列を指定します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

セルフマネージド型 Kafka クラスターを追加する (コンソール)

セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)
  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. Apache Kafka のトリガータイプを選択します。

    2. [Bootstrap servers] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[Add] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。

    3. [Topic name] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。

    4. (オプション) [Batch size] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。

    5. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。

    8. (オプション) [VPC] で、Kafka クラスターに Amazon VPC を選択します。次に、[VPC subnets] (VPC サブネット) と [VPC security groups] (VPC セキュリティグループ) を選択します。

      VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。

    9. (オプション) [Authentication] (認証) で [Add] (追加) をクリックしてから、以下を実行します。

      1. クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。

        • Kafka ブローカーが SASL/PLAIN 認証を使用する場合は、[BASIC_AUTH] を選択します。

        • ブローカーが SALS/SCRAM 認証を使用する場合は、[SASL_SCRAM] プロトコルのいずれかを選択します。

        • mTLS 認証を設定している場合は、[CLIENT_CERTIFICATE_TLS_AUTH] プロトコルを選択します。

      2. SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。

    10. (オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、[Encryption] (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。

      この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。

    11. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

セルフマネージド型 Kafka クラスターを追加する (AWS CLI)

Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。

SASL/SCRAM を使用する

Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

VPC の使用

Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Kafka クラスターをイベントソースとして使用する

Apache Kafka クラスターを Lambda 関数のトリガーとして追加すると、クラスターはイベントソースソースとして使用されます。

Lambda は、ユーザーが指定した StartingPosition に基づいて、CreateEventSourceMapping リクエストで Topics として指定された Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

StartingPositionLATEST として指定すると、Lambda は、そのトピックに属する各パーティション内の最新のメッセージから読み取りを開始します。トリガーが設定されてから Lambda がメッセージの読み取りを開始するまでには若干の遅延が発生することがあるため、Lambda はこの期間中に生成されたメッセージを読み取りません。

Lambda は、指定された 1 つ、または複数の Kafka トピックパーティションからのレコードを処理し、関数に JSON ペイロードを送信します。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまで、CreateEventSourceMapping リクエストで指定された BatchSize 値に基づいて、バッチ内のレコードの処理を継続します。

関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

Kafka イベントソースのオートスケーリング

初めて Apache Kafka イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がオーバーロードすると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットを監視するには、consumer_lagconsumer_offset などの Apache Kafka コンシューマーラグメトリックスを表示します。いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。

イベントソースの API オペレーション

Kafka クラスターを、Lambda コンソール、AWS SDK、または AWS CLI を使用する Lambda 関数のイベントソースとして追加する場合、Lambda は API を使用してリクエストを処理します。

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

イベントソースマッピングエラー

Apache Kafka クラスターを Lambda 関数のイベントソースとして追加すると、関数でエラーが発生した場合、Kafka コンシューマーはレコードの処理を停止します。トピックパーティションのコンシューマーは、レコードのサブスクライブ、読み取り、処理を行います。その他の Kafka コンシューマーは、同じエラーが発生しない限り、レコードの処理を続行できます。

停止したコンシューマの原因を特定するには、StateTransitionReason のレスポンスの EventSourceMapping フィールドを確認します。以下は、受け取る可能性があるイベントソースエラーを説明するリストです。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースを認証できませんでした。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセスに必要な許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

注記

Lambda のイベントレコードが許容サイズ制限である 6 MB を超えると、未処理になります。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda 関数のメトリクスの使用」を参照してください。

セルフマネージド Apache Kafka の設定パラメータ

すべての Lambda イベントソースタイプは、同じ CreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Apache Kafka に適用されるのは一部のパラメータのみです。

セルフマネージド型 Apache Kafka に適用されるイベントソースパラメータ
Parameter 必須 デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

有効

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

SelfManagedEventSource

Y

Kafka ブローカー一覧 作成時にのみ設定可能

SelfManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

SourceAccessConfigurations

N

認証情報なし

クラスターの VPC 情報または認証情報

SASL_PLAIN は、BASIC_AUTH に設定

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

作成時にのみ設定可能

StartingPositionTimestamp

N

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要

トピック

Y

トピック名

作成時にのみ設定可能