Amazon MSK で Lambda を使用する - AWS Lambda

Amazon MSK で Lambda を使用する

Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka を使ってストリーミングデータを処理するアプリケーションを、構築および実行することを可能にするフルマネージドサービスです。Amazon MSK は、Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。また、Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにより簡単にアプリケーションを設定することができます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、イベントソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Amazon MSK をイベントソースとして設定する方法の例については、AWS Compute Blog の Using Amazon MSK as an event source for AWS Lambdaを参照してください。完全なチュートリアルについては、「Amazon MSK Labs」(Amazon MSK ラボ) の「Amazon MSK Lambda Integration」(Amazon MSK の Lambda 統合) を参照してください。

Kafka ベースのイベントソースの場合、Lambda はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。詳細については、バッチ処理動作 を参照してください。

Lambda は、パーティションごとにメッセージを順番に読み込みます。Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

Lambda は、関数を最大 14 分実行できます。関数のタイムアウトは 14 分以下に設定してください (デフォルトのタイムアウト値は 3 秒です)。Lambda は、14 分を超える呼び出しを再試行する場合があります。

Lambda は、関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK クラスター認証

Lambda には、Amazon MSK クラスターにアクセスする、レコードを取得する、およびその他タスクを実行するための許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。

非認証アクセス

インターネット経由でクラスターにアクセスするクライアントがない場合は、非認証アクセスを使用できます。

SASL/SCRAM 認証

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda がクラスターに接続できるようにするには、認証情報 (ユーザー名とパスワード) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Manager を使用したユーザーネームとパスワードの認証」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

IAM ロールベースの認証

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。IAM ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「IAM access control」(IAM アクセスコントロール) を参照してください。

Lambda が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、関数の実行ロールに以下の許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/group-name" ] } ] }

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Amazon MSK Kafka actions」(Amazon MSK Kafka アクション) を参照してください。IAM が使用するグループ名は、イベントソースマッピングの UUID と同じです。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、Lambda がクライアントとして機能します。MSK クラスターのブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。MSK クラスターは、Lambda でブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、AWS トラストストア内の認証局 (CA) によって署名される必要があります。

クライアント証明書を生成する方法の手順については、「Introducing mutual TLS authentication for Amazon MSK as an event source」(イベントソースとしての Amazon MSK のための相互 TLS 認証の紹介) を参照してください。

Amazon MSK は自己署名のサーバー証明書をサポートしません。これは、Amazon MSK のすべてのブローカーが、Lambda がデフォルトで信頼する Amazon Trust Services CA によって署名されたパブリック証明書を使用するためです。

Amazon MSK のための mTLS に関する詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Mutual TLS Authentication」(相互 TLS 認証) を参照してください。

mTLS シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

API アクセスと許可の管理

Amazon MSK クラスターへのアクセスに加えて、関数にはさまざまな Amazon MSK API アクションを実行するための許可が必要です。これらの許可は、関数の実行ロールに追加します。ユーザーが Amazon MSK API アクションのいずれかにアクセスする必要がある場合は、IAM ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

Lambda 関数の実行ロールに必要な許可

Lambda 関数の実行ロールには、ユーザーに代わって MSK クラスターにアクセスするための以下の許可が必要です。実行ロールに AWS 管理ポリシー AWSLambdaMSKExecutionRole を追加するか、アクセス許可を持つカスタムポリシーを作成して、次のアクションを実行できます。

実行ロールへのアクセス許可の追加

IAM コンソールを使用して実行ロールに AWS マネージドポリシー AWSLambdaMSKExecutionRole を追加するには、次の手順を実行します。

AWS 管理ポリシーを追加するには

  1. IAM コンソールの [Policies (ポリシー)] ページを開きます。

  2. 検索ボックスに、ポリシー名 (AWSLambdaMSKExecutionRole) を入力します。

  3. リストからポリシーを選択して、[ポリシーアクション] の [アタッチ] を選択します。

  4. 添付ポリシーページで、リストから実行ロールを選択し、[Attach policy](ポリシーの添付) を選択します。

IAM ポリシーを使用したユーザーアクセスの許可

デフォルトで、IAM ユーザーとロールには Amazon MSK API 操作を実行する許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを追加または更新することができます。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドAmazon MSK Identity-Based Policy Examples を参照してください。

SASL/SCRAM 認証の使用

Amazon MSK は、TLS 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。AWS Secrets Manager シークレットを使用してユーザー名とパスワード認証を設定することで、Amazon MSK クラスターへのアクセスを制御することができます。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドUsername and password authentication with AWS Secrets Manager を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしておりませんので注意してください。

認証と認可のエラー

Amazon MSK クラスターからのデータを消費するために必要な許可のいずれかが欠落している場合、Lambda は [LastProcessingResult] のイベントソースマッピングに以下のエラーメッセージのいずれかを表示します。

クラスターが Lambda の認可に失敗した

SALS/SCRAM または mTLS の場合、このエラーは、指定されたユーザーが以下の必要とされる Kafka アクセスコントロールリスト (ACL) 許可のすべてを持っていないことを示します。

  • DescribeConfigs クラスター

  • グループを記述する

  • グループを読み取る

  • トピックを記述する

  • トピックを読み取る

IAM アクセスコントロールの場合、関数の実行ロールにグループまたはトピックへのアクセスに必要な許可が 1 つ、または複数不足しています。「 IAM ロールベースの認証」で、必要な許可のリストを確認してください。

必要な Kafka クラスター許可を使用して Kafka ACL または IAM ポリシーのいずれかを作成するときは、リソースとしてトピックとグループを指定します。トピック名は、イベントソースマッピングのトピックと一致する必要があります。グループ名は、イベントソースマッピングの UUID と一致する必要があります。

必要な許可を実行ロールに追加した後は、変更が有効になるまで数分間かかる場合があります。

SASL 認証に失敗した

SASL/SCRAM の場合、このエラーは指定されたユーザー名とパスワードが無効であることを示します。

IAM アクセスコントロールの場合、実行ロールに MSK クラスターに対する kafka-cluster:Connect 許可がありません。この許可をロールに追加して、クラスターの Amazon リソースネーム (ARN) をリソースとして指定します。

このエラーは断続的に発生する場合があります。クラスターは、TCP 接続の数が Amazon MSK サービスクォータを超過すると、接続を拒否します。Lambda は接続に成功するまでバックオフし、再試行します。Lambda がクラスターに接続してレコードをポーリングすると、最後の処理結果が OK に変わります。

Server failed to authenticate Lambda (サーバーが Lambda の認証に失敗しました)

このエラーは、Amazon MSK Kafka ブローカーが Lambda の認証に失敗したことを示します。このエラーは、以下が原因で発生する可能性があります。

  • mTLS 認証用のクライアント証明書を提供していない。

  • クライアント証明書を提供したが、ブローカーが mTLS を使用するように設定されていない。

  • クライアント証明書がブローカーに信頼されていない。

Provided certificate or private key is invalid (提供された証明書またはプライベートキーが無効です)

このエラーは、Amazon MSK コンシューマーが提供された証明書またはプライベートキーを使用できなかったことを示します。証明書とキーが PEM 形式を使用しており、プライベートキーの暗号化が PBES1 アルゴリズムを使用していることを確認してください。

ネットワーク構成

Lambda は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。Lambda と AWS Security Token Service (AWS STS) に AWS PrivateLink VPC エンドポイントをデプロイすることをお勧めします。ポーラーは Lambda 関数に関連付けられた実行ロールを引き受けるため、AWS STS にアクセスする必要があります。Lambda は関数を呼び出すため、Lambda VPC エンドポイントにアクセスする必要があります。ブローカーに Lambda を認証するために Secrets Manager のシークレットを設定した場合、Secrets Manager 用に VPC エンドポイントのデプロイも行ってください。

または、MSK クラスターに関連付けられた VPC で、パブリックサブネットごとに 1 つの NAT ゲートウェイが含まれていることを確認します。詳細については、VPC に接続した関数のインターネットアクセスとサービスアクセス を参照してください。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。イベントソースに指定されたセキュリティグループに対して、Amazon MSK ブローカーポート (プレーンテキストの場合は 9092、TLS の場合は 9094、SASL の場合は 9096、IAM の場合は 9098) 上のすべてのトラフィックを許可します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用している場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

注記

Amazon VPC の設定は、Amazon MSK API を使用して検出できます。create-event-source-mapping コマンドを使用してセットアップ中に設定する必要はありません。

ネットワークの設定方法の詳細については、AWS Compute Blog の Setting up AWS Lambda with an Apache Kafka cluster within a VPC を参照してください。

Amazon MSK をイベントソースとして追加

イベントソースマッピングを作成するには、Amazon MSK を、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して Lambda 関数のトリガーとして追加します。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

前提条件

  • Amazon MSK クラスターと Kafka トピック 詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドGetting Started Using Amazon MSK を参照してください。

  • MSK クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

Amazon MSK トリガーの追加 (コンソール)

Amazon MSK クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには

  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. MSKトリガータイプを選択します。

    2. [MSK cluster] (MSK クラスター) で、クラスターを選択します。

    3. [Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

    4. [Topic name] (トピック名)に、Kafka トピックの名前を入力します。

    5. (オプション) [Starting position] (開始位置) で、[Latest] (最新) をクリックし、最新のレコードからストリームの読み込みを開始します。または [Trim horizon] (水平トリム) を選択し、利用可能な最も早いレコードから開始します。

    6. (オプション) [Authentication] (認証) で、MSK クラスターのブローカーで認証するためのシークレットキーを選択します。

    7. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

Amazon MSK トリガーの追加 (AWS CLI)

Lambda 関数の Amazon MSK トリガーを作成および表示するには、次の例の AWS CLI コマンドを使用します。

AWS CLI を使用したトリガーの作成

次の例では、create-event-source-mapping AWS CLI コマンドを使用して、Lambda 関数 my-kafka-function を、Kafka トピック AWSKafkaTopic にマップします。トピックの開始位置は LATEST に設定します。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-west-2:arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK イベントソースの Auto Scaling

初めて Amazon MSK イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がオーバーロードすると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットをモニタリングするには、関数がレコードを処理する間に Lambda が発行するオフセット遅延メトリクスを表示してください。

いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと Lambda が処理した最後のレコードの間のオフセットの差分です。レコードが追加されてから関数がそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数に問題があることを示している可能性があります。詳細については、Lambda 関数のメトリクスの使用 を参照してください。

Amazon MSK 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MSK に適用されるのは一部のパラメータのみです。

Amazon MSK に適用されるイベントソースパラメータ
Parameter [Required] (必須) デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

有効

EventSourceArn

Y

作成時にのみ設定可能

FunctionName

Y

SourceAccessConfigurations

N

認証情報なし

イベントソースの VPC 情報または SASL/SCRAM 認証情報

StartingPosition

Y

TRIM_HORIZON または LATEST

作成時にのみ設定可能

トピック

Y

カフカのトピック名

作成時にのみ設定可能