Lambda のイベントソースマッピング - AWS Lambda

Lambda のイベントソースマッピング

イベントソースマッピングとは、イベントソースから読み取り、Lambda 関数を呼び出す Lambda リソースのことです。イベントソースマッピングを使用して、Lambda 関数を直接呼び出さないサービスのストリームまたはキューから項目を処理できます。Lambda は、次のサービスのイベントソースマッピングを提供します。

イベントソースのマッピングは、イベントソースの項目の読み取りや管理のために、関数の実行ロールのアクセス許可を使用します。アクセス許可やイベント構造、設定、ポーリングの動作はイベントソースによって異なります。詳細については、イベントソースとして使われるサービスの関連トピックを参照してください。

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された DynamoDB ストリーミングに、バッチサイズ 500 でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

次のような出力が表示されます。

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Lambda イベントソースマッピングは、ポーラーに備わる分散性が理由で、少なくとも 1 回イベントを処理します。その結果、まれに Lambda 関数が重複するイベントを受け取ることがあります。重複イベントに関連する問題を回避するためには、「AWS Lambda 関数を使用するためのベストプラクティス」に従って、冪等性関数を作成します。

バッチ処理動作

イベントソースマッピングは、ターゲットイベントソースから項目を読み取ります。デフォルトで、イベントソースマッピングは、Lambda が関数に送信する単一のペイロードにレコードをまとめてバッチ処理します。バッチ処理動作を細かく調整するには、バッチ処理ウィンドウ (MaximumBatchingWindowInSeconds) とバッチサイズ (BatchSize) を設定できます。バッチ処理ウィンドウとは、レコードを単一のペイロードにまとめるための最大時間です。バッチサイズとは、単一のバッチ内にあるレコードの最大数です。Lambda は、以下の 3 つの条件のいずれかが満たされたときに関数を呼び出します。

  • バッチ処理ウィンドウが最大値に到達した。バッチ処理ウィンドウの動作は、特定のイベントソースに応じて異なります。

    • Kinesis、DynamoDB、および Amazon SQS イベントソースの場合: デフォルトのバッチ処理ウィンドウは 0 秒です。つまり、Lambda は可能な限り早急にバッチを関数に送信します。MaximumBatchingWindowInSeconds を設定する場合、前の関数の呼び出しが完了するとすぐに次のバッチ処理ウィンドウが開始されます。

    • Amazon MSK、セルフマネージド Apache Kafka、および Amazon MQ イベントソースの場合: デフォルトのバッチ処理ウィンドウは 500 ミリ秒です。MaximumBatchingWindowInSeconds は、秒単位で 0 秒から 300 秒までの任意の値に設定できます。バッチ処理ウィンドウは、最初のレコードが到着するとすぐに開始されます。

      注記

      MaximumBatchingWindowInSeconds は秒単位の増分でしか変更できないため、変更後に 500 ミリ秒のデフォルトのバッチ処理ウィンドウに戻すことはできません。デフォルトのバッチ処理ウィンドウを復元するには、新しいイベントソースマッピングを作成する必要があります。

  • バッチサイズに適合した。最小バッチサイズは 1 です。デフォルトのバッチサイズと最大バッチサイズは、イベントソースに応じて異なります。これらの値に関する詳細については、CreateEventSourceMapping API 操作の「BatchSize」仕様を参照してください。

  • ペイロードサイズが 6 MB に到達した。この上限を変更することはできません。

以下の図は、これら 3 つの条件を説明するものです。バッチ処理ウィンドウが t = 7 秒で開始されるとします。最初のシナリオでは、バッチ処理ウィンドウが 5 個のレコードを蓄積した後、t = 47 秒の時点で最大値の 40 秒に到達します。2 番目のシナリオでは、バッチ処理ウィンドウの期限が切れる前にバッチサイズが 10 個になるため、バッチ処理ウィンドウが早く終了します。3 番目のシナリオでは、バッチ処理ウィンドウの期限が切れる前に最大ペイロードサイズに到達するため、バッチ処理ウィンドウが早く終了します。


        バッチ処理ウィンドウは、バッチ処理ウィンドウが最大値に到達する、バッチサイズが適合している、またはペイロードサイズが 6 MB に到達するという 3 つの条件のいずれかを満たすときに期限が切れます。

次の例は、Kinesis ストリームから読み取るイベントソースマッピングを示しています。イベントのバッチがすべての処理試行に失敗した場合、イベントソースマッピングはバッチの詳細を SQS キューに送信します。


        イベントソースマッピングは、Kinesis ストリームから読み取ります。レコードを関数に送信する前に、ローカルでキューに入れます。

イベントバッチは、Lambda から関数に送信されるイベントです。これは、現在のバッチ処理ウィンドウの期限が切れるまでイベントソースマッピングが読み取る項目からコンパイルされたレコードまたはメッセージのバッチです。

ストリームの場合、1 つのイベントソースマッピングは、ストリームに存在するシャードごとにイテレーターを作成し、各シャードの項目を順番に処理します。イベントソースマッピングを、ストリームに現れる新規項目のみを読み取る、または古い項目からを読み取るように設定することができます。処理された項目はストリームから削除されず、他の関数やコンシューマーがそれらを処理することができます。

関数がエラーを返す場合は、デフォルトで、イベントソースマッピングは関数が成功するまで、またはバッチ内の項目の期限が切れるまでバッチ全体を再処理します。処理が順序正しく行われることを確実にするため、イベントソースマッピングは、エラーが解決されるまで影響を受けたシャードの処理を一時停止します。古いイベントを破棄したり、再試行回数を制限したり、複数のバッチを並行して処理したりするように、イベントソースマッピングを設定できます。複数のバッチを並列的に処理する場合、各パーティションキーについては順序正しい処理が引き続き保証されますが、イベントソースマッピングは同じシャード内の複数のパーティションキーを同時に処理します。

イベントバッチを破棄したときに、呼び出しレコードを別のサービスに送信するように、イベントソースマッピングを設定することもできます。Lambda では、イベントソースマッピングの以下の送信先がサポートされています。

  • Amazon SQS - SQS キュー。

  • Amazon SNS - SNS トピック。

呼び出しレコードには、失敗したイベントバッチの詳細が JSON 形式で格納されます。

次の例は、Kinesis ストリームの呼び出しレコードを示します。

例 呼び出しレコード

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

また、Lambda は FIFO (先入れ先出し) キューを順に処理し、アクティブなメッセージグループ数までスケールアップできます。標準キューの場合、項目は必ずしも順番に処理されるとは限りません。Lambda は、標準キューをできるだけ早く処理するためスケールアップします。エラーが発生すると、Lambda はバッチを個別の項目としてキューに戻し、元のバッチとは異なるグループでそれらを処理する場合があります。関数にエラーが発生していない場合でも、イベントソースマッピングがキューから同じ項目を 2 回受け取る可能性があります。Lambda は、処理した項目をキューから削除します。Lambda が項目を処理できない場合は、それらをデッドレターキューに送信するようにソースキューを設定できます。

Lambda 関数を直接呼び出すサービスについては、「他のサービスで AWS Lambda を使用する」を参照してください。