Amazon MSK で Lambda を使用する - AWS Lambda

Amazon MSK で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka を使ってストリーミングデータを処理するアプリケーションを、構築および実行することを可能にするフルマネージドサービスです。Amazon MSK は、Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。また、Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにより簡単にアプリケーションを設定することができます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、イベントソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは設定可能です (デフォルトでは 100 メッセージ)。詳細については、「バッチ処理動作」を参照してください。

Lambda は、パーティションごとにメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

Lambda は現在、マルチ VPC サーバーレスタイプの MSK クラスターからのメッセージの消費をサポートしていません。

Amazon MSK をイベントソースとして設定する方法の例については、AWS Compute Blog の Using Amazon MSK as an event source for AWS Lambdaを参照してください。完全なチュートリアルについては、「Amazon MSK Labs」(Amazon MSK ラボ) の「Amazon MSK Lambda Integration」(Amazon MSK の Lambda 統合) を参照してください。

イベントの例

Lambda は、関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK クラスター認証

Lambda には、Amazon MSK クラスターにアクセスする、レコードを取得する、およびその他タスクを実行するための許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。

非認証アクセス

インターネット経由でクラスターにアクセスするクライアントがない場合は、非認証アクセスを使用できます。

SASL/SCRAM 認証

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda がクラスターに接続できるようにするには、認証情報 (ユーザー名とパスワード) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Manager を使用したユーザーネームとパスワードの認証」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

IAM ロールベースの認証

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。MSK クラスターで IAM 認証がアクティブ化されており、認証用のシークレットを指定しない場合、Lambda はデフォルトで自動的に IAM 認証を使用します。ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「IAM access control」(IAM アクセスコントロール) を参照してください。

Lambda が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、関数の実行ロールに以下の許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Amazon MSK Kafka actions」(Amazon MSK Kafka アクション) を参照してください。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、Lambda がクライアントとして機能します。MSK クラスターのブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。MSK クラスターは、Lambda でブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、AWS トラストストア内の認証局 (CA) によって署名される必要があります。

クライアント証明書を生成する方法の手順については、「Introducing mutual TLS authentication for Amazon MSK as an event source」(イベントソースとしての Amazon MSK のための相互 TLS 認証の紹介) を参照してください。

Amazon MSK は自己署名のサーバー証明書をサポートしません。これは、Amazon MSK のすべてのブローカーが、Lambda がデフォルトで信頼する Amazon Trust Services CA によって署名されたパブリック証明書を使用するためです。

Amazon MSK のための mTLS に関する詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Mutual TLS Authentication」(相互 TLS 認証) を参照してください。

mTLS シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Lambda でのブートストラップブローカーの選択方法

Lambda は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、ブートストラップブローカーを選択します。mTLS または SASL/SCRAM のシークレットを指定すると、Lambda は自動的にその認証方法を選択します。シークレットを指定しない場合、Lambda は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、Lambda によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。

  • mTLS (mTLS 用のシークレットを提供)

  • SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)

  • SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)

  • 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)

  • プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

注記

Lambda から最も安全なブローカータイプへの接続ができない場合でも、Lambda は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを Lambda に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

API アクセスと許可の管理

Amazon MSK クラスターへのアクセスに加えて、関数にはさまざまな Amazon MSK API アクションを実行するための許可が必要です。これらの許可は、関数の実行ロールに追加します。ユーザーが Amazon MSK API アクションのいずれかにアクセスする必要がある場合は、ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

次の各許可を実行ロールに手動で追加できます。または、AWS マネージドポリシー AWSLambdaMSKExecutionRole を実行ロールにアタッチすることもできます。AWSLambdaMSKExecutionRole ポリシーには、以下にリストされているすべての必要な API アクションと VPC 許可が含まれています。

Lambda 関数の実行ロールに必要な許可

Amazon CloudWatch Logs のロググループでログを作成して保存するには、Lambda 関数の実行ロールに以下の許可が必要です。

Lambda がユーザーに代わって Amazon MSK クラスターにアクセスするには、Lambda 関数の実行ロールに次の許可が必要です。

必要なのは、kafka:DescribeCluster または kafka:DescribeClusterV2 のいずれかを追加することだけです。プロビジョンド MSK クラスターの場合、どちらの許可も機能します。サーバーレス MSK クラスターの場合は、kafka:DescribeClusterV2 を使用する必要があります。

注記

Lambda は関連付けられている AWSLambdaMSKExecutionRole マネージドポリシーから kafka:DescribeCluster の許可を最終的に削除する予定です。このポリシーを使用する場合、kafka:DescribeCluster を使用しているすべてのアプリケーションは、代わりに kafka:DescribeClusterV2 を使用するように移行する必要があります。

VPC アクセス許可

Amazon MSK クラスターにアクセスできるのが VPC 内のユーザーのみである場合、Lambda 関数には Amazon VPC リソースにアクセスするための許可が必要です。これらのリソースには、VPC、サブネット、セキュリティグループ、ネットワークインターフェイスが含まれます。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

Lambda 関数のオプションのアクセス許可

Lambda 関数には、以下を実行する許可も必要になる場合があります。

  • SASL/SCRAM 認証を使用している場合は、SCRAM シークレットにアクセスします。

  • Secrets Manager シークレットを記述する。

  • AWS Key Management Service (AWS KMS) カスタマー管理のキーにアクセスする。

Secrets Manager と AWS KMS 許可

Amazon MSK ブローカーに設定しているアクセスコントロールのタイプに応じて、Lambda 関数には SCRAM シークレットにアクセスするための許可 (SASL/SCRAM 認証を使用する場合)、または AWS KMS カスタマーマネージドキーを復号するための Secrets Manager シークレットが必要になる場合があります。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

実行ロールへのアクセス許可の追加

IAM コンソールを使用して実行ロールに AWS マネージドポリシー AWSLambdaMSKExecutionRole を追加するには、次の手順を実行します。

AWS 管理ポリシーを追加するには
  1. IAM コンソールの [Policies (ポリシー)] ページを開きます。

  2. 検索ボックスに、ポリシー名 (AWSLambdaMSKExecutionRole) を入力します。

  3. リストからポリシーを選択して、[ポリシーアクション] の [アタッチ] を選択します。

  4. 添付ポリシーページで、リストから実行ロールを選択し、[Attach policy](ポリシーの添付) を選択します。

IAM ポリシーを使用したユーザーアクセスの許可

デフォルトでは、ユーザーとロールには Amazon MSK API 操作を実行する許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを追加または更新することができます。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドAmazon MSK Identity-Based Policy Examples を参照してください。

SASL/SCRAM 認証の使用

Amazon MSK は、TLS 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。AWS Secrets Manager シークレットを使用してユーザー名とパスワード認証を設定することで、Amazon MSK クラスターへのアクセスを制御することができます。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドUsername and password authentication with AWS Secrets Manager を参照してください。SASL/SCRAM 認証の詳細については、「RFC 5802」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしておりませんので注意してください。

認証と認可のエラー

Amazon MSK クラスターからのデータを消費するために必要な許可のいずれかが欠落している場合、Lambda は [LastProcessingResult] のイベントソースマッピングに以下のエラーメッセージのいずれかを表示します。

クラスターが Lambda の認可に失敗した

SALS/SCRAM または mTLS の場合、このエラーは、指定されたユーザーが以下の必要とされる Kafka アクセスコントロールリスト (ACL) 許可のすべてを持っていないことを示します。

  • DescribeConfigs クラスター

  • グループを記述する

  • グループを読み取る

  • トピックを記述する

  • トピックを読み取る

IAM アクセスコントロールの場合、関数の実行ロールにグループまたはトピックへのアクセスに必要な許可が 1 つ、または複数不足しています。「 IAM ロールベースの認証」で、必要な許可のリストを確認してください。

必要な Kafka クラスター許可を使用して Kafka ACL または IAM ポリシーのいずれかを作成するときは、リソースとしてトピックとグループを指定します。トピック名は、イベントソースマッピングのトピックと一致する必要があります。グループ名は、イベントソースマッピングの UUID と一致する必要があります。

必要な許可を実行ロールに追加した後は、変更が有効になるまで数分間かかる場合があります。

SASL 認証に失敗した

SASL/SCRAM の場合、このエラーは指定されたユーザー名とパスワードが無効であることを示します。

IAM アクセスコントロールの場合、実行ロールに MSK クラスターに対する kafka-cluster:Connect 許可がありません。この許可をロールに追加して、クラスターの Amazon リソースネーム (ARN) をリソースとして指定します。

このエラーは断続的に発生する場合があります。クラスターは、TCP 接続の数が Amazon MSK サービスクォータを超過すると、接続を拒否します。Lambda は接続に成功するまでバックオフし、再試行します。Lambda がクラスターに接続してレコードをポーリングすると、最後の処理結果が OK に変わります。

Server failed to authenticate Lambda (サーバーが Lambda の認証に失敗しました)

このエラーは、Amazon MSK Kafka ブローカーが Lambda の認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • mTLS 認証用のクライアント証明書を提供していない。

  • クライアント証明書を提供したが、ブローカーが mTLS を使用するように設定されていない。

  • クライアント証明書がブローカーに信頼されていない。

Provided certificate or private key is invalid (提供された証明書またはプライベートキーが無効です)

このエラーは、Amazon MSK コンシューマーが提供された証明書またはプライベートキーを使用できなかったことを示します。証明書とキーが PEM 形式を使用しており、プライベートキーの暗号化が PBES1 アルゴリズムを使用していることを確認してください。

ネットワーク構成

Lambda は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。Lambda と AWS Security Token Service (AWS STS) に AWS PrivateLink VPC エンドポイントをデプロイすることをお勧めします。ポーラーは Lambda 関数に関連付けられた実行ロールを引き受けるため、AWS STS にアクセスする必要があります。Lambda は関数を呼び出すため、Lambda VPC エンドポイントにアクセスする必要があります。ブローカーに Lambda を認証するために Secrets Manager のシークレットを設定した場合、Secrets Manager 用に VPC エンドポイントのデプロイも行ってください。

または、MSK クラスターに関連付けられた VPC で、パブリックサブネットごとに 1 つの NAT ゲートウェイが含まれていることを確認します。詳しくは、「VPC に接続した関数のインターネットアクセスとサービスアクセス」を参照してください。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用している場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

注記

Amazon VPC の設定は、Amazon MSK API を使用して検出できます。create-event-source-mapping コマンドを使用してセットアップ中に設定する必要はありません。

ネットワークの設定方法の詳細については、AWS Compute Blog の Setting up AWS Lambda with an Apache Kafka cluster within a VPC を参照してください。

Amazon MSK をイベントソースとして追加

イベントソースマッピングを作成するには、Amazon MSK を、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して Lambda 関数のトリガーとして追加します。Amazon MSK をトリガーとして追加すると、Lambda は Lambda 関数用の VPC 設定ではなく、Amazon MSK クラスター用の VPC 設定を引き受けることに注意してください。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

注記

Amazon MSK のイベントソースマッピングを更新、無効化、または削除すると、変更が有効になるまでに最大 15 分かかる場合があります。この期間が経過する前に、イベントソースマッピングは引き続きイベントを処理し、以前の設定を使用して関数を呼び出す場合があります。これは、コンソールで表示されるイベントソースマッピングのステータスが、変更が適用されたことを示している場合にも当てはまります。

前提条件

  • Amazon MSK クラスターと Kafka トピック 詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドGetting Started Using Amazon MSK を参照してください。

  • MSK クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

カスタマイズ可能なコンシューマーグループ ID

Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

Amazon MSK トリガーの追加 (コンソール)

Amazon MSK クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには
  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. MSKトリガータイプを選択します。

    2. [MSK cluster] (MSK クラスター) で、クラスターを選択します。

    3. [Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

    4. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    5. [Topic name] (トピック名)に、Kafka トピックの名前を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。

    8. (オプション) [Authentication] (認証) で、MSK クラスターのブローカーで認証するためのシークレットキーを選択します。

    9. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

Amazon MSK トリガーの追加 (AWS CLI)

Lambda 関数の Amazon MSK トリガーを作成および表示するには、次の例の AWS CLI コマンドを使用します。

AWS CLI を使用したトリガーの作成

次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。トピックの開始位置は LATEST に設定します。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK イベントソースの Auto Scaling

初めて Amazon MSK イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がスロットリングされると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットをモニタリングするには、関数がレコードを処理する間に Lambda が発行するオフセット遅延メトリクスを表示してください。

いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda 関数のメトリクスの使用」を参照してください。

Amazon MSK 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MSK に適用されるのは一部のパラメータのみです。

Amazon MSK に適用されるイベントソースパラメータ
Parameter 必須 デフォルト メモ

AmazonManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

BatchSize

N

100

最大: 10,000

有効

N

有効

EventSourceArn

Y

作成時にのみ設定可能

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

SourceAccessConfigurations

N

認証情報なし

イベントソース用の、SASL/SCRAM あるいは CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) の認証情報

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

作成時にのみ設定可能

StartingPositionTimestamp

N

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要

トピック

Y

カフカのトピック名

作成時にのみ設定可能