Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する - AWS Lambda

Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する

Kinesis イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。

  • 呼び出し前: スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 (MaximumRecordAgeInSeconds) を超えるまで再試行します。

  • 呼び出し中: 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 (MaximumRecordAgeInSeconds) を超えるか、設定された再試行クォータ (MaximumRetryAttempts) に達するまで再試行します。関数エラーの場合、BisectBatchOnFunctionError を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

失敗した呼び出しの送信先の設定

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。任意の Amazon SNS トピックまたは Amazon SQS キューを送信先として設定できます。実行ロールには、送信先に対するアクセス許可が必要です。

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [ソース] には、[イベントソースマッピング呼び出し] を選択します。

  5. [イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。

  6. [条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

  7. [送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

  8. [送信先] で、リソースを選択します。

  9. [Save] を選択します。

AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mapping コマンドは、SQS を障害発生時の送信先として持つイベントソースマッピングを MyFunction に追加します。

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

次の update-event-source-mapping コマンドは、2 回の再試行後、またはレコードが 1 時間以上経過した場合に失敗した呼び出しレコードを SNS 送信先に送信するように、イベントソースマッピングを更新します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping コマンドを使用します。

送信先を削除するには、destination-config パラメータの引数として空の文字列を指定します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、元のレコード全体を取得するには、streamArnshardIdstartSequenceNumberendSequenceNumber の各フィールドを使用します。KinesisBatchInfo プロパティに表示されるフィールドはすべて常に存在します。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。