Amazon MSK での Lambda の使用
Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka
Lambda 関数を使用して、Kafka トピックのレコードを処理できます。関数をトリガーするには、イベントソースマッピングを使用します。イベントソースマッピングは、トピックから項目を読み取り、関数を呼び出す Lambda リソースです。Lambda は、複数のパーティションをまたいで新規レコードをポーリングし、ターゲット関数を同期的に呼び出します。
Amazon MSK イベントソースマッピングは以下の機能をサポートしています。
-
Amazon MSK がサポートするすべての Kafka バージョンとの完全な互換性。詳細については、Amazon Managed Streaming for Apache Kafka 開発者ガイドの「サポート対象の Apache Kafka バージョン」を参照してください。
-
プレーンテキストと TLS 暗号化ブローカーの両方。TLS ブローカーはプライベート認定権限ではサポートされていません。詳細については、Amazon Managed Streaming for Apache Kafka 開発者ガイドで「Amazon MSK 暗号化」の「転送中の暗号化」セクションを参照してください。
-
設定可能な開始位置とバッチサイズ。サポートされている設定可能な開始位置は
TRIM_HORIZON
とLATEST
です。これらはタイムスタンプベースではありません。
以下の Kafka の機能はサポートされていません。
-
認証 – 相互 TLS および SASL 認証はサポートされていません。
-
スキーマレジストリ – 独自のスキーマレジストリをホストできますが、この機能は Lambda API でサポートされていません。詳細については、Confluent ウェブサイトの「Schema Management
」を参照してください。
Lambda コンシューマーグループ
Amazon MSK とやり取りするために、Lambda は複数の Kafka トピックから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。Lambda によって作成されたコンシューマーグループはチェックポイントにも使用されます。各トピックパーティションでのグループの位置は、処理が成功した後に Kafka にコミットされます。
Lambda は 1 つ以上のパーティションのレコードを処理し、そのペイロードをターゲット関数に送信します。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまでバッチを処理し続けます。サポートされている関数の最大実行時間は 14 分です。
例 Amazon MSK レコードイベント
Received event:{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
"records": {
"AWSKafkaTopic-0": [
{
"topic": "AWSKafkaTopic",
"partition": 0,
"offset": 0,
"timestamp": 1595035749700,
"timestampType": "CREATE_TIME",
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
}
]
}
}
aws:kafka
リソースのキー値セットは base64 でエンコードされます。
実行ロールのアクセス許可
Amazon MSK クラスターからレコードを読み取るには、Lambda 関数の 実行ロール に次のアクセス許可が必要です。
AWS 管理ポリシー AWSLambdaMSKExecutionRole
には、これらのアクセス許可が含まれています。詳細については、「Lambda 機能用の AWS 管理ポリシー」を参照してください。
イベントソースとしてのトピックの設定
イベントソースマッピングを作成して、レコードを Kafka トピックから Lambda 関数に送信するように Lambda に指示します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、1 つの関数で複数のトピックの項目を処理したりできます。
関数を Amazon MSK から読み取るように設定するには、Lambda コンソールで MSK トリガーを作成します。
トリガーを作成するには
-
Lambda コンソールの [Functions (関数)
] ページを開きます。 -
関数を選択します。
-
[Designer] で、[Add trigger] を選択します。
-
トリガータイプを選択します。
-
必要なオプションを設定して [追加] を選択します。
Lambda では、Amazon MSK イベントソースの以下のオプションがサポートされています。
-
[MSK cluster (MSK クラスター)] – MSK クラスターを選択します。
-
[Topic name (トピック名)] – 使用する Kafka トピックを入力します。
-
[Starting position (開始位置)] (オプション) – ストリームでレコードの読み取りを開始する位置を入力します。
-
[Latest (最新)] – すべてのトピックのパーティションで最新の位置から読み取ります。
-
[水平トリム (Trim Horizon)] – すべてのトピックパーティションで最も古い位置から読み取ります。
既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。
-
-
[Enable trigger (トリガーの有効化)] – レコードの処理を停止するトリガーを無効にします。
トリガーを有効または無効にする (または削除する) には、デザイナーで MSK トリガーを選択します。トリガーを再設定するには、イベントソースマッピング API コマンドを使用します。
イベントソースマッピング API
AWS CLI または AWS SDK
AWS Command Line Interface (AWS CLI) を使用してイベントソースマッピングを作成するには、create-event-source-mapping
-
パブリックサブネットごとに 1 つの NAT ゲートウェイを設定します。詳細については、「VPC に接続した関数のインターネットアクセスとサービスアクセス」を参照してください。
-
Amazon VPC と Lambda との間に接続を作成します。詳細については、「Lambda の インターフェイス VPC エンドポイント の設定」を参照してください。
設定する Amazon VPC セキュリティグループルールには、最低でも以下の設定が必要です。
-
インバウンドルール – ソースとして指定されたセキュリティグループのすべてのポート上のすべてのトラフィックを許可します。
-
アウトバウンドルール – すべての送信先について、すべてのポート上のすべてのトラフィックを許可します。
Amazon VPC 設定は、Amazon MSK API を通じて検出可能であり、create-event-source-mapping
設定で定義する必要はありません。
以下の AWS CLI の例では、my-kafka-function
という名前の Lambda 関数を AWSKafkaTopic
という名前の Kafka トピックにマッピングして、開始位置を latest
に設定します。
$ aws lambda create-event-source-mapping --event-source-arn arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
--topics AWSKafkaTopic --starting-position LATEST --function-name my-kafka-function
{
"UUID": "6d9bce8e-836b-442c-8070-74e77903c815",
"BatchSize": 100,
"EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2",
"FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function",
"LastModified": 1580331394.363,
"State": "Creating",
"StateTransitionReason": "USER_INITIATED",
"LastProcessingResult": "OK",
"Topics": [ "AWSKafkaTopic" ]
}
get-event-source-mapping
$ aws lambda get-event-source-mapping --uuid 6d9bce8e-836b-442c-8070-74e77903c815
{
"UUID": "6d9bce8e-836b-442c-8070-74e77903c815"
"BatchSize": 100,
"EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2",
"FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function",
"LastModified": 1580331394.363,
"State": "Enabled",
"StateTransitionReason": "User action",
"LastProcessingResult": "OK",
"Topics": [ "AWSKafkaTopic" ],
}
イベントソースマッピングエラー
Lambda 関数で回復不可能なエラーが発生すると、Kafka トピックコンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、EventSourceMapping
から返された詳細の StateTransitionReason
フィールドをチェックし、以下のいずれかのコードを探します。
ESM_CONFIG_NOT_VALID
-
イベントソースマッピングの設定が無効です。
EVENT_SOURCE_AUTHN_ERROR
-
Lambda がイベントソースの認証に失敗しました。
EVENT_SOURCE_AUTHZ_ERROR
-
Lambda にイベントソースへのアクセス許可がありません。
FUNCTION_CONFIG_NOT_VALID
-
関数の設定が無効です。
レコードは、そのサイズが原因でドロップされた場合も、未処理になります。Lambda レコードのサイズ制限は 6 MB です。