Amazon SQS での Lambda の使用 - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon SQS での Lambda の使用

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理するには、Lambda 関数を使用することができます。Lambda イベントソースマッピングは、標準のキューおよびファーストイン、ファーストアウト (FIFO) キューをサポートしています。Amazon SQS を使用すると、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクを任せることができます。

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。関数が正常にバッチを処理すると、Lambda はキューからそのメッセージを削除します。

Lambda がバッチを読み取ると、メッセージはキューに留まりますが、キューの可視性タイムアウトの期間中は非表示になります。関数が正常にバッチを処理すると、Lambda はそのメッセージをキューから削除します。デフォルトでは、バッチの処理中に関数でエラーが発生すると、可視性タイムアウトの期限が切れた後に、そのバッチ内のすべてのメッセージが再びキューに表示されます。このため、関数コードは、意図しない副次的影響を及ぼすことなく同じメッセージを複数回処理できるようにする必要があります。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、バッチの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Lambda がメッセージを複数回処理しないようにするには、関数レスポンスにバッチアイテムの失敗を含めるようにイベントソースマッピングを設定するか、Amazon SQS API アクションの DeleteMessage を使用して、Lambda 関数がメッセージを正常に処理した場合にそれらをキューから削除することができます。Amazon SQS API の使用に関する詳細については、「Amazon Simple Queue Service API Reference」を参照してください。

標準キューメッセージイベントの例

例 Amazon SQS メッセージイベント (標準キュー)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

デフォルトでは、Lambda はキュー内の最大 10 個のメッセージを一度にポーリングし、そのバッチを関数に送信します。少数のレコードで関数を呼び出さないようにするには、バッチウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、バッチ処理ウィンドウの期限が切れる、呼び出しペイロードサイズのクォータに到達する、または設定された最大バッチサイズに到達するまで、SQS 標準キューからのメッセージのポーリングを継続します。

バッチウィンドウを使用していて、SQS キューのトラフィックがきわめて少ない場合、Lambda は関数を呼び出す前に最大 20 秒間待機することがあります。これは、バッチウィンドウを 20 秒未満に設定した場合であっても同様です。

注記

Java では、JSON を逆シリアル化するときに null ポインタエラーが発生することがあります。これは、「Records」と「eventSourceARN」のケースが JSON オブジェクトマッパーによってどのように変換されるかに起因している可能性があります。

FIFO キューメッセージイベントの例

FIFO キューの場合、レコードには、重複除外と順序付けに関連する追加属性が含まれます。

例 Amazon SQS メッセージイベント (FIFO キュー)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Lambda で使用するキューの設定

SQS キューを作成して、Lambda 関数のイベントソースとして機能できるようにします。次に、Lambda 関数がイベントの各バッチを処理できるよう、またスケールアップ時に Lambda がスロットリングエラーに反応して再試行できるように、時間を見越してキューで設定します。

関数がレコードの各バッチを処理する時間を取るため、ソースキューの可視性タイムアウトは、関数に設定したタイムアウトの少なくとも 6 倍に設定してください。追加の時間は、関数が前のバッチの処理中にスロットリングされた場合に、Lambda が再試行することを可能にします。

関数がメッセージの処理に何回も失敗する場合、Amazon SQS はこのメッセージをデッドレターキューに送信できます。関数がエラーを返す場合、バッチ内のすべてのアイテムがキューに戻ります。可視性タイムアウトの発生後、Lambda がメッセージをもう一度受け取ります。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

注記

Lambda 関数ではなく、ソースキューのデッドレターキューを設定するようにしてください。関数で設定したデッドレターキューは、イベントソースキューではなく、関数の非同期呼び出しキューに使用されます。

関数からエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージを配信不能キューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの maxReceiveCount5 以上に設定します。

実行ロールのアクセス許可

AWSLambdaSQSQueueExecutionRole AWS 管理ポリシーには、Lambda が Amazon SQS キューから読み取るために必要なアクセス許可が含まれています。この管理ポリシーを関数の実行ロールに追加します。

オプションで、暗号化されたキューを使用している場合は、実行ロールに次の権限を追加する必要もあります。

アクセス許可を追加し、イベントソースマッピングを作成するには

イベントソースマッピングを作成し、キューから Lambda 関数に項目を送信するように Lambda に通知します。1 つの関数で複数のキューの項目を処理するには、複数のイベントソースマッピングを作成します。Lambda がターゲットの関数を呼び出すと、このイベントには設定可能なバッチサイズまでの複数の項目が含まれている可能性があります。

Amazon SQS から読み取るように関数を設定するには、AWSLambdaSQSQueueExecutionRole AWS 管理ポリシーを実行ロールにアタッチし、SQS トリガーを作成します。

アクセス許可を追加してトリガーを作成するには
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Configuration] (設定) タブを開き、次に [Permissions] (アクセス許可) をクリックします。

  4. [実行ロール] で、実行ロールのリンクを選択します。このリンクを選択すると、IAM コンソールでロールが開きます。

    
              実行ロールのリンク
  5. [アクセス許可を追加][ポリシーをアタッチ] の順に選択します。

    
              IAM コンソールでポリシーのアタッチ
  6. [検索] フィールドに AWSLambdaSQSQueueExecutionRole を入力します。実行ロールにポリシーを追加 これは、関数が Amazon SQS キューから読み取るために必要なアクセス許可を含む AWS 管理ポリシーです。このポリシーの詳細については、「AWS 管理ポリシーリファレンス」の「AWSLambdaSQSQueueExecutionRole」を参照してください。

  7. Lambda コンソールの関数に戻ります。[関数の概要] で [トリガーを追加] をクリックします。

    
              Lambda コンソールの関数の概要セクション
  8. トリガーのタイプを選択します。

  9. 必須のオプションを設定し、[追加] を選択します。

Lambda は、Amazon SQS イベントソースの以下のオプションをサポートしています。

SQS キュー

レコードの読み取り元である Amazon SQS キュー。

トリガーの有効化

イベントソースマッピングのステータス。[Enable trigger] (トリガーの有効化) はデフォルトで選択されています。

バッチサイズ

各バッチで関数に送信されるレコードの最大数。標準キューの場合、最大 10,000 レコードまで可能です。FIFO キューの場合、最大値は 10 です。バッチサイズが 10 を超える場合は、バッチウィンドウ (MaximumBatchingWindowInSeconds) も 1 秒以上に設定する必要があります。

関数のタイムアウトは、バッチの項目すべてを処理するために十分な時間を確保できるように設定してください。項目の処理に長時間かかる場合には、より少ないバッチサイズを選択します。バッチサイズを大きくするとワークロードの効率を向上させることができ、非常に高速になるか、多くのコストがかかります。関数で予約された同時実行数を設定すると、同時実行数を 5 以上に設定した場合に、Lambda が関数を呼び出したときにスロットリングエラーが発生する可能性が少なくなります。

Lambda は、イベントの合計サイズが同期呼び出しの呼び出しペイロードサイズのクォータ (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。Lambda と Amazon SQS の両方が、レコードごとにメタデータを生成します。この追加のメタデータは合計ペイロードサイズに計上され、1 つのバッチで送信されるレコードの総数が設定されたバッチサイズよりも少なくなる可能性があります。Amazon SQS が送信するメタデータフィールドは可変長にすることができます。Amazon SQS メタデータフィールドの詳細については、「Amazon Simple Queue Service API リファレンス」の「ReceiveMessage」API 操作のドキュメントを参照してください。

バッチウィンドウ

関数を呼び出すまでのレコード収集の最大時間 (秒) です。これが適用されるのは標準キューのみです。

0 秒を超えるバッチウィンドウを使用している場合は、キューの可視性タイムアウトに処理時間の増加を考慮する必要があります。キューの可視性タイムアウトは、関数のタイムアウトの 6 倍に MaximumBatchingWindowInSeconds の値を加えた時間に設定することをお勧めします。これによりスロットリングエラーが発生した場合に Lambda 関数がイベントの各バッチを処理し、再試行する時間が許容されます。

メッセージが使用可能になると、Lambda はメッセージのバッチ処理を開始します。Lambda は、関数を 5 回同時に呼び出すことで、一度に 5 つのバッチの処理を開始します。メッセージがまだ利用可能な場合、Lambda は関数のインスタンスを 1 分あたり最大 300 インスタンスまで追加し、最大 1,000 インスタンスまで増やします。関数のスケーリングと同時実行の詳細について理解するには、「Lambda 関数のスケーリング」を参照してください。

より多くのメッセージを処理するには、Lambda 関数を最適化してスループットを向上させることができます。「AWS Lambda がAmazon SQS 標準キューでどのようにスケールするかを理解する」を参照してください。

最大同時実行数

イベントソースが呼び出せる同時関数の最大数。詳細については、「Amazon SQS イベントソースの最大同時実行数の設定」を参照してください。

フィルター条件

フィルター条件を追加して、Lambda が処理のために関数に送信するイベントを制御します。詳細については、「Lambda のイベントフィルタリング」を参照してください。

スケーリングと処理

標準キューの場合、Lambda はロングポーリングを使用して、キューがアクティブになるまでキューをポーリングします。メッセージが利用可能な場合、Lambda は、関数を 5 回同時に呼び出すことで、一度に 5 つのバッチの処理を開始します。メッセージがまだ利用可能な場合、Lambda はバッチを読み込むプロセスの数を 1 分あたり最大 300 インスタンスまで増やします。イベントソースマッピングによって同時に処理できるバッチの最大数は 1,000 です。

FIFO キューの場合、Lambda は、受信した順序でメッセージを関数に送信します。FIFO キューにメッセージを送信する場合、メッセージグループ ID を指定します。Amazon SQS は、同じグループ内のメッセージが Lambda に順番に配信されるようにします。Lambda はメッセージをグループにソートし、グループに対して一度に 1 つのバッチのみを送信します。関数がエラーを返す場合、その関数は、Lambda が同じグループから追加のメッセージを受信する前に、対象メッセージですべての再試行を試みます。

関数は、アクティブなメッセージグループの数に並行してスケールインできます。詳細については、AWS コンピューティングブログの「イベントソースとしての SQS FIFO」を参照してください。

Amazon SQS イベントソースの最大同時実行数の設定

最大同時実行数設定は、Amazon SQS イベントソースが呼び出せる関数の同時インスタンス数を制限します。最大同時実行数は、イベントソースレベルの設定です。1 つの関数に複数の Amazon SQS イベントソースをマップしている場合は、各イベントソースに個別の最大同時実行数を設定できます。最大同時実行数は、1 つのキューが関数の予約された同時実行のすべてを使用したり、アカウントの同時実行クォータの残りのすべてを使用したりしないようにするために使用できます。Amazon SQS イベントソースでの最大同時実行数の設定に料金はかかりません。

重要なのは、最大同時実行数と予約された同時実行は、2 つの独立した設定であるということです。最大同時実行数を、関数の予約された同時実行よりも多い数に設定しないでください。最大同時実行数を設定する場合は、関数の予約された同時実行数が、関数にマップされたすべての Amazon SQS イベントソースの合計最大同時実行数以上になるようにしてください。合計数未満になった場合は、Lambda がメッセージをスロットルする可能性があります。

最大同時実行数が設定されていない場合、Lambda は Amazon SQS イベントソースをアカウントの合計同時実行クォータ (デフォルトでは 1,000) までスケールできます。

注記

FIFO キューの場合、同時呼び出しの上限は、メッセージグループ ID の数 (messageGroupId) または最大同時実行数の設定 (どちらか小さい方) です。例えば、メッセージグループ ID が 6 つあり、最大同時実行数が 10 に設定されている場合、関数は最大 6 回の同時呼び出しを行うことができます。

新規および既存の Amazon SQS イベントソースマッピングに最大同時実行数を設定できます。

Lambda コンソールを使用して最大同時実行数を設定する
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Function overview] (関数の概要) で [SQS] を選択します。選択すると、[Configuration] (設定) タブが開きます。

  4. Amazon SQS トリガーを選択し、[Edit] (編集) を選択します。

  5. [Maximum concurrency] (最大同時実行数) には、2 から 1,000 までの数値を入力します。最大同時実行数をオフにするには、ボックスを空のままにします。

  6. [保存] を選択します。

AWS Command Line Interface(AWS CLI) を使用して最大同時実行数を設定する

--scaling-config オプション付きの update-event-source-mapping コマンドを使用します。例:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

最大同時実行数をオフにするには、--scaling-config に空の値を入力します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
Lambda API を使用して最大同時実行数を設定する

ScalingConfig オブジェクトを指定した CreateEventSourceMapping アクションまたは UpdateEventSourceMapping アクションを使用します。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API オペレーションを使用できます。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された Amazon SQS キューに、バッチサイズ 5、バッチウィンドウ 60 秒でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

以下の出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

失敗した呼び出しに対するバックオフ戦略

呼び出しが失敗すると、Lambda はバックオフ戦略の実装中に呼び出しの再試行を試みます。バックオフ戦略は、Lambda で発生した障害が関数コード内のエラーによるものか、スロットリングによるものかに応じて若干異なります。

  • 関数コードが原因でエラーが発生した場合、Lambda は処理を停止し、呼び出しを再試行します。その間、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。キューの可視性タイムアウトがタイムアウトすると、メッセージが再びキューに表示されます。

  • スロットリングが原因で呼び出しが失敗する場合、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。Lambda は、メッセージのタイムスタンプがキューの可視性タイムアウトを超過するまでメッセージを再試行し続けますが、タイムアウトした時点でメッセージをドロップします。

部分的なバッチレスポンスの実装

Lambda 関数がバッチを処理しているときにエラーが発生すると、デフォルトでそのバッチ内のすべてのメッセージが再度キューに表示され、これには Lambda が正常に処理したメッセージも含まれます。その結果、関数が同じメッセージを複数回処理することになる場合があります。

失敗したバッチ内の正常に処理されたメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを部分的なバッチレスポンスと呼びます。部分的なバッチレスポンスをオンにするには、イベントソースマッピングを設定するときに FunctionResponseTypes アクション用に ReportBatchItemFailures を指定します。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

ReportBatchItemFailures がアクティブ化されている場合、Lambda は、関数の呼び出しが失敗したときにメッセージポーリングをスケールダウンしません。一部のメッセージが失敗することが想定され、それらの失敗によってメッセージの処理レートに影響が及ばないようにする場合は、ReportBatchItemFailures を使用します。

注記

部分的なバッチレスポンスを使用する場合は、次の点に注意してください。

  • 関数が例外をスローする場合、バッチ全体が完全な失敗とみなされます。

  • この機能を FIFO キューで使用している場合、関数は最初の失敗後にメッセージの処理を停止し、batchItemFailures で失敗したメッセージと未処理のメッセージのすべてを返します。これは、キュー内のメッセージの順序を維持するのに役立ちます。

部分的なバッチレポートをアクティブ化するには
  1. 部分的なバッチレスポンスを実装するためのベストプラクティスを確認します。

  2. 次のコマンドを実行して、関数用に ReportBatchItemFailures をアクティブ化します。イベントソースマッピングの UUID を取得するには、list-event-source-mappings AWS CLI コマンドを実行します。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 関数コードを更新して、すべての例外をキャッチし、失敗したメッセージを batchItemFailures JSON レスポンスで返します。batchItemFailures レスポンスには、メッセージ ID のリストが itemIdentifier JSON 値として含まれている必要があります。

    例えば、メッセージ ID が id1id2id3id4、および id5 である 5 つのメッセージのバッチがあるとします。関数は、id1id3id5 を正常に処理します。メッセージ id2 および id4 がキューで再び表示されるようにするには、関数が次のレスポンスを返す必要があります。

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

    .NET
    AWS SDK for .NET
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    .NET を使用した Lambda での SQS バッチアイテム失敗のレポート。

    using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Go を使用した Lambda での SQS バッチアイテム失敗のレポート。

    package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Java を使用した Lambda での SQS バッチアイテム失敗のレポート。

    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK for JavaScript (v2)
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    JavaScript を使用した Lambda での SQS バッチアイテム失敗のレポート。

    export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    TypeScript を使用して Lambda で SQS バッチ項目の失敗を報告します。

    import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; export const handler = async (event: APIGatewayProxyEvent, context: Context): Promise<APIGatewayProxyResult> => { const batchItemFailures: { ItemIdentifier: string }[] = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ ItemIdentifier: record.messageId }); } } return { statusCode: 200, body: JSON.stringify({ batchItemFailures }), }; }; async function processMessageAsync(record: any, context: Context): Promise<void> { if (!record.body) { throw new Error('No Body in SQS Message.'); } context.log(`Processed message ${record.body}`); }
    PHP
    SDK for PHP
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    PHP を使用した Lambda での SQS バッチアイテム失敗のレポート。

    <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $sqsEvent = new SqsEvent($event); $this->logger->info("Processing SQS records"); $records = $sqsEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getMessageId(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); // Format failures for the response $failures = array_map( fn(string $messageId) => ['itemIdentifier' => $messageId], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger); ?>
    Python
    SDK for Python (Boto3)
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Python を使用した Lambda での SQS バッチアイテム失敗のレポート。

    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK for Ruby
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Ruby を使用した Lambda での SQS バッチアイテム失敗のレポート。

    require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK for Rust
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Rust を使用した Lambda での SQS バッチアイテム失敗のレポート。

    use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

失敗したイベントがキューに戻らない場合は、AWS ナレッジセンターの「Lambda 関数 SQS ReportBatchItemFailures をトラブルシューティングするにはどうすればよいですか?」を参照してください。

成功条件と失敗の条件

関数が以下のいずれかを返す場合、Lambda はバッチを完全な成功として扱います。

  • 空の batchItemFailures リスト

  • null の batchItemFailures リスト

  • 空の EventResponse

  • null の EventResponse

関数が以下のいずれかを返す場合、Lambda はバッチを完全な失敗として扱います。

  • 無効な JSON レスポンス

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

  • 存在しないメッセージ ID を持つ itemIdentifier

CloudWatch メトリクス

関数がバッチ項目の失敗を正しく報告しているかどうかを判断するために、Amazon SQS メトリクスの NumberOfMessagesDeleted および ApproximateAgeOfOldestMessage を Amazon CloudWatch でモニタリングできます。

  • NumberOfMessagesDeleted は、キューから削除されたメッセージの数を追跡します。これが 0 になるということは、関数レスポンスが失敗したメッセージを正しく返していないことを示唆しています。

  • ApproximateAgeOfOldestMessage は、最も古いメッセージがキューに残っている期間を追跡します。このメトリクスの急激な増加は、関数が失敗したメッセージを正しく返していないことを示唆している可能性があります。

Amazon SQS 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon SQS に適用されるのは一部のパラメータのみです。

Amazon SQS に適用されるイベントソースパラメータ
[Parameter] (パラメータ) 必須 デフォルト メモ

BatchSize

N

10

標準キューの場合、最大値は 10,000 です。FIFO キューの場合、最大値は 10 です。

有効

N

true

EventSourceArn

Y

データストリームまたはストリーミングコンシューマーの ARN。

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

FunctionResponseTypes

N

関数がバッチ内の特定の失敗を報告できるようにするには、FunctionResponseTypes に値 ReportBatchItemFailures を含めます。詳細については、部分的なバッチレスポンスの実装 を参照してください。

MaximumBatchingWindowInSeconds

N

0

ScalingConfig

N

Amazon SQS イベントソースの最大同時実行数の設定