セルフマネージド Apache Kafka イベントソースの破棄されたバッチのキャプチャ
失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。任意の Amazon SNS トピック、Amazon SQS キュー、または S3 バケットを送信先として設定できます。実行ロールには、送信先に対するアクセス許可が必要です。
-
SQS 送信先の場合: sqs:SendMessage
-
SNS 送信先の場合: sns:Publish
-
S3 バケット送信先の場合: s3:PutObject および s3:ListBuckets
障害発生時の送信先サービスの VPC エンドポイントを Apache Kafka クラスター VPC 内にデプロイする必要があります。
さらに、送信先に KMS キーを設定した場合、Lambda には送信先のタイプに応じて以下のアクセス許可が必要です。
-
S3 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:GenerateDataKey が必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。
-
SQS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt および kms:GenerateDataKey が必要です。KMS キーと SQS キューの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。
-
SNS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt と kms:GenerateDataKey が必要です。KMS キーと SNS トピックの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。
セルフマネージド Apache Kafka イベントソースマッピングの障害発生時の送信先の設定
障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。
Lambda コンソールの [関数ページ]
を開きます。 -
関数を選択します。
-
[機能の概要 ] で、[送信先を追加 ] を選択します。
-
[ソース] には、[イベントソースマッピング呼び出し] を選択します。
-
[イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。
-
[条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。
-
[送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。
-
[送信先] で、リソースを選択します。
-
[Save] を選択します。
AWS CLI を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mappingMyFunction
に追加します。
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下の update-event-source-mappinguuid
に関連付けられたイベントソースに追加します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
送信先を削除するには、destination-config
パラメータの引数として空の文字列を指定します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
SNS および SQS の呼び出しレコードの例
以下の例は、Kafka イベントソース呼び出しが失敗した場合に Lambda が SNS トピックまたは SQS キューの送信先に送信する内容を示しています。recordsInfo
の各キーには、Kafka トピックとパーティションの両方がハイフンで区切られて含まれています。例えば、キー "Topic-0"
の場合、Topic
は Kafka トピック、0
はパーティションです。各トピックとパーティションについて、オフセットとタイムスタンプデータを使用して元の呼び出しレコードを検索できます。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
S3 送信先の呼び出しレコードの例
S3 の送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。以下の例は、Kafka イベントソース呼び出しが失敗した場合に、Lambda が S3 バケットの送信先に送信することを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload
フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
ヒント
送信先バケットで S3 バージョニングを有効にすることをお勧めします。