Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する
Kinesis イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。
-
呼び出し前: スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 (MaximumRecordAgeInSeconds) を超えるまで再試行します。
-
呼び出し中: 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 (MaximumRecordAgeInSeconds) を超えるか、設定された再試行クォータ (MaximumRetryAttempts) に達するまで再試行します。関数エラーの場合、BisectBatchOnFunctionError を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。
エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。
失敗した呼び出しの送信先の設定
失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。任意の Amazon SNS トピックまたは Amazon SQS キューを送信先として設定できます。実行ロールには、送信先に対するアクセス許可が必要です。
-
SQS 送信先の場合: sqs:SendMessage
-
SNS 送信先の場合: sns:Publish
障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。
Lambda コンソールの [関数ページ]
を開きます。 -
関数を選択します。
-
[機能の概要 ] で、[送信先を追加 ] を選択します。
-
[ソース] には、[イベントソースマッピング呼び出し] を選択します。
-
[イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。
-
[条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。
-
[送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。
-
[送信先] で、リソースを選択します。
-
[Save] を選択します。
AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mappingMyFunction
に追加します。
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
次の update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping
送信先を削除するには、destination-config
パラメータの引数として空の文字列を指定します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、元のレコード全体を取得するには、streamArn
、shardId
、startSequenceNumber
、endSequenceNumber
の各フィールドを使用します。KinesisBatchInfo
プロパティに表示されるフィールドはすべて常に存在します。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。