AWS Lambda を Amazon Kinesis に使用する - AWS Lambda

AWS Lambda を Amazon Kinesis に使用する

AWS Lambda 関数を使用して、Amazon Kinesis データストリームのレコードを処理できます。

Kinesis データストリームは、シャードのセットです。各シャードには、一連のデータレコードが含まれます。コンシューマーは、Kinesis データストリームからのデータを処理するアプリケーションです。Lambda 関数を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。

標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成できます。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。Kinesis は HTTP/2 経由でレコードを Lambda にプッシュします。

Kinesis データストリームの詳細については、「Amazon Kinesis Data Streams からのデータの読み取り」を参照してください。

Lambda は、データストリームからレコードを読み取り、ストリームレコードを含むイベントを使用して関数を同期的に呼び出します。Lambda は、バッチのレコードを読み取り、関数を呼び出してバッチからレコードを処理します。

例 Kinesis レコードイベント

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

デフォルトでは、Lambda はストリームからレコードが利用可能になるとすぐに、関数を呼び出します。Lambda がストリームから読み込むバッチに 1 つのレコードしか含まれていない場合、Lambda は関数に 1 つのレコードのみを送信します。少数のレコードで関数を呼び出さないようにするには、バッチウィンドウを設定して、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。Lambda は、関数を呼び出す前に、完全なバッチを収集するか、バッチウィンドウの有効期限が切れるまで、ストリームから継続してレコードを読み取ります。

関数がエラーを返した場合、処理が成功するか、データの有効期限が切れるまで Lambda はバッチを再試行します。シャードの停止を避けるには、より小さいバッチサイズで再試行する、再試行数を制限する、または古いレコードを破棄するよう、イベントソースマッピングを設定します。破棄されたイベントを保持するには、失敗したバッチの詳細を SQS キューまたは SNS トピックに送信するよう、イベントソースマッピングを設定できます。

各シャードから複数のバッチを並行して処理し、同時実行を増やすこともできます。Lambda は各シャードで同時に最大 10 個のバッチを処理できます。シャードごとの同時実行バッチ数を増やした場合、Lambda はパーティションキーレベルで順序どおりの処理を継続します。

Kinesis または DynamoDB データストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するように、ParallelizationFactor 設定を指定します。Lambda が 1 (デフォルト) から 10 までの並列化係数を介してシャードからポーリングする同時バッチの数を指定できます。例えば、ParallelizationFactor が 2 に設定されている場合、最大 200 の同時 Lambda 呼び出しを使用して 100 個の Kinesis データシャードを処理できます。これは、データ量が揮発性で IteratorAge が高い場合に処理スループットを拡大するのに役立ちます。詳細については、「Kinesis および DynamoDB イベントソース用の新しい AWS Lambda スケーリングコントロール」を参照してください。

データストリームと関数の設定

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数はデータストリーム (標準イテレーター) か、ストリームのコンシューマー (拡張ファンアウト) にマッピングすることができます。

Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は関数がストリームに追いつくまでバッチを処理し続けます。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成します。拡張ファンアウトを使用するコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。ストリームコンシューマーは、Kinesis RegisterStreamConsumer API を使用して作成できます。

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

次のような出力が表示されます。

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

関数がレコードを処理する速度を上げるには、データストリームにシャードを追加します。Lambda によって各シャードのレコードが順番に処理されます。関数からエラーが返された場合、シャードのさらなるレコードの処理は停止されます。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

同時実行のバッチの合計分を処理できるように関数をスケールアップできない場合は、関数のクォータ引き上げをリクエストするか、同時実行数を予約します。

実行ロールのアクセス許可

Lambda には、Kinesis データストリームに関連するリソースを管理するための以下のアクセス許可が必要です。これらのアクセス許可を関数の 実行ロール に追加します。

AWSLambdaKinesisExecutionRole 管理ポリシーには、これらのアクセス許可が含まれています。詳細については、「AWS Lambda 実行ロール」を参照してください。

キューまたはトピックに失敗したバッチのレコードを送信するには、関数に追加のアクセス権限が必要です。各送信先サービスには、次のように異なるアクセス許可が必要です。

イベントソースとしてストリームを設定する

イベントソースマッピングを作成し、データストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。

Lambda コンソールで Kinesis から読み取るように関数を設定するには、Kinesis トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. 関数を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. トリガータイプを選択します。

  5. 必要なオプションを設定して [追加] を選択します。

Lambda では、Kinesis イベントソースの次のオプションがサポートされています。

イベントソースオプション

  • Kinesis ストリーム – レコードの読み取り元の Kinesis ストリーム。

  • コンシューマー (オプション) – ストリームコンシューマーを使用して、専用接続でストリームから読み込みます。

  • バッチサイズ – 各バッチ内の関数へ送信するレコードの数(最大 10,000)。Lambda は、同期呼び出し (6 MB) に対してイベントの合計サイズがペイロード制限を超えない限り、バッチ内のすべてのレコードを 1 回の呼び出しで関数に渡します。

  • バッチウィンドウ – 関数を呼び出す前にレコードを収集する最大時間 (秒数) を指定します。

  • Starting position (開始位置) – 新しいレコードのみ、既存のすべてのレコード、または特定の日付以降に作成されたレコードを処理します。

    • 最新 – ストリームに追加された新しいレコードを処理します。

    • Trim horizon (水平トリム) – ストリーム内のすべてのレコードを処理します。

    • At timestamp (タイムスタンプで) – 特定の時刻以降のレコードを処理します。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • 失敗発生時の送信先 – 処理できないレコードの SQS キューまたは SNS トピック。Lambda は、レコードが古くなったか、最大回数の再試行を行ったためレコードのバッチを破棄した場合、バッチに関する詳細をキューまたはトピックに送信します。

  • Retry attempts (再試行数) – 関数がエラーを返すまでに Lambda が試みる再試行の最大数。これは、バッチが関数に到達しなかったときのサービスエラーやスロットリングには適用されません。

  • [Maximum age of record (レコードの最大期間)] – Lambda が関数に送信するレコードの最大期間。

  • エラー発生時のバッチ分割 – 関数がエラーを返した場合は、再試行する前にバッチを 2 つに分割します。

  • シャードごとの同時実行バッチ – 同じシャードから同時に複数のバッチを処理します。

  • 有効 – イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには false に設定します。Lambda は、処理された最新のレコードを追跡し、再度有効になるとその時点から処理を再開します。

注記

Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、「Amazon Kinesis の料金表」を参照してください。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

イベントソースマッピング API

AWS CLI または AWS SDK でイベントソースを管理するには、次の API オペレーションを使用できます。

AWS CLI を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。以下の例では、AWS CLI を使用して、my-function という名前の関数を Kinesis データストリームにマッピングします。データストリームは Amazon リソースネーム (ARN) によって指定され、バッチサイズ 500 で、Unix 時間形式のタイムスタンプから始まります。

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

次のような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

コンシューマーを使用するには、ストリーム ARN ではなく、コンシューマーの ARN を指定します。

その他のオプションを設定してバッチの処理方法をカスタマイズし、処理できないレコードをいつ破棄するかを指定します。次の例では、イベントソースマッピングを更新し、2 回の再試行後、またはレコードが 1 時間以上古くなったときに SQS キューに失敗レコードを送信します。

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

次の出力が表示されます。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping コマンドを使用します。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

次の出力が表示されます。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

複数のバッチを同時に処理するには、--parallelization-factor オプションを使用します。

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

エラー処理

Kinesis ストリームからレコードを読み取るイベントソースマッピングは、関数を同期的に呼び出し、エラー発生時には再試行します。関数がスロットリングされるか、Lambda サービスが関数を呼び出さずにエラーを返す場合、Lambda は、レコードの有効期限が切れるか、イベントソースマッピングで設定した最大期間を超えるまで再試行します。

関数がレコードを受け取るがエラーを返す場合、Lambda はバッチ内のレコードの有効期限が切れる、最大期間を過ぎる、または設定された再試行のクォータに達するまで、再試行します。関数のエラーの場合、失敗したバッチを 2 つのバッチに分割するようイベントソースマッピングを設定することもできます。より小さなバッチで再試行すると、不良レコードが分離され、タイムアウトの問題を回避できます。バッチを分割しても、再試行のクォータに対してカウントされません。

エラー処理が失敗する場合、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。この場合、デフォルト設定では、不良レコードが最大 one week まで、影響を受けたシャードの処理をブロックする可能性があります。これを回避するには、ユースケースに合った適切な数の再試行数と最大レコード期間で、関数のイベントソースマッピングを設定します。

破棄されるバッチのレコードを保持するには、失敗したイベントの送信先を設定します。Lambda はバッチの詳細を含むドキュメントを送信先キューまたはトピックに送信します。

失敗したイベントのレコードの送信先を設定するには

  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [Source (送信元)] で、[Stream invocation (ストリーム呼び出し)] を選択します。

  5. [ストリーム] で、関数にマッピングされるストリームを選択します。

  6. [送信先タイプ] で、呼び出しレコードを受信するリソースのタイプを選択します。

  7. [送信先] で、リソースを選択します。

  8. [Save] (保存) をクリックします。

次の例は、Kinesis ストリームの呼び出しレコードを示します。

例 呼び出しレコード

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

この情報を使用して、トラブルシューティングのため、影響のあるレコードをストリームから取得できます。実際のレコードは含まれないため、このレコードを処理し、有効期限が切れて失われる前に、ストリームから取得する必要があります。

Amazon CloudWatch のメトリクス

関数がレコードのバッチの処理を完了すると、Lambda により IteratorAge メトリクスが発生します。メトリクスは、処理が終了したとき、バッチの最後のレコードがどれくらい時間が経過したレコードであったかを示します。関数が新しいイベントを処理する場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。

イテレーターの有効期間が増加傾向の場合、関数に問題があることを示している可能性があります。詳細については、「AWS Lambda 関数メトリクスの使用」を参照してください。

時間枠

Lambda関数は、連続ストリーム処理アプリケーションを実行できます。ストリームは、アプリケーションを継続的に流れる無限のデータを表します。この継続的に更新される入力からの情報を分析するために、時間に関して定義されたウィンドウを使用して、含まれるレコードをバインドできます。

Lambda呼び出しはステートレス—外部データベースがない場合、複数の連続した呼び出しでデータを処理するために使用することはできません。ただし、ウィンドウを有効にすると、呼び出し間で状態を維持できます。この状態は、現在のウィンドウに対して以前に処理されたメッセージの集計結果が含まれます。状態は、シャードごとに最大 1 MB にすることができます。このサイズを超えると、Lambda ウィンドウを早期に終了します。

タンブリングウィンドウ

Lambda関数は、タンブルウィンドウを使用してデータを集計できます:一定の間隔で開閉する個別のタイムウィンドウです。タンブルウィンドウを使用すると、連続した、重なり合っていないタイムウィンドウでストリーミングデータソースを処理できます。

ストリームの各レコードは、特定のウィンドウに属しています。Lambdaレコードが属するウィンドウが処理されると、レコードは 1 回だけ処理されます。各ウィンドウでは、シャード内のパーティションキーレベルで合計や平均などの計算を実行できます。

集約と処理

ユーザー管理関数は、集約と、その集約の最終結果を処理するために呼び出されます。Lambdaウィンドウ内で受け取ったすべてのレコードを集約します。これらのレコードは、個別の呼び出しとして複数のバッチで受け取ることができます。各呼び出しは状態を受け取ります。また、レコードを処理し、次の呼び出しで渡される新しい状態を返すこともできます。Lambdaは、JSONで次のフォーマットのTimeWindowEventResponseを返します。

TimeWindowEventReponse

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注記

Java関数の場合は、Map <文字列、文字列>を使用して状態を表すことをお勧めします。

ウィンドウの最後で、フラグisFinalInvokeForWindowtrueに設定され、これが最終状態であり、処理の準備ができていることが示されます。処理が完了すると、ウィンドウが完了し、最終的な呼び出しが完了し、状態は削除されます。

ウィンドウの最後に、Lambda は集計結果に対するアクションの最終処理を使用します。最終処理が同期的に呼び出されます。呼び出しが成功すると、関数はシーケンス番号をチェックポイントし、ストリーム処理が続行されます。呼び出しが失敗した場合、Lambda 関数は呼び出しが成功するまで処理を一時停止します。

例 kinesisTimeWindowEvent

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

設定

イベントソースマッピングを作成または更新するときに 、タンブリングウィンドウを設定できます。タンブリングウィンドウを設定するには、ウィンドウを秒単位で指定します。次の例のAWS Command Line Interface (AWS CLI)コマンドは、タンブルウィンドウが120秒に設定されたストリーミングイベントソースマッピングを作成します。集約と処理のために定義されたLambda関数の名前はtumbling-window-example-functionです。

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambdaは、レコードがストリームに挿入された時間に基づいて、タンブルするウィンドウ境界を決定します。すべてのレコードには、Lambdaが境界の決定に使用するおおよそのタイムスタンプがあります。

ウィンドウの集合をタンブルしても、再共有はサポートされません。シャードが終了すると、Lambdaはウィンドウが閉じているとみなし、子シャードは新しい状態で自身のウィンドウを開始します。

タンブルウィンドウは、既存の再試行ポリシーmaxRetryAttemptsおよびmaxRecordAgeを完全にサポートします。

例 Handler.py – 集約と処理

次の Python 関数は、最終状態を集約して処理する方法を示しています。

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

バッチアイテムの失敗をレポートする

イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、Lambdaバッチが完全に成功した場合にのみ、バッチの最大シーケンス番号にチェックポイントが設定されます。Lambda他のすべての結果を完全な失敗として扱い、再試行制限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、ReportBatchItemFailuresをオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。

ReportBatchItemFailuresオンにするには 、ReportBatchItemFailures 列挙値をFunctionResponseTypesリストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、 イベントソースマッピングを作成または更新するときに設定ができます。

レポートの構文

バッチアイテムの失敗に関するレポートを設定する場合、StreamsEventResponse クラスはバッチアイテムの失敗のリストとともに返されます。StreamsEventResponseオブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }

成功条件と失敗の条件

Lambda次のいずれかを返すと、バッチは完全な成功として処理します:

  • 空のbatchItemFailureリストです。

  • ヌルbatchItemFailureリスト

  • 空の EventResponse

  • ヌル EventResponse

Lambda次のいずれかを返すと、バッチは完全な失敗として処理します:

  • 空の文字列itemIdentifier

  • ヌル itemIdentifier

  • itemIdentifier間違えているキー名

Lambda再試行戦略に基づいて失敗を再試行します。

バッチを2分割します

呼び出しが失敗し、BisectBatchOnFunctionError オンになっている場合、バッチはReportBatchItemFailures設定に関係なく2分割されます。

部分的なバッチ成功レスポンスを受信し、BisectBatchOnFunctionErrorReportBatchItemFailuresの両方がオンになっている場合、バッチは返されたシーケンス番号で2分割され、Lambda残りのレコードのみが再試行されます。

Java

例 Handler.java – 新しい streamsEventResponse () を返します

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Serializable> { @Override public Serializable handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<*>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { //Return failed record's sequence number batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

例 Handler.py – batchitemfailures [] を返します

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}