チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す - AWS Lambda

チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す

本チュートリアルでは、次の手順を実行します。

  • 既存の Amazon MSK クラスターと同じ AWS アカウントに Lambda 関数を作成します。

  • Amazon MSK と通信するように Lambda のネットワークと認証を設定します。

  • Lambda Amazon MSK イベントソースマッピングを設定します。これにより、イベントがトピックに出現したときに Lambda 関数が実行されます。

これらのステップを完了したら、イベントが Amazon MSK に送信されたときに、独自のカスタム Lambda コードを使用してそれらのイベントを自動的に処理するための Lambda 関数を設定できるようになります。

この機能で何ができますか?

ソリューションの例: MSK イベントソースマッピングを使用して、ライブスコアを顧客に配信します。

次のシナリオを考えてみましょう。あなたの会社は、顧客がスポーツの試合などのライブイベントに関する情報を表示できるウェブアプリケーションをホストしています。試合の情報更新は、Amazon MSK の Kafka トピックを通じてチームに提供されます。MSK トピックから取得した更新情報を使用して、開発中のアプリケーション内で顧客にライブイベントの更新ビューを提供するソリューションを設計する必要があります。次の設計アプローチを決定しました。クライアントアプリケーションは、AWS でホストされているサーバーレスバックエンドと通信します。クライアントは、Amazon API Gateway WebSocket API を使用して WebSocket セッション経由で接続します。

このソリューションでは、MSK イベントを読み取り、いくつかのカスタムロジックを実行してアプリケーションレイヤーのイベントを準備し、その情報を API Gateway API に転送するコンポーネントが必要です。このコンポーネントは、Lambda 関数でカスタムロジックを指定し、それを AWS Lambda Amazon MSK イベントソースマッピングで呼び出すことで、AWS Lambda を使用して実装できます。

Amazon API Gateway WebSocket API を使用したソリューションの実装の詳細については、API Gateway ドキュメントの「WebSocket API のチュートリアル」を参照してください。

前提条件

以下の事前設定されたリソースを持つ AWS アカウント。

これらの前提条件を満たすには、Amazon MSK ドキュメントの「Getting started using Amazon MSK」を参照してください。

  • Amazon MSK クラスター 「Getting started using Amazon MSK」の「Create an Amazon MSK cluster」を参照してください。

  • 以下の設定を行います。

    • クラスターのセキュリティ設定で [IAM ロールベースの認証][有効] になっていることを確認します。これにより、必要な Amazon MSK リソースにのみアクセスするように Lambda 関数を制限することで、セキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。

    • クラスターネットワーク設定で [パブリックアクセス] がオフになっていることを確認します。Amazon MSK クラスターのインターネットへのアクセスを制限すると、データを処理する仲介者の数が制限されることでセキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。

  • このソリューションに使用する Amazon MSK クラスターの Kafka トピック。「Getting started using Amazon MSK」の「Create a topic」を参照してください。

  • Kafka クラスターから情報を取得し、テスト用に Kafka イベントをトピックに送信するように設定された Kafka 管理ホスト。例えば、Kafka 管理 CLI と Amazon MSK IAM ライブラリがインストールされた Amazon EC2 インスタンスなどです。「Getting started using Amazon MSK」の「Create a client machine」を参照してください。

これらのリソースを設定したら、AWS アカウントから次の情報を収集して、続行する準備ができていることを確認します。

  • Amazon MSK クラスターの名前。この情報は、Amazon MSK コンソールで確認できます。

  • クラスター UUID。Amazon MSK クラスターの ARN の一部であり、Amazon MSK コンソールで確認できます。この情報を確認するには、Amazon MSK ドキュメントの「クラスターの一覧表示」の手順に従います。

  • Amazon MSK クラスターに関連付けられているセキュリティグループ。この情報は、Amazon MSK コンソールで確認できます。次のステップでは、これらを clusterSecurityGroups と呼びます。

  • Amazon MSK クラスターを含む Amazon VPC の ID。この情報を見つけるには、Amazon MSK コンソールで Amazon MSK クラスターに関連付けられたサブネットを特定し、Amazon VPC コンソールでそのサブネットに関連付けられた Amazon VPC を特定します。

  • ソリューションで使用される Kafka トピックの名前。この情報を確認するには、Kafka 管理ホストから Kafka topics CLI を使用して Amazon MSK クラスターを呼び出します。トピック CLI の詳細については、Kafka ドキュメントの「Adding and removing topics」を参照してください。

  • Kafka トピックのコンシューマーグループの名前。Lambda 関数での使用に適しています。このグループは Lambda によって自動的に作成できるため、Kafka CLI で作成する必要はありません。コンシューマーグループを管理する必要がある場合、コンシューマーグループ CLI の詳細については、Kafka ドキュメントの「Managing Consumer Groups」を参照してください。

AWS アカウントに次のアクセス許可が必要です。

  • Lambda 関数を作成および管理するためのアクセス許可。

  • IAM ポリシーを作成し、それを Lambda 関数に関連付けるためのアクセス許可。

  • Amazon MSK クラスターをホストする Amazon VPC で Amazon VPC エンドポイントを作成し、ネットワーク設定を変更するためのアクセス許可。

Amazon MSK と通信するように Lambda のネットワーク接続を設定する

AWS PrivateLink を使用して Lambda と Amazon MSK を接続します。これを行うには、Amazon VPC コンソールでインターフェイス Amazon VPC エンドポイントを作成します。ネットワーク設定の詳細については、「ネットワーク構成」を参照してください。

Amazon MSK イベントソースマッピングが Lambda 関数に代わって実行される際には、Lambda 関数の実行ロールを引き受けます。この IAM ロールは、マッピングに対して、IAM で保護されたリソース (Amazon MSK クラスターなど) へのアクセスを許可します。コンポーネントは実行ロールを共有しますが、次の図に示すように、Amazon MSK マッピングと Lambda 関数には、それぞれのタスクに対して個別の接続要件があります。

Lambda 関数はクラスターをポーリングし、AWS STS を使用して Lambda と通信します。

イベントソースマッピングは、Amazon MSK クラスターセキュリティグループに属します。このネットワークステップでは、Amazon MSK クラスター VPC から Amazon VPC エンドポイントを作成して、イベントソースマッピングを Lambda および STS サービスに接続します。Amazon MSK クラスターセキュリティグループからのトラフィックを受け入れるように、これらのエンドポイントを保護します。次に、Amazon MSK クラスターのセキュリティグループを調整して、イベントソースマッピングが Amazon MSK クラスターと通信できるようにします。

AWS Management Consoleを使用して、次の手順を設定できます。

Lambda と Amazon MSK を接続するようにインターフェイス Amazon VPC エンドポイントを設定するには
  1. インターフェイス Amazon VPC エンドポイントのセキュリティグループ endpointSecurityGroup を作成します。これにより、clusterSecurityGroups からのポート 443 でのインバウンド TCP トラフィックが許可されます。Amazon EC2 ドキュメントの「セキュリティグループの作成」の手順に従って、セキュリティグループを作成します。次に、Amazon EC2 ドキュメントの「セキュリティグループへのルールの追加」の手順に従って、適切なルールを追加します。

    次の情報を使用してセキュリティグループを作成します。

    インバウンドルールを追加する際に、clusterSecurityGroups 内のセキュリティグループごとにルールを作成します。各ルールは、次のように作成します。

    • [タイプ][HTTPS] を選択します。

    • [ソース]clusterSecurityGroups のいずれかを選択します。

  2. Lambda サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「インターフェイスエンドポイントの作成」の手順に従います。

    次の情報を使用してインターフェイスエンドポイントを作成します。

    • [サービス名]com.amazonaws.regionName.lambda を選択します。ここで、regionName が Lambda 関数をホストします。

    • [VPC] で、Amazon MSK クラスターを含む Amazon VPC を選択します。

    • [セキュリティグループ] で、前に作成した endpointSecurityGroup を選択します。

    • [サブネット] で、Amazon MSK クラスターをホストするサブネットを選択します。

    • [ポリシー] で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが lambda:InvokeFunction アクションに使用するためにエンドポイントを保護します。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • [DNS 名を有効化] が設定されたままであることを確認します。

  3. AWS STS サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「インターフェイスエンドポイントの作成」の手順に従います。

    次の情報を使用してインターフェイスエンドポイントを作成します。

    • [サービス名] で AWS STS を選択します。

    • [VPC] で、Amazon MSK クラスターを含む Amazon VPC を選択します。

    • [セキュリティグループ]endpointSecurityGroup を選択します。

    • [サブネット] で、Amazon MSK クラスターをホストするサブネットを選択します。

    • [ポリシー] で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが sts:AssumeRole アクションに使用するためにエンドポイントを保護します。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • [DNS 名を有効化] が設定されたままであることを確認します。

  4. Amazon MSK クラスターに関連付けられている各セキュリティグループに対して、つまり clusterSecurityGroups で、次のことを許可します。

    • すべての clusterSecurityGroups に対して、ポート 9098 でのすべてのインバウンドおよびアウトバウンド TCP トラフィックを許可します。これには、clusterSecurityGroups 内でのトラフィックも含まれます。

    • ポート 443 でのすべてのアウトバウンド TCP トラフィックを許可します。

    このトラフィックの一部は、デフォルトのセキュリティグループルールで許可されているため、クラスターが単一のセキュリティグループにアタッチされており、そのグループにデフォルトのルールがある場合は、追加のルールは必要ありません。セキュリティグループルールを調整するには、Amazon EC2 ドキュメントの「セキュリティグループへのルールの追加」の手順に従います。

    次の情報を使用して、セキュリティグループにルールを追加します。

    • ポート 9098 のインバウンドルールまたはアウトバウンドルールごとに、以下を指定します。

      • [タイプ] で、[カスタム TCP] を選択します。

      • [ポート範囲] で 9098 を指定します。

      • [ソース]clusterSecurityGroups のいずれかを指定します。

    • ポート 443 のインバウンドルールごとに、[タイプ][HTTPS] を選択します。

Lambda が Amazon MSK トピックから読み取るための IAM ロールを作成する

Lambda が Amazon MSK トピックから読み取るための認証要件を特定し、ポリシーで定義します。ロール lambdaAuthRole を作成します。このロールは、Lambda がこれらのアクセス許可を使用することを承認します。kafka-cluster IAM アクションを使用して Amazon MSK クラスターに対するアクションを承認します。次に、Lambda が Amazon MSK クラスターを検出して接続するために必要な Amazon MSK の kafka アクションと Amazon EC2 アクションを実行すること、および Lambda が実行した内容をログに記録できるように CloudWatch アクションを実行することを承認します。

Lambda が Amazon MSK から読み取るための認証要件を記述するには
  1. IAM ポリシードキュメント (JSON ドキュメント) である clusterAuthPolicy を作成します。これにより、Lambda は Kafka コンシューマーグループを使用して、Amazon MSK クラスター内の Kafka トピックからデータを読み取ることができるようになります。Lambda では、読み取り時に Kafka コンシューマーグループを設定する必要があります。

    前提条件に合わせて次のテンプレートを変更します。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    詳細については、「IAM ロールベースの認証」を参照してください。ポリシーは、次のように作成します。

    • region および account-id には、Amazon MSK クラスターをホストするものを指定します。

    • mskClusterName には、Amazon MSK クラスターの名前を指定します。

    • cluster-uuid には、Amazon MSK クラスターの ARN 内の UUID を指定します。

    • mskTopicName には、Kafka トピックの名前を指定します。

    • mskGroupName には、Kafka コンシューマーグループの名前を指定します。

  2. Lambda が Amazon MSK クラスターを検出して接続し、それらのイベントをログに記録するために必要となる、Amazon MSK、Amazon EC2、および CloudWatch のアクセス許可を特定します。

    AWSLambdaMSKExecutionRole マネージドポリシーは、必要なアクセス許可を許容的に定義します。これは、次の手順で使用します。

    本番環境で AWSLambdaMSKExecutionRole を評価し、最小特権の原則に基づいて実行ロールポリシーを制限します。その後、ロールに対して、このマネージドポリシーを置き換えるポリシーを作成します。

IAM ポリシー言語の詳細については、IAM ドキュメントを参照してください。

ポリシードキュメントを作成したので、IAM ポリシーを作成してロールにアタッチすることができます。これを行うには、コンソールを使用して次の手順を実行します。

ポリシードキュメントから IAM ポリシーを作成するには
  1. AWS Management Console にサインインして、IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. 左側のナビゲーションペインで、[ポリシー] を選択します。

  3. [Create policy] を選択します。

  4. [ポリシーエディタ] セクションで、[JSON] オプションを選択します。

  5. clusterAuthPolicy を貼り付けます。

  6. ポリシーにアクセス権限を追加し終えたら、[次へ] を選択します。

  7. [確認と作成] ページで、作成するポリシーの [ポリシー名] と [説明] (オプション) を入力します。[このポリシーで定義されているアクセス許可] を確認して、ポリシーによって付与されたアクセス許可を確認します。

  8. [ポリシーの作成] をクリックして、新しいポリシーを保存します。

詳細については、IAM ドキュメントの「IAM ポリシーの作成」を参照してください。

適切な IAM ポリシーを作成したので、ロールを作成してそれらのポリシーをアタッチします。これを行うには、コンソールを使用して次の手順を実行します。

IAM コンソールで実行ロールを作成するには
  1. IAM コンソールの [Roles (ロール)] ページを開きます。

  2. [ロールの作成] を選択します。

  3. [信頼されたエンティティタイプ] から、[AWS サービス] を選択します。

  4. [ユースケース] で、Lambda を選択します。

  5. [Next] を選択します。

  6. 次のポリシーを指定します。

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. [Next] を選択します。

  8. [ロール名]lambdaAuthRole と入力し、[ロールの作成] を選択します。

詳細については、「実行ロールを使用した Lambda 関数のアクセス許可の定義」を参照してください。

Amazon MSK トピックから読み取る Lambda 関数を作成する

IAM ロールを使用するように設定された Lambda 関数を作成します。コンソールを使用して Lambda 関数を作成できます。

認証設定を使用して Lambda 関数を作成するには
  1. Lambda コンソールを開き、ヘッダーから [関数の作成] を選択します。

  2. [ゼロから作る] を選択します。

  3. [関数名] で、任意の適切な名前を指定します。

  4. [ランタイム] で、[最新のサポート対象] バージョンの Node.js を選択すると、このチュートリアルで提供されるコードを使用できます。

  5. [デフォルトの実行ロールの変更] を選択します。

  6. [既存のロールを使用] を選択します。

  7. [既存のロール] で、lambdaAuthRole を選択します。

本番環境では、通常、Lambda 関数が Amazon MSK のイベントを適切に処理できるようにするために、実行ロールにさらにポリシーを追加する必要があります。ロールにポリシーを追加する方法の詳細については、IAM ドキュメントの「ID アクセス許可の追加または削除」を参照してください。

Lambda 関数へのイベントソースマッピングを作成する

Amazon MSK イベントソースマッピングは、該当する Amazon MSK イベントが発生したときに Lambda を呼び出すために必要な情報を Lambda サービスに提供します。コンソールを使用して Amazon MSK マッピングを作成できます。Lambda トリガーを作成すると、イベントソースマッピングは自動的に設定されます。

Lambda トリガー (およびイベントソースマッピング) を作成するには
  1. Lambda 関数の概要ページに移動します。

  2. 関数の概要セクションで、左下の [トリガーの追加] を選択します。

  3. [ソースの選択] ドロップダウンで、[Amazon MSK] を選択します。

  4. [認証] を設定しないでください。

  5. [MSK クラスター] で、クラスターの名前を選択します。

  6. [バッチサイズ] で、1 を入力します。これは、この機能のテストを容易にするためのステップであり、本番環境で理想的な値ではありません。

  7. [トピック名] で、Kafka トピックの名前を指定します。

  8. [コンシューマーグループ ID] で、Kafka コンシューマーグループの ID を指定します。

ストリーミングデータを読み取るために Lambda 関数を更新する

Lambda は、イベントメソッドパラメータを使用して Kafka イベントに関する情報を提供します。Amazon MSK イベントの構造の例については、「 イベントの例」を参照してください。Lambda によって転送された Amazon MSK イベントを解釈する方法を理解したら、これらのイベントによって提供される情報を使用するように Lambda 関数コードを変更できます。

テスト目的で Lambda Amazon MSK イベントの内容をログに記録するには、Lambda 関数に次のコードを指定します。

Java
SDK for Java 2.x
注記

GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Java を使用した Lambda での Amazon MSK イベントの消費。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK for JavaScript (v3)
注記

GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

JavaScript を使用した Lambda での Amazon MSK イベントの消費。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }

コンソールを使用して Lambda に関数コードを指定できます。

Lambda 関数コードを更新するには
  1. Lambda 関数の概要ページに移動します。

  2. [コード] タブを選択します。

  3. 指定されたコードを [コードソース] IDE に入力します。

  4. [コードソース] ナビゲーションバーで、[デプロイ] を選択します。

Lambda 関数をテストして、Amazon MSK トピックに接続されていることを確認します。

CloudWatch イベントログを調べることで、Lambda がイベントソースによって呼び出されているかどうかを確認できるようになりました。

Lambda 関数が呼び出されているかどうかを確認するには
  1. Kafka 管理ホストを使用し、kafka-console-producer CLI を使用して Kafka イベントを生成します。詳細については、Kafka ドキュメントの「Write some events into the topic」を参照してください。前のステップで定義したイベントソースマッピングのバッチサイズで定義されたバッチを埋めるのに十分なイベントを送信してください。そうしないと、Lambda は追加の情報が呼び出されるまで待機します。

  2. 関数が実行されると、Lambda はその結果を CloudWatch に書き込みます。コンソールで、Lambda 関数の詳細ページに移動します。

  3. [Configuration (設定)] タブを選択します。

  4. サイドバーから、[モニタリングおよび運用ツール] を選択します。

  5. [ロギング設定][CloudWatch ロググループ] を特定します。ロググループは /aws/lambda で始まります。ロググループへのリンクを選択します。

  6. CloudWatch コンソールの [ログイベント] で、Lambda がログストリームに送信したログイベントがないかを調べます。次の図のように、Kafka イベントからのメッセージを含むログイベントがあるかどうかを確認します。存在する場合は、Lambda イベントソースマッピングを使用して Lambda 関数を Amazon MSK に正常に接続できています。

    指定したコードによって抽出されたイベント情報を示す CloudWatch のログイベント。