AWS Lambda イベントソースマッピング - AWS Lambda

AWS Lambda イベントソースマッピング

イベントソースのマッピングは、イベントソースからを読み取り、Lambda 関数を呼び出す AWS Lambda リソースのことを指します。イベントソースマッピングは、直接 Lambda 関数を呼び出さないサービスのキューまたはストリームから項目を処理するときに使用することができます。Lambda は、次のサービスに対してイベントソースマッピングを提供します。

イベントソースのマッピングは、イベントソースの項目の読み取りや管理のために、関数の実行ロールのアクセス権限を使用します。アクセス権限やイベント構造、設定、ポーリングの動作はイベントソースによって異なります。詳細については、イベントソースとして使われるサービスの関連トピックを参照してください。

AWS CLI または AWS SDK を使用してイベントソースマッピングを管理するには、次の API アクションを使用します。

次の例では、AWS CLI を使用して、500 のバッチサイズで、my-function という名前の関数を、Amazon リソースネーム (ARN) によって指定された DynamoDB ストリームにマッピングします。

$ aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525 { "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

イベントソースマッピングはストリームまたはキューから項目をバッチで読み取ります。関数が受け取るイベントには、複数の項目が含まれます。イベントソースマッピングが関数に送信するバッチのサイズを設定できます。サイズの最大値はサービスによって異なります。使用可能な項目が十分でない場合や、1 つのイベントとして送信するにはバッチが大きすぎる場合は、イベントの項目数をバッチサイズよりも少なく設定することができます。

次の例は、Kinesis ストリームから読み取るイベントソースマッピングを示しています。イベントのバッチがすべての処理試行に失敗した場合、イベントソースマッピングはバッチの詳細を SQS キューに送信します。


      イベントソースマッピングは、Kinesis ストリームから読み取ります。レコードを関数に送信する前に、ローカルでキューに入れます。

イベントバッチは、Lambda から関数に送信されるイベントです。これは、イベントソースマッピングがストリームやキューから読み取った項目によってコンパイルしたレコードやメッセージのバッチです。バッチサイズやその他の設定は、イベントバッチにのみ適用されます。

ストリームの場合、1 つのイベントソースマッピングは、ストリームに存在するシャードごとにイテレーターを作成し、各シャードの項目を順番に処理します。イベントソースマッピングを、ストリームに現れる新規項目のみを読み取る、または古い項目からを読み取るように設定することができます。処理された項目はストリームから削除されず、他の関数やコンシューマーによって処理されることがあります。

デフォルトでは、関数がエラーを返すと、バッチは関数が成功するまで、またはバッチの項目が期限切れになるまで再処理されます。処理が順番に行われるよう、影響を受けているシャードの処理は、エラーが解決するまで一時停止されます。古いイベントを破棄したり、再試行回数を制限したり、複数のバッチを並行して処理したりするように、イベントソースマッピングを設定できます。複数のバッチを並列に処理する場合、各パーティションキーは確実に順序どおりに処理されますが、同じシャードの複数のパーティションキーは同時に処理されます。

イベントバッチを破棄したときに、呼び出しレコードを別のサービスに送信するように、イベントソースマッピングを設定することもできます。Lambda は、イベントソースマッピングの送信先として以下をサポートしています。

  • Amazon SQS – SQS; キュー。

  • Amazon SNS – SNS トピック。

呼び出しレコードには、失敗したイベントバッチの詳細が JSON 形式で格納されます。

次の例は、Kinesis ストリームの呼び出しレコードを示します。

例 呼び出しレコード

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

また、Lambda は FIFO (先入れ先出し) キューを順に処理し、アクティブなメッセージグループ数までスケールアップできます。標準キューの場合、項目は必ずしも順に処理されるとは限りません。Lambda は、標準キューをできるだけ早く処理するためにスケールアップします。エラーが発生すると、バッチは個別の項目としてキューに戻され、元のバッチとは異なるグループで処理される場合があります。関数にエラーのない場合でも、イベントソースマッピングがキューから同じ項目を 2 回受け取る可能性があります。それらが正常に処理されると、Lambda は該当する項目をキューから削除します。項目を処理できない場合はデッドレターキューに送信するように、ソースキューを設定できます。

Lambda 関数を直接呼び出すサービスについては、「他のサービスで AWS Lambda を使用する」を参照してください。