セルフマネージド型の Apache Kafka で Lambda を使用する - AWS Lambda

セルフマネージド型の Apache Kafka で Lambda を使用する

Lambdaは、Apache Kafkaイベントソースソースとしてサポートしています。Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。

ユーザーは、AWS マネージドの Kafka サービス、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、またはセルフマネージドの Kafka クラスターを使用できます。Amazon MSK で Lambda を使用する方法の詳細については、Amazon MSK で Lambda を使用する を参照してください。

このトピックでは、セルフマネージド型の Kafka クラスターで Lambda を使用する方法を説明します。AWS の用語集では、セルフマネージド型クラスターには、非 AWS のホストされた Kafka クラスターが含まれています。例えば、お使いの Kafka クラスターを、CloudKarafka などのクラウドプロバイダーでホストすることが可能です。ご自身のクラスターで他の AWS ホスティングオプションを使用することもできます。詳細については、AWS Big Data Blog の Best Practices for Running Apache Kafka on AWS を参照してください。

イベントソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Kafka ベースのイベントソースの場合、Lambda はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。詳しくは、「バッチ処理動作」を参照してください。

セルフマネージド型 Kafka をイベントソースとして使用する方法の例については、AWS Compute Blog の Using self-hosted Apache Kafka as an event source for AWS Lambda を参照してください。

Lambda は、Lambda 関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Kafka トピックと Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource":"aws:SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Kafka クラスター認証

Lambda は、セルフマネージド型 Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Kafka クラスターを設定しておいてください。Kafka セキュリティの詳細については、Kafka ドキュメントの「Security」(セキュリティ) セクションを参照してください。

VPC アクセス

VPC 内の Kafka ユーザーのみが Kafka ブローカーにアクセスする場合は、Amazon Virtual Private Cloud (Amazon VPC) アクセス用に Kafka イベントソースを設定する必要があります。

SASL/SCRAM 認証

Lambda は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda は、暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、「RFC 5802」を参照してください。

Lambda は TLS 暗号化による SASL/PLAIN 認証をサポートします。SASL/PLAIN 認証では、Lambda が認証情報をクリアテキスト (暗号化されていないもの) としてサーバーに送信します。

SASL 認証の場合は、ユーザー名とパスワードをシークレットとして AWS Secrets Manager に保存します。Secrets Manager の使用に関する詳細については、「AWS Secrets Manager ユーザーガイド」の「チュートリアル: シークレットの作成と取得」を参照してください。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka では、Lambda がクライアントとして機能します。Kafka ブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。

Kafka クラスターは、Lambda で Kafka ブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、Lambda トラストストア内の認証局 (CA) によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。Lambda はルート証明書を使用して Kafka ブローカーを検証します。

mTLS の詳細については、「Introducing mutual TLS authentication for Amazon MSK as an event source」(イベントソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

クライアント証明書シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

サーバルート CA 証明書シークレットの設定

このシークレットは、Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"

API アクセスと許可の管理

セルフマネージド Kafka クラスターへのアクセスに加えて、Lambda 関数にはさまざまな API アクションを実行するための許可が必要です。これらの許可は、関数の実行ロールに追加します。ユーザーが API アクションのいずれかにアクセスする必要がある場合は、AWS Identity and Access Management (IAM) ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

Lambda 関数に必要なアクセス許可

Amazon CloudWatch Logs のロググループでログを作成して保存するには、Lambda 関数の実行ロールに以下の許可が必要です。

Lambda 関数のオプションのアクセス許可

Lambda 関数には、以下を実行する許可も必要になる場合があります。

  • Secrets Manager シークレットを記述する。

  • AWS Key Management Service (AWS KMS) カスタマー管理のキーにアクセスする。

  • Amazon VPC にアクセスする。

Secrets Manager と AWS KMS 許可

Kafka ブローカーに設定しているアクセスコントロールのタイプに応じて、Lambda 関数には Secrets Manager シークレットにアクセスするための許可、または AWS KMS カスタマーマネージドキーを復号化するための許可が必要になる場合があります。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

VPC アクセス許可

セルフマネージド Apache Kafka クラスターにアクセスできるのが VPC 内のユーザーのみである場合、Lambda 関数には Amazon VPC リソースにアクセスするための許可が必要です。これらのリソースには、VPC、サブネット、セキュリティグループ、ネットワークインターフェイスが含まれます。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

実行ロールへのアクセス許可の追加

セルフマネージド型 Apache Kafka クラスターが使用するその他の AWS サービスにアクセスするために、Lambda は、関数の実行ロールで定義されたアクセス許可ポリシーを使用します。

デフォルトでは、Lambda は、セルフマネージド型 Apache Kafka クラスターに対して、必須のまたはオプションのアクションを実行することはできません。これらのアクションを IAM 信頼ポリシーで作成および定義し、そのポリシーを実行ロールにアタッチする必要があります。この例では、Lambda に Amazon VPC リソースへのアクセスを許可する、ポリシーの作成方法を紹介します。

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

IAM コンソールで JSON ポリシードキュメントを作成する方法については、IAM ユーザーガイド[JSON] タブでのポリシーの作成を参照してください。

IAM ポリシーを使用したユーザーアクセスの許可

デフォルトで、IAM ユーザーとロールにはイベントソース API 操作を実行する許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを作成または更新します。詳細については、IAM ユーザーガイドポリシーを使用した AWS リソースへのアクセスのコントロールを参照してください。

認証と認可のエラー

Kafka クラスターからのデータを消費するために必要な許可のいずれかが欠落している場合、Lambda は [LastProcessingResult] のイベントソースマッピングに以下のエラーメッセージのいずれかを表示します。

クラスターが Lambda の認可に失敗した

SALS/SCRAM または mTLS の場合、このエラーは、指定されたユーザーが以下の必要とされる Kafka アクセスコントロールリスト (ACL) 許可のすべてを持っていないことを示します。

  • DescribeConfigs クラスター

  • グループを記述する

  • グループを読み取る

  • トピックを記述する

  • トピックを読み取る

必要な kafka-cluster 許可を使用して Kafka ACL を作成するときは、リソースとしてトピックとグループを指定します。トピック名は、イベントソースマッピングのトピックと一致する必要があります。グループ名は、イベントソースマッピングの UUID と一致する必要があります。

必要な許可を実行ロールに追加した後は、変更が有効になるまで数分間かかる場合があります。

SASL 認証に失敗した

SASL/SCRAM または SASL/PLAIN の場合、このエラーは指定されたユーザー名とパスワードが無効であることを示します。

Server failed to authenticate Lambda (サーバーが Lambda の認証に失敗しました)

このエラーは、Kafka ブローカーが Lambda の認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • mTLS 認証用のクライアント証明書を提供していない。

  • クライアント証明書を提供したが、Kafka ブローカーが mTLS 認証を使用するように設定されていない。

  • クライアント証明書が Kafka ブローカーに信頼されていない。

Lambda failed to authenticate server (Lambda がサーバーの認証に失敗しました)

このエラーは、Lambda が Kafka ブローカーの認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • Kafka ブローカーは自己署名証明書またはプライベート CA を使用するが、サーバールート CA 証明書を提供しなかった。

  • サーバールート CA 証明書が、ブローカーの証明書に署名したルート CA と一致しない。

  • ブローカーの証明書にサブジェクトの別名としてブローカーの DNS 名または IP アドレスが含まれていないため、ホスト名の検証が失敗した。

Provided certificate or private key is invalid (提供された証明書またはプライベートキーが無効です)

このエラーは、Kafka コンシューマーが提供された証明書またはプライベートキーを使用できなかったことを示します。証明書とキーが PEM 形式を使用しており、プライベートキーの暗号化が PBES1 アルゴリズムを使用していることを確認してください。

ネットワーク構成

Kafka ブローカーへの Amazon VPC アクセスを設定する場合、Lambda には Kafka クラスターに関連付けられた Amazon VPC リソースへのアクセス権が必要です。Lambda と AWS Security Token Service (AWS STS) に AWS PrivateLink VPC endpoints をデプロイすることが推奨されます。ブローカーが認証を使用する場合は、Secrets Manager 用の VPC エンドポイントもデプロイします。

または、Kafka クラスターに関連付けられた VPC に、パブリックサブネットごとに 1 つの NAT ゲートウェイが含まれていることを確認します。詳しくは、「VPC に接続した関数のインターネットアクセスとサービスアクセス」を参照してください。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – イベントソース用に指定されたセキュリティグループに対して Kafka ブローカーポート上のすべてのトラフィックを許可します。Kafka は、デフォルトでポート 9092 を使用します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。イベントソース用に指定されたセキュリティグループに対して Kafka ブローカーポート上のすべてのトラフィックを許可します。Kafka は、デフォルトでポート 9092 を使用します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用している場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

ネットワークの設定方法の詳細については、AWS Compute Blog の Setting up AWS Lambda with an Apache Kafka cluster within a VPC を参照してください。

Kafka クラスターをイベントソースとして追加する

イベントソースマッピングを作成するには、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して、Kafka クラスターを Lambda 関数のトリガーとして追加します。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

前提条件

  • セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.0.0 以降をサポートしています。

  • セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

カスタマイズ可能なコンシューマーグループ ID

Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

セルフマネージド型 Kafka クラスターを追加する (コンソール)

セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)

  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. Apache Kafka のトリガータイプを選択します。

    2. [Bootstrap servers] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[Add] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。

    3. [Topic name] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。

    4. (オプション) [Batch size] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。

    5. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [Starting position] (開始位置) で、[Latest] (最新) をクリックし、最新のレコードからストリームの読み込みを開始します。または [Trim horizon] (水平トリム) を選択し、利用可能な最も早いレコードから開始します。

    8. (オプション) [VPC] で、Kafka クラスターに Amazon VPC を選択します。次に、[VPC subnets] (VPC サブネット) と [VPC security groups] (VPC セキュリティグループ) を選択します。

      VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。

    9. (オプション) [Authentication] (認証) で [Add] (追加) をクリックしてから、以下を実行します。

      1. クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。

        • Kafka ブローカーが SASL プレーンテキスト認証を使用する場合は、[BASIC_AUTH] を選択します。

        • ブローカーが SALS/SCRAM 認証を使用する場合は、[SASL_SCRAM] プロトコルのいずれかを選択します。

        • mTLS 認証を設定している場合は、[CLIENT_CERTIFICATE_TLS_AUTH] プロトコルを選択します。

      2. SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。

    10. (オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、[Encryption] (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。

      この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。

    11. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

セルフマネージド型 Kafka クラスターを追加する (AWS CLI)

Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。

SASL/SCRAM を使用する

Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。

aws lambda create-event-source-mapping --topics AWSKafkaTopic --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName --function-name arn:aws:lambda:us-east-1:01234567890:function:my-kafka-function --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

VPC の使用

Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。

aws lambda create-event-source-mapping --topics AWSKafkaTopic --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' --function-name arn:aws:lambda:us-east-1:01234567890:function:my-kafka-function --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Kafka クラスターをイベントソースとして使用する

Apache Kafka クラスターを Lambda 関数のトリガーとして追加すると、クラスターはイベントソースソースとして使用されます。

Lambda は、ユーザーが指定した StartingPosition に基づいて、CreateEventSourceMapping リクエストで Topics として指定された Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

StartingPositionLATEST として指定すると、Lambda は、そのトピックに属する各パーティション内の最新のメッセージから読み取りを開始します。トリガーが設定されてから Lambda がメッセージの読み取りを開始するまでには若干の遅延が発生することがあるため、Lambda はこの期間中に生成されたメッセージを読み取りません。

Lambda は、指定された 1 つ、または複数の Kafka トピックパーティションからのレコードを処理し、関数に JSON ペイロードを送信します。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまで、CreateEventSourceMapping リクエストで指定された BatchSize 値に基づいて、バッチ内のレコードの処理を継続します。

関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

Lambda は、関数を最大 14 分実行できます。関数のタイムアウトは 14 分以下に設定してください (デフォルトのタイムアウト値は 3 秒です)。Lambda は、14 分を超える呼び出しを再試行する場合があります。

Kafka イベントソースのオートスケーリング

初めて Apache Kafka イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がオーバーロードすると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットを監視するには、consumer_lagconsumer_offset などの Apache Kafka コンシューマーラグメトリックスを表示します。いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。

イベントソースの API オペレーション

Kafka クラスターを、Lambda コンソール、AWS SDK、または AWS CLI を使用する Lambda 関数のイベントソースとして追加する場合、Lambda は API を使用してリクエストを処理します。

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

イベントソースマッピングエラー

Apache Kafka クラスターを Lambda 関数のイベントソースとして追加すると、関数でエラーが発生した場合、Kafka コンシューマーはレコードの処理を停止します。トピックパーティションのコンシューマーは、レコードのサブスクライブ、読み取り、処理を行います。その他の Kafka コンシューマーは、同じエラーが発生しない限り、レコードの処理を続行できます。

停止したコンシューマの原因を特定するには、StateTransitionReason のレスポンスの EventSourceMapping フィールドを確認します。以下は、受け取る可能性があるイベントソースエラーを説明するリストです。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースを認証できませんでした。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセスに必要な許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

注記

Lambda のイベントレコードが許容サイズ制限である 6 MB を超えると、未処理になります。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda 関数のメトリクスの使用」を参照してください。

セルフマネージド Apache Kafka の設定パラメータ

すべての Lambda イベントソースタイプは、同じ CreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Apache Kafka に適用されるのは一部のパラメータのみです。

セルフマネージド型 Apache Kafka に適用されるイベントソースパラメータ
Parameter [Required] (必須) デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

有効

FunctionName

Y

SelfManagedEventSource

Y

Kafka ブローカー一覧 作成時にのみ設定可能

SelfManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

SourceAccessConfigurations

N

認証情報なし

クラスターの VPC 情報または認証情報

SASL_PLAIN は、BASIC_AUTH に設定

StartingPosition

Y

TRIM_HORIZON または LATEST

作成時にのみ設定可能

トピック

Y

トピック名

作成時にのみ設定可能