Lambda のイベントソースマッピング - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda のイベントソースマッピング

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたい場合は、「Amazon EventBridge Pipes」を参照してください。

イベントソースマッピングとは、イベントソースから読み取り、Lambda 関数を呼び出す Lambda リソースのことです。イベントソースマッピングを使用して、Lambda 関数を直接呼び出さないサービスのストリームまたはキューから項目を処理できます。このページでは、Lambda がイベントソースマッピングを提供するサービスと、バッチ処理動作を微調整する方法について説明します。

イベントソースのマッピングは、イベントソースの項目の読み取りや管理のために、関数の実行ロールのアクセス許可を使用します。アクセス許可やイベント構造、設定、ポーリングの動作はイベントソースによって異なります。詳細については、イベントソースとして使われるサービスの関連トピックを参照してください。

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、バッチの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

イベントソースマッピングを作成する

イベントソースと Lambda 関数間のマッピングを作成するには、コンソールでトリガーを作成するか、create-event-source-mapping コマンドを使用します。

アクセス許可を追加してトリガーを作成するには
  1. 必要なアクセス許可を実行ロールに追加します。Amazon SQS などの一部のサービスには、Lambda がイベントソースから読み取るために必要なアクセス許可を含む AWS 管理ポリシーがあります。

  2. Lambda コンソールの関数ページを開きます。

  3. 関数の名前を選択します。

  4. [関数の概要] で [トリガーを追加] をクリックします。

    
            Lambda コンソールの関数の概要セクション
  5. トリガーのタイプを選択します。

  6. 必須のオプションを設定し、[Add] (追加) を選択します。

イベントソースマッピングを作成するには (AWS CLI)

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された DynamoDB ストリーミングに、バッチサイズ 500 でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525

以下の出力が表示されます。

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

イベントソースマッピングを更新する

イベントソースマッピングを更新するには (コンソール)
  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [設定] タブを選択し、[トリガー] を選択します。

  4. トリガーを選択し、[編集] を選択します。

イベントソースマッピングを更新するには (AWS CLI)

update-event-source-mapping コマンドを使用します。次の例では、Amazon SQS イベントソースの最大同時実行数を設定します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

イベントソースマッピングを削除する

関数を削除しても、Lambda は関連するイベントソースマッピングを削除しません。イベントソースマッピングは、コンソールで、または DeleteEventSourceMapping API アクションを使用して削除できます。

イベントソースマッピングを削除するには (コンソール)
  1. Lambda コンソールの [イベントソースマッピング] ページを開きます。

  2. 削除するイベントソースマッピングを選択します。

  3. [イベントソースマッピングの削除] ダイアログボックスで、「delete」と入力し、[削除] を選択します。

イベントソースマッピングを削除するには (AWS CLI)

delete-event-source-mapping コマンドを使用します。

aws lambda delete-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-11111EXAMPLE

バッチ処理動作

イベントソースマッピングは、ターゲットイベントソースから項目を読み取ります。デフォルトで、イベントソースマッピングは、Lambda が関数に送信する単一のペイロードにレコードをまとめてバッチ処理します。バッチ処理動作を細かく調整するには、バッチ処理ウィンドウ (MaximumBatchingWindowInSeconds) とバッチサイズ (BatchSize) を設定できます。バッチ処理ウィンドウとは、レコードを単一のペイロードにまとめるための最大時間です。バッチサイズとは、単一のバッチ内にあるレコードの最大数です。Lambda は、以下の 3 つの条件のいずれかが満たされたときに関数を呼び出します。

  • バッチ処理ウィンドウが最大値に到達した。デフォルトのバッチ処理ウィンドウの動作は、特定のイベントソースによって異なります。

    • Kinesis、DynamoDB、および Amazon SQS イベントソースの場合: デフォルトのバッチ処理ウィンドウは 0 秒です。これは、バッチサイズが満たされた場合、またはペイロードサイズ制限に達した場合にのみ、Lambda が関数にバッチを送信することを意味します バッチ処理ウィンドウを設定するには、MaximumBatchingWindowInSeconds を設定します。このパラメータは、秒単位で 0 秒から 300 秒までの任意の値に設定できます。バッチ処理ウィンドウを設定する場合、前の関数の呼び出しが完了するとすぐに次のウィンドウが開始されます。

    • Amazon MSK、セルフマネージド Apache Kafka、Amazon MQ、および Amazon DocumentDB イベントソースの場合: バッチ処理ウィンドウはデフォルトで 500 ミリ秒に設定されます。MaximumBatchingWindowInSeconds は、秒単位で 0 秒から 300 秒までの任意の値に設定できます。バッチ処理ウィンドウは、最初のレコードが到着するとすぐに開始されます。

      注記

      MaximumBatchingWindowInSeconds は秒単位の増分でしか変更できないため、変更後に 500 ミリ秒のデフォルトのバッチ処理ウィンドウに戻すことはできません。デフォルトのバッチ処理ウィンドウを復元するには、新しいイベントソースマッピングを作成する必要があります。

  • バッチサイズに適合した。最小バッチサイズは 1 です。デフォルトのバッチサイズと最大バッチサイズは、イベントソースに応じて異なります。これらの値に関する詳細については、CreateEventSourceMapping API 操作の「BatchSize」仕様を参照してください。

  • ペイロードサイズが 6 MB に到達した。この上限を変更することはできません。

以下の図は、これら 3 つの条件を説明するものです。バッチ処理ウィンドウが t = 7 秒で開始されるとします。最初のシナリオでは、バッチ処理ウィンドウが 5 個のレコードを蓄積した後、t = 47 秒の時点で最大値の 40 秒に到達します。2 番目のシナリオでは、バッチ処理ウィンドウの期限が切れる前にバッチサイズが 10 個になるため、バッチ処理ウィンドウが早く終了します。3 番目のシナリオでは、バッチ処理ウィンドウの期限が切れる前に最大ペイロードサイズに到達するため、バッチ処理ウィンドウが早く終了します。


        バッチ処理ウィンドウは、バッチ処理ウィンドウが最大値に到達する、バッチサイズが適合している、またはペイロードサイズが 6 MB に到達するという 3 つの条件のいずれかを満たすときに期限が切れます。

次の例は、Kinesis ストリームから読み取るイベントソースマッピングを示しています。イベントのバッチがすべての処理試行に失敗した場合、イベントソースマッピングはバッチの詳細を SQS キューに送信します。


        イベントソースマッピングは、Kinesis ストリームから読み取ります。レコードを関数に送信する前に、ローカルでキューに入れます。

イベントバッチは、Lambda から関数に送信されるイベントです。これは、現在のバッチ処理ウィンドウの期限が切れるまでイベントソースマッピングが読み取る項目からコンパイルされたレコードまたはメッセージのバッチです。

Kinesis および DynamoDB のストリームの場合、1 つのイベントソースマッピングは、ストリームに存在するシャードごとにイテレーターを作成し、各シャードの項目を順番に処理します。イベントソースマッピングを、ストリームに現れる新規項目のみを読み取る、または古い項目からを読み取るように設定することができます。処理された項目はストリームから削除されず、他の関数やコンシューマーがそれらを処理することができます。

Lambda は、次のバッチを処理のために送信する前に、設定された Lambda 拡張機能 が完了するのを待ちません。つまり、Lambda がレコードの次のバッチを処理する間も拡張機能を実行し続けることができます。アカウントの同時実行設定または制限に違反すると、スロットリングの問題が発生する可能性があります。これが潜在的な問題であるかどうかを検出するには、関数を監視し、イベントソースマッピングで予想よりも高い同時実行メトリクスが表示されているかどうかを確認します。呼び出しの間隔が短いため、Lambda はシャードの数よりも高い同時実行使用量を一時的に報告する場合があります。これは、拡張機能のない Lambda 関数にも当てはまります。

関数がエラーを返す場合は、デフォルトで、イベントソースマッピングは関数が成功するまで、またはバッチ内の項目の期限が切れるまでバッチ全体を再処理します。処理が順序正しく行われることを確実にするため、イベントソースマッピングは、エラーが解決されるまで影響を受けたシャードの処理を一時停止します。イベントソースマッピングは、古いイベントを破棄する、または複数のバッチを並列処理するように設定できます。複数のバッチを並列的に処理する場合、各パーティションキーについては順序正しい処理が引き続き保証されますが、イベントソースマッピングは同じシャード内の複数のパーティションキーを同時に処理します。

ストリームソース (DynamoDB と Kinesis) については、関数がエラーを返すときに Lambda が再試行する最大回数を設定できます。バッチが関数に到達しないサービスエラーまたはスロットルは、再試行としてカウントされません。

イベントバッチを破棄したときに、呼び出しレコードを別のサービスに送信するように、イベントソースマッピングを設定することもできます。Lambda では、イベントソースマッピングの以下の送信先がサポートされています。

  • Amazon SQS - SQS キュー。

  • Amazon SNS - SNS トピック。

呼び出しレコードには、失敗したイベントバッチの詳細が JSON 形式で格納されます。

次の例は、Kinesis ストリームの呼び出しレコードを示します。

例 呼び出しレコード
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

また、Lambda は FIFO (先入れ先出し) キューを順に処理し、アクティブなメッセージグループ数までスケールアップできます。標準キューの場合、項目は必ずしも順番に処理されるとは限りません。Lambda は、標準キューをできるだけ早く処理するためスケールアップします。エラーが発生すると、Lambda はバッチを個別の項目としてキューに戻し、元のバッチとは異なるグループでそれらを処理する場合があります。関数にエラーが発生していない場合でも、イベントソースマッピングがキューから同じ項目を 2 回受け取る可能性があります。Lambda は、処理した項目をキューから削除します。Lambda が項目を処理できない場合は、それらをデッドレターキューまたは送信先に送るようにソースキューを設定できます。

Lambda 関数を直接呼び出すサービスについては、「他のサービスで AWS Lambda を使用する」を参照してください。

イベントソースマッピング呼び出しの送信先の設定

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。イベントソースマッピング呼び出しの送信先の設定は、Kinesis、DynamoDB、および Kafka ベースのイベントソースでのみサポートされています。送信先に送られる各レコードは、呼び出しに関する詳細を含む JSON ドキュメントです。エラー処理の設定と同様に、関数、関数のバージョン、またはエイリアスに対して送信先を設定できます。

注記

イベントソースマッピング呼び出しでは、失敗した呼び出しのレコードのみを保持できます。その他の非同期呼び出しでは、成功した呼び出しと失敗した呼び出しの両方のレコードを保持できます。詳細については、「非同期呼び出しの送信先の設定」を参照してください。

任意の Amazon SNS トピックまたは任意の Amazon SQS キューを送信先として設定できます。これらの送信先タイプでは、Lambda がレコードメタデータを送信先に送ります。Kafka ベースのイベントソースの場合のみ、Amazon S3 バケットを送信先に選択できます。S3 バケットを指定すると、Lambda は呼び出しレコード全体をメタデータと共に送信先に送ります。

以下の表は、イベントソースマッピング呼び出しでサポートされる送信先のタイプをまとめたものです。Lambda が選択した送信先にレコードを正常に送信するには、関数の実行ロールにも関連するアクセス権限が含まれていることを確認してください。この表では、各送信先タイプで JSON 呼び出しレコードを受け取る方法についても説明しています。

送信先タイプ サポートされるイベントソース 必要なアクセス許可 宛先固有の JSON 形式

Amazon SQS キュー

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon SNS トピック

  • Kinesis

  • DynamoDB

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードのメタデータを Message として送信先に渡します。

Amazon S3 バケット

  • セルフマネージド型 Apache Kafka と Managed Apache Kafka

Lambda は呼び出しレコードをメタデータと共に送信先に保存します。

以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、streamArnshardIdstartSequenceNumberendSequenceNumber の各フィールドを使用して元のレコード全体を取得します。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

DynamoDB イベントソースの例については、「エラー処理」を参照してください。Kafka イベントソースの例については、「自己管理型 Apache Kafka の障害発生時の送信先」または「Amazon MSK の障害発生時の送信先」を参照してください。