Amazon MSK で Lambda を使用する - AWS Lambda

Amazon MSK で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka を使ってストリーミングデータを処理するアプリケーションを、構築および実行することを可能にするフルマネージドサービスです。Amazon MSK は、Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。また、Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにより簡単にアプリケーションを設定することができます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、イベントソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは設定可能です (デフォルトでは 100 メッセージ)。詳細については、「バッチ処理動作」を参照してください。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

Lambda は、パーティションごとにメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Amazon MSK をイベントソースとして設定する方法の例については、AWS Compute Blog の Using Amazon MSK as an event source for AWS Lambdaを参照してください。完全なチュートリアルについては、「Amazon MSK Labs」(Amazon MSK ラボ) の「Amazon MSK Lambda Integration」(Amazon MSK の Lambda 統合) を参照してください。

イベントの例

Lambda は、関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK クラスター認証

Lambda には、Amazon MSK クラスターにアクセスする、レコードを取得する、およびその他タスクを実行するための許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。

非認証アクセス

インターネット経由でクラスターにアクセスするクライアントがない場合は、非認証アクセスを使用できます。

SASL/SCRAM 認証

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda がクラスターに接続できるようにするには、認証情報 (ユーザー名とパスワード) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Manager を使用したユーザーネームとパスワードの認証」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

IAM ロールベースの認証

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。MSK クラスターで IAM 認証がアクティブ化されており、認証用のシークレットを指定しない場合、Lambda はデフォルトで自動的に IAM 認証を使用します。ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「IAM access control」(IAM アクセスコントロール) を参照してください。

Lambda が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、関数の実行ロールに以下の許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Amazon MSK Kafka actions」(Amazon MSK Kafka アクション) を参照してください。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、Lambda がクライアントとして機能します。MSK クラスターのブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。MSK クラスターは、Lambda でブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、AWS トラストストア内の認証局 (CA) によって署名される必要があります。

クライアント証明書を生成する方法の手順については、「Introducing mutual TLS authentication for Amazon MSK as an event source」(イベントソースとしての Amazon MSK のための相互 TLS 認証の紹介) を参照してください。

Amazon MSK は自己署名のサーバー証明書をサポートしません。これは、Amazon MSK のすべてのブローカーが、Lambda がデフォルトで信頼する Amazon Trust Services CA によって署名されたパブリック証明書を使用するためです。

Amazon MSK のための mTLS に関する詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Mutual TLS Authentication」(相互 TLS 認証) を参照してください。

mTLS シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Lambda でのブートストラップブローカーの選択方法

Lambda は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、ブートストラップブローカーを選択します。mTLS または SASL/SCRAM のシークレットを指定すると、Lambda は自動的にその認証方法を選択します。シークレットを指定しない場合、Lambda は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、Lambda によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。

  • mTLS (mTLS 用のシークレットを提供)

  • SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)

  • SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)

  • 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)

  • プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

注記

Lambda から最も安全なブローカータイプへの接続ができない場合でも、Lambda は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを Lambda に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

API アクセスと許可の管理

Amazon MSK クラスターへのアクセスに加えて、関数にはさまざまな Amazon MSK API アクションを実行するための許可が必要です。これらの許可は、関数の実行ロールに追加します。ユーザーが Amazon MSK API アクションのいずれかにアクセスする必要がある場合は、ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

次の各許可を実行ロールに手動で追加できます。または、AWS マネージドポリシー AWSLambdaMSKExecutionRole を実行ロールにアタッチすることもできます。AWSLambdaMSKExecutionRole ポリシーには、以下にリストされているすべての必要な API アクションと VPC 許可が含まれています。

Lambda 関数の実行ロールに必要な許可

Amazon CloudWatch Logs のロググループでログを作成して保存するには、Lambda 関数の実行ロールに以下の許可が必要です。

Lambda がユーザーに代わって Amazon MSK クラスターにアクセスするには、Lambda 関数の実行ロールに次の許可が必要です。

必要なのは、kafka:DescribeCluster または kafka:DescribeClusterV2 のいずれかを追加することだけです。プロビジョンド MSK クラスターの場合、どちらの許可も機能します。サーバーレス MSK クラスターの場合は、kafka:DescribeClusterV2 を使用する必要があります。

注記

Lambda は関連付けられている AWSLambdaMSKExecutionRole マネージドポリシーから kafka:DescribeCluster の許可を最終的に削除する予定です。このポリシーを使用する場合、kafka:DescribeCluster を使用しているすべてのアプリケーションは、代わりに kafka:DescribeClusterV2 を使用するように移行する必要があります。

VPC アクセス許可

Amazon MSK クラスターにアクセスできるのが VPC 内のユーザーのみである場合、Lambda 関数には Amazon VPC リソースにアクセスするための許可が必要です。これらのリソースには、VPC、サブネット、セキュリティグループ、ネットワークインターフェイスが含まれます。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。これらのアクセス許可は、AWS マネージドポリシー AWSLambdaMSKExecutionRole に含まれています。

Lambda 関数のオプションのアクセス許可

Lambda 関数には、以下を実行する許可も必要になる場合があります。

  • SASL/SCRAM 認証を使用している場合は、SCRAM シークレットにアクセスします。

  • Secrets Manager シークレットを記述する。

  • AWS Key Management Service (AWS KMS) カスタマー管理のキーにアクセスする。

  • 失敗した呼び出しのレコードを送信先に送信します。

Secrets Manager と AWS KMS 許可

Amazon MSK ブローカーに設定しているアクセスコントロールのタイプに応じて、Lambda 関数には SCRAM シークレットにアクセスするための許可 (SASL/SCRAM 認証を使用する場合)、または AWS KMS カスタマーマネージドキーを復号するための Secrets Manager シークレットが必要になる場合があります。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。

レコードを送信先に送信する

失敗した呼び出しのレコードを失敗時の送信先に送信する場合、Lambda 関数にこれらのレコードを送信する権限が必要です。Kafka イベントソースマッピングでは、Amazon SNS トピック、Amazon SQS キュー、または Amazon S3 バケットのいずれかを、送信先として選択できます。レコードを SNS トピックに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

レコードを SQS キューに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

レコードを S3 バケットに送信するには、関数の実行ロールに以下のアクセス許可が必要です。

さらに、送信先に KMS キーを設定した場合、Lambda には送信先のタイプに応じて以下のアクセス許可が必要です。

  • S3 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:GenerateDataKey が必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。

  • SQS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt および kms:GenerateDataKey が必要です。KMS キーと SQS キューの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。

  • SNS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decryptkms:GenerateDataKey が必要です。KMS キーと SNS トピックの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。

実行ロールへのアクセス許可の追加

IAM コンソールを使用して実行ロールに AWS マネージドポリシー AWSLambdaMSKExecutionRole を追加するには、次の手順を実行します。

AWS 管理ポリシーを追加するには
  1. IAM コンソールの [Policies (ポリシー)] ページを開きます。

  2. 検索ボックスに、ポリシー名 (AWSLambdaMSKExecutionRole) を入力します。

  3. リストからポリシーを選択して、[ポリシーアクション] の [アタッチ] を選択します。

  4. 添付ポリシーページで、リストから実行ロールを選択し、[Attach policy](ポリシーの添付) を選択します。

IAM ポリシーを使用したユーザーアクセスの許可

デフォルトでは、ユーザーとロールには Amazon MSK API 操作を実行する許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを追加または更新することができます。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドAmazon MSK Identity-Based Policy Examples を参照してください。

認証と認可のエラー

Amazon MSK クラスターからのデータを消費するために必要な許可のいずれかが欠落している場合、Lambda は [LastProcessingResult] のイベントソースマッピングに以下のエラーメッセージのいずれかを表示します。

クラスターが Lambda の認可に失敗した

SALS/SCRAM または mTLS の場合、このエラーは、指定されたユーザーが以下の必要とされる Kafka アクセスコントロールリスト (ACL) 許可のすべてを持っていないことを示します。

  • DescribeConfigs クラスター

  • グループを記述する

  • グループを読み取る

  • トピックを記述する

  • トピックを読み取る

IAM アクセスコントロールの場合、関数の実行ロールにグループまたはトピックへのアクセスに必要な許可が 1 つ、または複数不足しています。「IAM ロールベースの認証」で、必要な許可のリストを確認してください。

必要な Kafka クラスター許可を使用して Kafka ACL または IAM ポリシーのいずれかを作成するときは、リソースとしてトピックとグループを指定します。トピック名は、イベントソースマッピングのトピックと一致する必要があります。グループ名は、イベントソースマッピングの UUID と一致する必要があります。

必要な許可を実行ロールに追加した後は、変更が有効になるまで数分間かかる場合があります。

SASL 認証に失敗した

SASL/SCRAM の場合、このエラーは指定されたユーザー名とパスワードが無効であることを示します。

IAM アクセスコントロールの場合、実行ロールに MSK クラスターに対する kafka-cluster:Connect 許可がありません。この許可をロールに追加して、クラスターの Amazon リソースネーム (ARN) をリソースとして指定します。

このエラーは断続的に発生する場合があります。クラスターは、TCP 接続の数が Amazon MSK サービスクォータを超過すると、接続を拒否します。Lambda は接続に成功するまでバックオフし、再試行します。Lambda がクラスターに接続してレコードをポーリングすると、最後の処理結果が OK に変わります。

Server failed to authenticate Lambda (サーバーが Lambda の認証に失敗しました)

このエラーは、Amazon MSK Kafka ブローカーが Lambda の認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • mTLS 認証用のクライアント証明書を提供していない。

  • クライアント証明書を提供したが、ブローカーが mTLS を使用するように設定されていない。

  • クライアント証明書がブローカーに信頼されていない。

Provided certificate or private key is invalid (提供された証明書またはプライベートキーが無効です)

このエラーは、Amazon MSK コンシューマーが提供された証明書またはプライベートキーを使用できなかったことを示します。証明書とキーが PEM 形式を使用しており、プライベートキーの暗号化が PBES1 アルゴリズムを使用していることを確認してください。

ネットワーク構成

Lambda が Kafka クラスターをイベントソースとして使用するには、クラスターが存在する Amazon VPC にアクセスする必要があります。VCP にアクセスするには、Lambda に AWS PrivateLink 「 VPC エンドポイント」をデプロイすることをお勧めします。Lambda および AWS Security Token Service (AWS STS) にエンドポイントをデプロイします。ブローカーが認証を使用する場合は、Secrets Manager 用の VPC エンドポイントもデプロイします。失敗時の送信先を設定した場合は、送信先サービスの VPC エンドポイントもデプロイします。

または、Kafka クラスターに関連付けられた VPC に、パブリックサブネットごとに 1 つの NAT ゲートウェイが含まれていることを確認します。詳細については、「VPC に接続された Lambda 関数にインターネットアクセスを有効にする」を参照してください。

VPC エンドポイントを使用する場合は、プライベート DNS 名を有効にするように設定する必要もあります。

MSK クラスターのイベントソースマッピングを作成すると、Lambda はクラスターの VPC のサブネットおよびセキュリティグループに Elastic Network Interface (ENI) が既に存在するかどうかを確認します。Lambda が既存の ENI を検出した場合、再利用しようとします。それ以外の場合、Lambda は新しい ENI を作成し、イベントソースに接続して関数を呼び出します。

注記

Lambda 関数は、Lambda サービスが所有する VPC 内で常に実行されます。これらの VPC はサービスによって自動的に管理され、顧客には表示されません。関数を Amazon VPC に接続することもできます。いずれの場合、関数の VPC 設定はイベントソースマッピングに影響しません。Lambda がイベントソースに接続する方法を判定するのは、イベントソースの VPC の設定のみです。

Amazon VPC の設定は、Amazon MSK API を使用して検出できます。create-event-source-mapping コマンドを使用してセットアップ中に設定する必要はありません。

ネットワークの設定方法の詳細については、AWS Compute Blog の Setting up AWS Lambda with an Apache Kafka cluster within a VPC を参照してください。

VPC セキュリティグループのルール

次のルール (最低限) に従ってクラスターを含む Amazon VPC のセキュリティグループを設定してください。

  • インバウンドルール – イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用している場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

VPCエンドポイントの使用

VPC エンドポイントを使用するとき、ENI を使用し、関数を呼び出す API コールはこれらのエンドポイントを経由してルーティングされます。Lambda サービスプリンシパルは、これらの ENI を使用するすべてのロールおよび関数に対し、sts:AssumeRole および lambda:InvokeFunction を呼び出す必要があります。

デフォルトでは、VPC エンドポイントはオープンな IAM ポリシーがあります。特定のプリンシパルのみがそのエンドポイントを使用して必要なアクションを実行するため、これらのポリシーを制限することがベストプラクティスです。イベントソースマッピングが Lambda 関数を呼び出せるようにするには、VPC エンドポイントポリシーは Lambda サービスプリンシパルが sts:AssumeRole および lambda:InvokeFunction を呼び出せるようにする必要があります。組織内で発生する API コールのみを許可するように VPC エンドポイントポリシーを制限すると、イベントソースマッピングが正しく機能しなくなります。

次の VPC エンドポイントポリシーの例では、AWS STS および Lambda エンドポイントに Lambda サービスプリンシパルの必要なアクセスを付与する方法について示しています。

例 VPC エンドポイントポリシー - AWS STS エンドポイント
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
例 VPC エンドポイントポリシー – Lambda エンドポイント
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Kafka ブローカーが認証を使用する場合、Secrets Manager エンドポイントの VPC エンドポイントポリシーを制限することもできます。Secrets Manager API を呼び出す場合、Lambda は Lambda サービスプリンシパルではなく、関数ロールを使用します。次の例では、Secrets Manager エンドポイントポリシーを示します。

例 VPC エンドポイントポリシー - Secrets Manager エンドポイント
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

障害が発生している宛先が設定されている場合、Lambda は関数のロールを使用し、Lambda が管理する ENI で s3:PutObjectsns:Publishsqs:sendMessage のいずれかを呼び出します。

Amazon MSK をイベントソースとして追加

イベントソースマッピングを作成するには、Amazon MSK を、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して Lambda 関数のトリガーとして追加します。Amazon MSK をトリガーとして追加すると、Lambda は Lambda 関数用の VPC 設定ではなく、Amazon MSK クラスター用の VPC 設定を引き受けることに注意してください。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

前提条件

  • Amazon MSK クラスターと Kafka トピック 詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドGetting Started Using Amazon MSK を参照してください。

  • MSK クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

カスタマイズ可能なコンシューマーグループ ID

Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

障害発生時の送信先

Kafka イベントソースからの失敗した呼び出しやサイズが大きすぎるペイロードの記録を保持するには、障害発生時の送信先を関数に設定します。呼び出しが失敗すると、Lambda は呼び出しの詳細を含む JSON レコードを送信先に送ります。

Amazon SNS トピック、Amazon SQS キュー、または Amazon S3 バケットのいずれかを送信先として選択できます。SNS トピックまたは SQS キューの送信先の場合、Lambda はレコードメタデータを送信先に送信します。S3 バケットの送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。

Lambda がレコードを選択した送信先に正常に送信するには、関数の実行ロールに関連するアクセス許可が含まれていることを確認してください。この表では、各送信先タイプで JSON 呼び出しレコードを受け取る方法についても説明しています。

送信先タイプ サポートされるイベントソース 必要なアクセス許可 宛先固有の JSON 形式

Amazon SQS キュー

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon SNS トピック

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon S3 バケット

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードをメタデータと共に送信先に保存します。

ヒント

ベストプラクティスとして、実行ロールにのみ必要な最小限のアクセス許可を含めます。

SNS と SQS の送信先

以下の例は、Kafka イベントソース呼び出しが失敗した場合に Lambda が SNS トピックまたは SQS キューの送信先に送信する内容を示しています。recordsInfo の各キーには、Kafka トピックとパーティションの両方がハイフンで区切られて含まれています。例えば、キー "Topic-0" の場合、Topic は Kafka トピック、0 はパーティションです。各トピックとパーティションについて、オフセットとタイムスタンプデータを使用して元の呼び出しレコードを検索できます。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 の送信先

S3 の送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。以下の例は、Kafka イベントソース呼び出しが失敗した場合に、Lambda が S3 バケットの送信先に送信することを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
ヒント

送信先バケットで S3 バージョニングを有効にすることをお勧めします。

障害発生時の送信先の設定

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [ソース] には、[イベントソースマッピング呼び出し] を選択します。

  5. [イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。

  6. [条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

  7. [送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

  8. [送信先] で、リソースを選択します。

  9. [Save] を選択します。

Lambda API を使用して障害発生時の送信先を設定することもできます。例えば、以下の CreateEventSourceMapping CLI コマンドは、SQS 障害発生時の送信先を MyFunction に追加します。

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下の UpdateEventSourceMapping CLI コマンドは、S3 障害発生時の送信先を、入力 uuid に関連付けられた Kafka イベントソースに追加します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

送信先を削除するには、destination-config パラメータの引数として空の文字列を指定します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon MSK トリガーの追加 (コンソール)

Amazon MSK クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには
  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. MSKトリガータイプを選択します。

    2. [MSK cluster] (MSK クラスター) で、クラスターを選択します。

    3. [Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

    4. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    5. [Topic name] (トピック名)に、Kafka トピックの名前を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。

    8. (オプション) [Authentication] (認証) で、MSK クラスターのブローカーで認証するためのシークレットキーを選択します。

    9. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

Amazon MSK トリガーの追加 (AWS CLI)

Lambda 関数の Amazon MSK トリガーを作成および表示するには、次の例の AWS CLI コマンドを使用します。

AWS CLI を使用したトリガーの作成

例 — IAM 認証を使用するクラスターのイベントソースマッピングを作成します。

次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。トピックの開始位置は LATEST に設定します。クラスターが IAM ロールベースの認証を使用する場合、SourceAccessConfiguration オブジェクトは必要ありません。例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — SASL/SCRAM 認証を使用するクラスターのイベントソースマッピングを作成します。

クラスターが SASL/SCRAM 認証を使用する場合は、SASL_SCRAM_512_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
例 — mTLS 認証を使用するクラスターのイベントソースマッピングを作成します。

クラスターが mTLS 認証を使用する場合は、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

クロスアカウントのイベントソースマッピングの作成

マルチ VPC プライベート接続を使用して、Lambda 関数を別の AWS アカウントのプロビジョニングされた MSK クラスターに接続できます。マルチ VPC 接続は AWS PrivateLink を使用して、すべてのトラフィックを AWS ネットワーク内に保持します。

注記

サーバーレス MSK クラスターにはクロスアカウントイベントソースマッピングを作成できません。

クロスアカウントイベントソースマッピングを作成するには、まず MSK クラスターのマルチ VPC 接続を設定する必要があります。イベントソースマッピングを作成するときは、以下の例に示すように、クラスター ARN の代わりにマネージド VPC 接続 ARN を使用します。CreateEventSourceMapping オペレーションは、MSK クラスターが使用する認証タイプによっても異なります。

例 — IAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが IAM ロールベースの認証を使用する場合、SourceAccessConfiguration オブジェクトは必要ありません。例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — SASL/SCRAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが SASL/SCRAM 認証を使用する場合は、SASL_SCRAM_512_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

SASL/SCRAM 認証でクロスアカウントの Amazon MSK イベントソースマッピングにシークレットを使用する方法は 2 つあります。

  • Lambda 関数アカウントにシークレットを作成し、クラスターシークレットと同期します。2 つのシークレットを同期させるローテーションを作成します。このオプションでは、関数アカウントからシークレットを制御できます。

  • MSK クラスターに関連付けられているシークレットを使用してください。このシークレットは、Lambda 関数アカウントへのクロスアカウントアクセスを許可する必要があります。詳細については、「別のアカウントのユーザーの AWS Secrets Manager シークレットに対するアクセス許可」を参照してください。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — mTLS 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが mTLS 認証を使用する場合は、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。シークレットは、クラスターアカウントまたは Lambda 関数アカウントに保存できます。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Amazon MSK イベントソースの Auto Scaling

初めて Amazon MSK イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がスロットリングされると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットをモニタリングするには、関数がレコードを処理する間に Lambda が発行するオフセット遅延メトリクスを表示してください。

いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda 関数のメトリクスの使用」を参照してください。

Amazon MSK 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MSK に適用されるのは一部のパラメータのみです。

Amazon MSK に適用されるイベントソースパラメータ
Parameter 必須 デフォルト メモ

AmazonManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

BatchSize

N

100

最大: 10,000

有効

N

有効

EventSourceArn

Y

作成時にのみ設定可能

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

SourceAccessConfigurations

N

認証情報なし

イベントソース用の、SASL/SCRAM あるいは CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) の認証情報

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

作成時にのみ設定可能

StartingPositionTimestamp

N

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要

トピック

Y

カフカのトピック名

作成時にのみ設定可能