Amazon DynamoDB で AWS Lambda を使用する - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon DynamoDB で AWS Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信するか、データを送信する前にデータを強化する場合は、「Amazon EventBridge Pipes」を参照してください。

AWS Lambda 関数を使用して、Amazon DynamoDB ストリームのレコードを処理します。DynamoDB Streams では、Lambda 関数を使用して、DynamoDB テーブルが更新されるたびに追加の作業を実行することができます。

Lambda はストリームからレコードを読み取り、関数を、ストリームレコードを含むイベントと共に同期的に呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。

イベントの例

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

ポーリングストリームとバッチストリーム

Lambda は、レコードの DynamoDB ストリームにあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。処理が成功すると、Lambda は、レコードをさらに受け取るまでポーリングを再開します。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、バッチ処理ウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「バッチ処理動作」を参照してください。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、バッチの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードをべき等にすることを強くお勧めします。詳細については、「 AWSナレッジセンター」の「Lambda 関数をべき等にするにはどうすればよいですか?」を参照してください。

関数がエラーを返した場合、処理が成功するか、データの有効期限が切れるまで Lambda はバッチを再試行します。停止するシャードを避けるために、より小さなバッチサイズで再試行したり、再試行回数を制限したり、古すぎるレコードを破棄したりするように、イベントソースマッピングを設定できます。破棄されたイベントを保持するには、失敗したバッチの詳細を標準 SQS キューまたは標準 SNS トピックに送信するように、イベントソースマッピングを設定します。

同時実行数を増やすために、各シャードからの複数のバッチを並行して処理することもできます。Lambda は、各シャードで最大 10 個のバッチを同時に処理できます。シャードごとの同時バッチの数を増やしても、Lambda は引き続き、パーティションキーレベルで順序どおりに処理を行います。

ParallelizationFactor 設定を使用することで、複数の Lambda 呼び出しを同時に実行して Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)~10 の並列化係数で指定できます。例えば、ParallelizationFactor を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、ConcurrentExecutions メトリックの実際の値は異なったものとなります)。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。

Kinesis 集約ParallelizationFactorで を使用することもできます。イベントソースマッピングの動作は、拡張ファンアウト を使用しているかどうかによって異なります。

  • 拡張ファンアウトなし: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約されたイベントのパーティションキーとも一致する必要があります。集約されたイベント内のイベントに異なるパーティションキーがある場合、Lambda はパーティションキーによるイベントの順番な処理を保証することはできません。

  • 拡張ファンアウト: まず、Lambda は集約されたイベントを個々のイベントにデコードします。集約されたイベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに対応していないイベントは削除され、 は失われます。Lambda はこれらのイベントを処理しず、設定された障害送信先に送信しません。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON として指定します。

DynamoDB Streams でのシャードの同時読み込み

単一リージョンのテーブルがグローバルテーブルでない場合、同じ DynamoDB Streams のシャードから、同時に 2 つまでの Lambda 関数を読み込むように設計できます。この制限を超えると、リクエストのスロットリングが発生する場合があります。グローバルテーブルでは、リクエストのスロットリングを回避するために、同時関数の数を 1 に制限することをお勧めします。

実行ロールのアクセス許可

AWSLambdaDynamoDBExecutionRole AWS マネージドポリシーには、Lambda が DynamoDB ストリームから読み取るために必要なアクセス許可が含まれています。この管理ポリシーを関数の実行ロールに追加します。

標準 SQS キューまたは標準 SNS トピックに失敗したバッチのレコードを送信するには、関数に追加の許可が必要になります。各送信先サービスには、次のように異なるアクセス許可が必要です。

アクセス許可を追加してイベントソースマッピングを作成する

イベントソースマッピングを作成し、ストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のストリームの項目を処理したりできます。

DynamoDB Streams から読み取るように関数を設定するには、実行ロールに AWSLambdaDynamoDBExecutionRoleAWSマネージドポリシーをアタッチし、DynamoDB トリガーを作成します。

アクセス許可を追加してトリガーを作成するには
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Configuration] (設定) タブを開き、次に [Permissions] (アクセス許可) をクリックします。

  4. ロール名 で、実行ロールへのリンクを選択します。このリンクをクリックすると、IAM コンソールでロールが開きます。

    
              実行ロールへのリンク
  5. [アクセス許可を追加][ポリシーをアタッチ] の順に選択します。

    
              IAM コンソールでのポリシーのアタッチ
  6. [検索] フィールドに AWSLambdaDynamoDBExecutionRole を入力します。このポリシーを実行ロールに追加します。これは、関数が DynamoDB ストリームから読み取るために必要なアクセス許可を含む AWSマネージドポリシーです。このポリシーの詳細については、「 AWSマネージドポリシーリファレンスAWSLambdaDynamoDBExecutionRole」の「」を参照してください。

  7. Lambda コンソールで関数に戻ります。[関数の概要] で [トリガーを追加] をクリックします。

    
              Lambda コンソールの関数の概要セクション
  8. トリガーのタイプを選択します。

  9. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda は、DynamoDB イベントソースの以下のオプションをサポートしています。

イベントソースオプション
  • DynamoDB テーブル - レコードの読み取り元の DynamoDB テーブル。

  • バッチサイズ - 各バッチで関数に送信されるレコードの数。最大 10,000。Lambda は、イベントの合計サイズが同期呼び出しのペイロード上限 (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。

  • バッチウィンドウ - 関数を呼び出す前にレコードを収集する最大時間(秒数)を指定します。

  • 開始位置 - 新規レコードのみ、または既存のすべてのレコードを処理します。

    • 最新 - ストリームに追加された新しいレコードを処理します。

    • 水平トリム - ストリーム内のすべてのレコードを処理します。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • [障害発生時の宛先] — 処理できないレコードの標準 SQS キューまたは標準 SNS トピックです。Lambda は、古すぎる、または再試行回数の上限に達したレコードのバッチを廃棄すると、バッチに関する詳細をキューまたはトピックに送信します。

  • 再試行回数 - 関数がエラーを返したときに Lambda が再試行する回数の上限です。これは、バッチが関数に到達しなかったサービスエラーやスロットルには適用されません。

  • レコードの最大有効期間 — Lambda が関数に送信するレコードの最大経過時間。

  • エラー発生時のバッチ分割 — 関数がエラーを返した場合、再試行する前にバッチを 2 つに分割します。元のバッチサイズ設定は変更されません。

  • シャードごとの同時バッチ — 同じシャードからの複数のバッチを同時に処理します。

  • 有効 - イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには、false に設定します。Lambda は、処理された最新のレコードを追跡し、マッピングが再度有効になるとその時点から処理を再開します。

注記

DynamoDB トリガーの一部として Lambda によって呼び出された GetRecords API コールに対しては課金されません。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API オペレーションを使用できます。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された DynamoDB ストリーミングに、バッチサイズ 500 でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525

以下の出力が表示されます。

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

その他のオプションを設定し、バッチの処理方法をカスタマイズしたり、処理できないレコードをいつ破棄するかを指定したりします。次の例では、2 回の再試行後、またはレコードが 1 時間以上経過した場合に、イベントソースマッピングを更新して障害レコードを標準 SQS キューに送信します。

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

こちらの出力が表示されるはずです。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。get-event-source-mapping コマンドを使用して、現在のステータスを表示します。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

こちらの出力が表示されるはずです。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

複数のバッチを同時に処理するには、--parallelization-factor オプションを使用します。

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

エラー処理

DynamoDB ストリームからレコードを読み取るイベントソースマッピングは、関数を同期的に呼び出し、エラー発生時に再試行します。Lambda が関数をスロットリングするか、関数を呼び出すことなくエラーを返す場合、Lambda は、レコードの期限が切れる、またはイベントソースマッピングで設定した最大経過時間を超過するまで再試行します。

関数がレコードを受信してもエラーが返された場合、バッチ内のレコードが期限切れになるか、最大経過時間を超えるか、設定された再試行クォータに達するまで、Lambda は再試行します。関数エラーの場合、失敗したバッチを 2 つのバッチに分割するように、イベントソースマッピングを構成することもできます。小さなバッチで再試行すると、不良のレコードが分離され、タイムアウトの問題が回避されます。バッチを分割しても、再試行クォータにはカウントされません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 日間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

破棄されたバッチのレコードを保持するには、失敗したイベントの送信先を設定します。Lambda は、バッチの詳細を含むドキュメントを送信先キューまたはトピックに送信します。

失敗したイベントのレコードの送信先を設定するには
  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [Source] (送信元) で、[Stream invocation] (ストリームの呼び出し) を選択します。

  5. [ストリーム] で、関数にマッピングされるストリームを選択します。

  6. [送信先タイプ] で、呼び出しレコードを受信するリソースのタイプを選択します。

  7. [送信先] で、リソースを選択します。

  8. [Save] を選択します。

次の例は、DynamoDB ストリームの呼び出しレコードを示しています。

例 呼び出しレコード
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。

Amazon CloudWatch メトリクス

関数がレコードのバッチの処理を完了すると、Lambda により IteratorAge メトリクスが発生します。メトリクスは、処理が終了したとき、バッチの最後のレコードがどれくらい時間が経過したレコードであったかを示します。関数が新しいイベントを処理する場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。

イテレーターの有効期間が増加傾向の場合、関数に問題があることを示している可能性があります。詳しくは、「Lambda 関数のメトリクスの使用」を参照してください。

時間枠

Lambda 関数は、連続ストリーム処理アプリケーションを実行できます。ストリームは、アプリケーションを継続的に流れる無限のデータを表します。この継続的に更新される入力からの情報を分析するために、時間に関して定義されたウィンドウを使用して、含まれるレコードをバインドできます。

タンブリングウィンドウは、一定の間隔で開閉する別個のタイムウィンドウです。ディフォルトでは、Lambda 呼び出しはステートレス — 外部データベースがない場合、複数の連続した呼び出しでデータを処理するために使用することはできません。ただし、タンブリングウィンドウを使用して、呼び出し間で状態を維持できます。この状態は、現在のウィンドウに対して以前に処理されたメッセージの集計結果が含まれます。状態は、シャードごとに最大 1 MB にすることができます。このサイズを超えると、Lambda はウィンドウを早期に終了します。

ストリームの各レコードは、特定のウィンドウに属しています。Lambda は各レコードを少なくとも 1 回処理しますが、各レコードが 1 回だけ処理される保証はありません。エラー処理などのまれなケースでは、一部のレコードが複数回処理されることがあります。レコードは常に最初から順番に処理されます。レコードが複数回処理される場合、順不同で処理されます。

集約と処理

ユーザー管理関数は、集約と、その集約の最終結果を処理するために呼び出されます。Lambda は、ウィンドウで受信したすべてのレコードを集約します。これらのレコードは、個別の呼び出しとして複数のバッチで受け取ることができます。各呼び出しは状態を受け取ります。したがって、タンブリングウィンドウを使用する場合、Lambda 関数の応答に state プロパティが含まれている必要があります。応答に state プロパティが含まれてないと、Lambda はこれを失敗した呼び出しと見なします。この条件を満たすために、関数は次の JSON 形式の TimeWindowEventResponse オブジェクトを返すことができます。

TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注記

Java 関数の場合は、Map<String, String>を使用して状態を表すことをお勧めします。

ウィンドウの最後で、フラグisFinalInvokeForWindowtrueに設定され、これが最終状態であり、処理の準備ができていることが示されます。処理が完了すると、ウィンドウが完了し、最終的な呼び出しが完了し、状態は削除されます。

ウィンドウの最後に、Lambda は集計結果に対するアクションの最終処理を使用します。最終処理が同期的に呼び出されます。呼び出しが成功すると、関数はシーケンス番号をチェックポイントし、ストリーム処理が続行されます。呼び出しが失敗した場合、Lambda 関数は呼び出しが成功するまで処理を一時停止します。

例 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

構成

イベントソースマッピングを作成または更新するときに 、タンブリングウィンドウを設定できます。タンブリングウィンドウを設定するには、ウィンドウを秒単位で指定します。次の例のAWS Command Line Interface (AWS CLI)コマンドは、タンブルウィンドウが120秒に設定されたストリーミングイベントソースマッピングを作成します。集約と処理のために Lambda 関数が定義した関数の名前は tumbling-window-example-function です。

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambdaは、レコードがストリームに挿入された時間に基づいて、タンブルするウィンドウ境界を決定します。すべてのレコードには、Lambda が境界の決定に使用するおおよそのタイムスタンプがあります。

ウィンドウの集合をタンブルしても、再共有はサポートされません。シャードが終了すると、Lambda はウィンドウが閉じているとみなし、子シャードは新しい状態で自身のウィンドウを開始します。

タンブルウィンドウは、既存の再試行ポリシーmaxRetryAttemptsおよびmaxRecordAgeを完全にサポートします。

例 Handler.py - 集約と処理

次の Python 関数は、最終状態を集約して処理する方法を示しています。

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

バッチアイテムの失敗をレポートする

イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に Lambda チェックポイントが設定されます。Lambda は、他のすべての結果を完全な失敗として扱い、再試行の上限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、ReportBatchItemFailuresをオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。

ReportBatchItemFailuresオンにするには 、ReportBatchItemFailures 列挙値をFunctionResponseTypesリストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、 イベントソースマッピングを作成または更新するときに設定ができます。

レポートの構文

バッチアイテムの失敗に関するレポートを設定する場合、StreamsEventResponse クラスはバッチアイテムの失敗のリストとともに返されます。StreamsEventResponseオブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注記

batchItemFailures 配列に複数の項目が含まれている場合、Lambda はシーケンス番号が最も小さいレコードをチェックポイントとして使用します。その後、Lambda はそのチェックポイントからすべてのレコードを再試行します。

成功条件と失敗の条件

次のいずれかを返すと、Lambda はバッチを完全な成功として処理します:

  • 空のbatchItemFailureリストです。

  • null の batchItemFailure リスト

  • 空の EventResponse

  • ヌル EventResponse

次のいずれかを返すと、Lambda はバッチを完全な失敗として処理します:

  • 空の文字列itemIdentifier

  • ヌル itemIdentifier

  • itemIdentifier間違えているキー名

Lambda は、再試行戦略に基づいて失敗を再試行します。

バッチを2分割します

呼び出しが失敗し、BisectBatchOnFunctionError オンになっている場合、バッチはReportBatchItemFailures設定に関係なく2分割されます。

部分的なバッチ成功レスポンスを受信し、BisectBatchOnFunctionErrorReportBatchItemFailures の両方がオンになっている場合、バッチは返されたシーケンス番号で 2 分割され、Lambda は残りのレコードのみを再試行します。

Java
例 Handler.java – 新しい StreamsEventResponse() を返します。
import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
Python
例 Handler.py – 返される batchItemFailures[]
def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

Amazon DynamoDB Streams 設定パラメータ

すべての Lambda イベントソースタイプは、同じ CreateEventSourceMappingおよび UpdateEventSourceMapping API オペレーションを共有します。ただし、DynamoDB Streams に適用されるのは一部のパラメータのみです。

DynamoDB Streams に適用されるイベントソースパラメータ
[Parameter] (パラメータ) 必須 デフォルト メモ

BatchSize

N

100

最大: 10,000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

廃棄されたレコードの標準 Amazon SQS キューまたは標準 Amazon SNS トピックの送信先。

有効

N

true

EventSourceArn

Y

データストリームまたはストリームコンシューマーの ARN。

FilterCriteria

N

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 は無限を意味します。失敗したレコードは、レコードの有効期限が切れるまで再試行されます。DynamoDB Streams のデータ保持制限は 24 時間です。

最小: -1

最大: 604,800

MaximumRetryAttempts

N

-1

-1 に設定すると無制限になり、失敗したレコードはレコードの有効期限が切れるまで再試行されます。

最小: 0

最大: 10,000

ParallelizationFactor

N

1

最大: 10

StartingPosition

Y

TRIM_HORIZON または LATEST

TumblingWindowInSeconds

N

最小: 0

最大: 900