Amazon MSK での Lambda の使用 - AWS Lambda

Amazon MSK での Lambda の使用

Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka をストリーミングデータの処理に使用するアプリケーションをビルドおよび実行できるマネージドサービスです。Apache Kafka は、概念的には Amazon Kinesis に似た分散ストリーミングプラットフォームです。Amazon MSK を使用すると、多くのソースからデータを収集し、複数のコンシューマーで処理できます。

Lambda 関数を使用して、Kafka トピックのレコードを処理できます。関数をトリガーするには、イベントソースマッピングを使用します。イベントソースマッピングは、トピックから項目を読み取り、関数を呼び出す Lambda リソースです。Lambda は、複数のパーティションをまたいで新規レコードをポーリングし、ターゲット関数を同期的に呼び出します。

Amazon MSK イベントソースマッピングは以下の機能をサポートしています。

  • Amazon MSK がサポートするすべての Kafka バージョンとの完全な互換性。詳細については、Amazon Managed Streaming for Apache Kafka 開発者ガイドの「サポート対象の Apache Kafka バージョン」を参照してください。

  • プレーンテキストと TLS 暗号化ブローカーの両方。TLS ブローカーはプライベート認定権限ではサポートされていません。詳細については、Amazon Managed Streaming for Apache Kafka 開発者ガイドで「Amazon MSK 暗号化」の「転送中の暗号化」セクションを参照してください。

  • 設定可能な開始位置とバッチサイズ。サポートされている設定可能な開始位置は TRIM_HORIZONLATEST です。これらはタイムスタンプベースではありません。

以下の Kafka の機能はサポートされていません。

  • 認証 – 相互 TLS および SASL 認証はサポートされていません。

  • スキーマレジストリ – 独自のスキーマレジストリをホストできますが、この機能は Lambda API でサポートされていません。詳細については、Confluent ウェブサイトの「Schema Management」を参照してください。

Lambda コンシューマーグループ

Amazon MSK とやり取りするために、Lambda は複数の Kafka トピックから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。Lambda によって作成されたコンシューマーグループはチェックポイントにも使用されます。各トピックパーティションでのグループの位置は、処理が成功した後に Kafka にコミットされます。

Lambda は 1 つ以上のパーティションのレコードを処理し、そのペイロードをターゲット関数に送信します。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまでバッチを処理し続けます。サポートされている関数の最大実行時間は 14 分です。

例 Amazon MSK レコードイベント

Received event:{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", "records": { "AWSKafkaTopic-0": [ { "topic": "AWSKafkaTopic", "partition": 0, "offset": 0, "timestamp": 1595035749700, "timestampType": "CREATE_TIME", "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" } ] } }
注記

aws:kafka リソースのキー値セットは base64 でエンコードされます。

実行ロールのアクセス許可

Amazon MSK クラスターからレコードを読み取るには、Lambda 関数の 実行ロール に次のアクセス許可が必要です。

AWS 管理ポリシー AWSLambdaMSKExecutionRole には、これらのアクセス許可が含まれています。詳細については、「Lambda 機能用の AWS 管理ポリシー」を参照してください。

イベントソースとしてのトピックの設定

イベントソースマッピングを作成して、レコードを Kafka トピックから Lambda 関数に送信するように Lambda に指示します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、1 つの関数で複数のトピックの項目を処理したりできます。

関数を Amazon MSK から読み取るように設定するには、Lambda コンソールで MSK トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. 関数を選択します。

  3. [Designer] で、[Add trigger] を選択します。

  4. トリガータイプを選択します。

  5. 必要なオプションを設定して [追加] を選択します。

Lambda では、Amazon MSK イベントソースの以下のオプションがサポートされています。

  • [MSK cluster (MSK クラスター)] – MSK クラスターを選択します。

  • [Topic name (トピック名)] – 使用する Kafka トピックを入力します。

  • [Starting position (開始位置)] (オプション) – ストリームでレコードの読み取りを開始する位置を入力します。

    • [Latest (最新)] – すべてのトピックのパーティションで最新の位置から読み取ります。

    • [水平トリム (Trim Horizon)] – すべてのトピックパーティションで最も古い位置から読み取ります。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • [Enable trigger (トリガーの有効化)] – レコードの処理を停止するトリガーを無効にします。

トリガーを有効または無効にする (または削除する) には、デザイナーMSK トリガーを選択します。トリガーを再設定するには、イベントソースマッピング API コマンドを使用します。

イベントソースマッピング API

AWS CLI または AWS SDK でイベントソースを管理するには、次の API オペレーションを使用できます。

AWS Command Line Interface (AWS CLI) を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。Amazon MSK ブローカーからレコードをフェッチするには、MSK クラスターに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) へのアクセスが必要です。Amazon VPC アクセス要件を満たすには、次のいずれかを試すことができます。

設定する Amazon VPC セキュリティグループルールには、最低でも以下の設定が必要です。

  • インバウンドルール – ソースとして指定されたセキュリティグループのすべてのポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先について、すべてのポート上のすべてのトラフィックを許可します。

Amazon VPC 設定は、Amazon MSK API を通じて検出可能であり、create-event-source-mapping 設定で定義する必要はありません。

以下の AWS CLI の例では、my-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングして、開始位置を latest に設定します。

$ aws lambda create-event-source-mapping --event-source-arn arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 --topics AWSKafkaTopic --starting-position LATEST --function-name my-kafka-function { "UUID": "6d9bce8e-836b-442c-8070-74e77903c815", "BatchSize": 100, "EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2", "FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function", "LastModified": 1580331394.363, "State": "Creating", "StateTransitionReason": "USER_INITIATED", "LastProcessingResult": "OK", "Topics": [ "AWSKafkaTopic" ] }

get-event-source-mapping コマンドを使用して、リソースの現在のステータスを表示します。

$ aws lambda get-event-source-mapping --uuid 6d9bce8e-836b-442c-8070-74e77903c815 { "UUID": "6d9bce8e-836b-442c-8070-74e77903c815" "BatchSize": 100, "EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2", "FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function", "LastModified": 1580331394.363, "State": "Enabled", "StateTransitionReason": "User action", "LastProcessingResult": "OK", "Topics": [ "AWSKafkaTopic" ], }

イベントソースマッピングエラー

Lambda 関数で回復不可能なエラーが発生すると、Kafka トピックコンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、EventSourceMapping から返された詳細の StateTransitionReason フィールドをチェックし、以下のいずれかのコードを探します。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースの認証に失敗しました。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセス許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

レコードは、そのサイズが原因でドロップされた場合も、未処理になります。Lambda レコードのサイズ制限は 6 MB です。