Amazon SQS での Lambda の使用 - AWS Lambda

Amazon SQS での Lambda の使用

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理するには、Lambda 関数を使用することができます。Lambda イベントソースマッピングは、標準のキューおよびファーストイン、ファーストアウト (FIFO) キューをサポートしています。Amazon SQS を使用すると、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクを任せることができます。

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。関数が正常にバッチを処理すると、Lambda はキューからそのメッセージを削除します。

Lambda がバッチを読み取ると、メッセージはキューに留まりますが、キューの可視性タイムアウトの期間中は非表示になります。関数が正常にバッチを処理すると、Lambda はそのメッセージをキューから削除します。デフォルトで、関数がバッチを処理しているときにエラーが発生すると、そのバッチ内のすべてのメッセージが再びキューに表示されます。このため、関数コードは、意図しない副次的影響を及ぼすことなく同じメッセージを複数回処理できるようにする必要があります。

Lambda がメッセージを複数回処理しないようにするには、関数レスポンスにバッチアイテムの失敗を含めるようにイベントソースマッピングを設定するか、Amazon SQS API アクションの DeleteMessage を使用して、Lambda 関数がメッセージを正常に処理した場合にそれらをキューから削除することができます。Amazon SES API の使用に関する詳細については、「Amazon Simple Queue Service API Reference」(Amazon Simple Queue Service API リファレンス) を参照してください。

標準キューメッセージイベントの例

例 Amazon SQS メッセージイベント (標準キュー)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

デフォルトでは、Lambda はキュー内の最大 10 個のメッセージを一度にポーリングし、そのバッチを関数に送信します。少数のレコードで関数を呼び出さないようにするには、バッチウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、バッチ処理ウィンドウの期限が切れる、呼び出しペイロードサイズのクォータに到達する、または設定された最大バッチサイズに到達するまで、SQS 標準キューからのメッセージのポーリングを継続します。

注記

バッチウィンドウを使用していて、SQS キューのトラフィックがきわめて少ない場合、Lambda は関数を呼び出す前に最大 20 秒間待機することがあります。これは、バッチウィンドウを 20 秒未満に設定した場合であっても同様です。

FIFO キューメッセージイベントの例

FIFO キューの場合、レコードには、重複除外と順序付けに関連する追加属性が含まれます。

例 Amazon SQS メッセージイベント (FIFO キュー)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Lambda で使用するキューの設定

SQS キューを作成して、Lambda 関数のイベントソースとして機能できるようにします。次に、Lambda 関数がイベントの各バッチを処理できるよう、またスケールアップ時に Lambda がスロットリングエラーに反応して再試行できるように、時間を見越してキューで設定します。

関数がレコードの各バッチを処理する時間を取るため、ソースキューの可視性タイムアウトは、関数に設定したタイムアウトの少なくとも 6 倍に設定してください。追加の時間は、関数が前のバッチの処理中にスロットリングされた場合に、Lambda が再試行することを可能にします。

関数がメッセージの処理に何回も失敗する場合、Amazon SQS はこのメッセージをデッドレターキューに送信できます。関数がエラーを返す場合、バッチ内のすべてのアイテムがキューに戻ります。可視性タイムアウトの発生後、Lambda がメッセージをもう一度受け取ります。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

注記

Lambda 関数ではなく、ソースキューのデッドレターキューを設定するようにしてください。関数で設定したデッドレターキューは、イベントソースキューではなく、関数の非同期呼び出しキューに使用されます。

関数からエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージを配信不能キューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの maxReceiveCount5 以上に設定します。

実行ロールのアクセス許可

Lambda では、Amazon SQS キューでメッセージを管理するために以下のアクセス権限が必要です。これを関数の実行ロールに追加します。

イベントソースとしてキューを設定する

イベントソースマッピングを作成し、キューから Lambda 関数に項目を送信するように Lambda に通知します。1 つの関数で複数のキューの項目を処理するには、複数のイベントソースマッピングを作成します。Lambda がターゲットの関数を呼び出すと、このイベントには設定可能なバッチサイズまでの複数の項目が含まれている可能性があります。

Lambda コンソールで Amazon SQS から読み取るように関数を設定するには、SQS トリガーを作成します。

トリガーを作成するには
  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. 関数の名前を選択します。

  3. [Function overview] (関数の概要) で [Add trigger] (トリガーを追加) をクリックします。

  4. SQS トリガータイプを選択します。

  5. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda は、Amazon SQS イベントソースの以下のオプションをサポートしています。

SQS キュー

レコードの読み取り元である Amazon SQS キュー。

トリガーの有効化

イベントソースマッピングのステータス。[Enable trigger] (トリガーの有効化) はデフォルトで選択されています。

バッチサイズ

各バッチで関数に送信されるレコードの数。標準キューの場合、最大 10,000 レコードまで可能です。FIFO キューの場合、最大値は 10 です。バッチサイズが 10 を超える場合は、バッチウィンドウ (MaximumBatchingWindowInSeconds) も 1 秒以上に設定する必要があります。

関数のタイムアウトは、バッチの項目すべてを処理するために十分な時間を確保できるように設定してください。項目の処理に長時間かかる場合には、より少ないバッチサイズを選択します。バッチサイズを大きくするとワークロードの効率を向上させることができ、非常に高速になるか、多くのコストがかかります。関数で [reserved concurrency] (予約された同時実行数) を設定すると、同時実行数を 5 以上に設定した場合に、Lambda が関数を呼び出したときにスロットリングエラーが発生する可能性が少なくなります。

Lambda は、イベントの合計サイズが同期呼び出しの呼び出しペイロードサイズのクォータ (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。Lambda と Amazon SQS の両方が、レコードごとにメタデータを生成します。この追加のメタデータは合計ペイロードサイズに計上され、1 つのバッチで送信されるレコードの総数が設定されたバッチサイズよりも少なくなる可能性があります。Amazon SQS が送信するメタデータフィールドは可変長にすることができます。Amazon SQS メタデータフィールドの詳細については、Amazon Simple Queue Service API リファレンスReceiveMessage API 操作のドキュメントを参照してください。

バッチウィンドウ

関数を呼び出すまでのレコード収集の最大時間 (秒) です。これが適用されるのは標準キューのみです。

0 秒を超えるバッチウィンドウを使用している場合は、キューの可視性タイムアウトに処理時間の増加を考慮する必要があります。キューの可視性タイムアウトは、関数のタイムアウトの 6 倍に MaximumBatchingWindowInSeconds の値を加えた時間に設定することをお勧めします。これによりスロットリングエラーが発生した場合に Lambda 関数がイベントの各バッチを処理し、再試行する時間が許容されます。

注記

バッチウィンドウが 0 より大きく、かつ (batch window) + (function timeout) > (queue visibility timeout) である場合、有効なキューの可視性タイムアウトは (batch window) + (function timeout) + 30s になります。

メッセージが使用可能になると、Lambda はメッセージのバッチ処理を開始します。Lambda は、関数を 5 回同時に呼び出すことで、一度に 5 つのバッチの処理を開始します。メッセージがまだ利用可能な場合、Lambda は関数のインスタンスを 1 分あたり最大 60 インスタンスまで追加し、最大 1,000 インスタンスまで増やします。関数のスケーリングと同時実行の詳細について理解するには、「Lambda 関数のスケーリング」を参照してください。

より多くのメッセージを処理するには、Lambda 関数を最適化してスループットを向上させることができます。「AWS Lambda がAmazon SQS 標準キューでどのようにスケールするかを理解する」を参照してください。

最大同時実行数

イベントソースが呼び出せる同時関数の最大数。詳細については、「Amazon SQS イベントソースの最大同時実行数の設定」を参照してください。

フィルター条件

フィルター条件を追加して、Lambda が処理のために関数に送信するイベントを制御します。詳細については、「Lambda のイベントフィルタリング」を参照してください。

スケーリングと処理

標準キューの場合、Lambda はロングポーリングを使用して、キューがアクティブになるまでキューをポーリングします。メッセージが利用可能な場合、Lambda は最大 5 個のバッチを読み込み、それらを関数に送信します。メッセージがまだ利用可能な場合、Lambda はバッチを読み込むプロセスの数を 1 分あたり最大 60 インスタンスまで増やします。イベントソースマッピングによって同時に処理できるバッチの最大数は 1,000 です。

FIFO キューの場合、Lambda は、受信した順序でメッセージを関数に送信します。FIFO キューにメッセージを送信する場合、メッセージグループ ID を指定します。Amazon SQS は、同じグループ内のメッセージが Lambda に順番に配信されるようにします。Lambda はメッセージをグループにソートし、グループに対して一度に 1 つのバッチのみを送信します。関数がエラーを返す場合、その関数は、Lambda が同じグループから追加のメッセージを受信する前に、対象メッセージですべての再試行を試みます。

関数は、アクティブなメッセージグループの数に並行してスケールできます。詳細については、AWS コンピューティングブログの「イベントソースとしての SQS FIFO」を参照してください。

Amazon SQS イベントソースの最大同時実行数の設定

最大同時実行数設定は、Amazon SQS イベントソースが呼び出せる関数の同時インスタンス数を制限します。最大同時実行数は、イベントソースレベルの設定です。1 つの関数に複数の Amazon SQS イベントソースをマップしている場合は、各イベントソースに個別の最大同時実行数を設定できます。最大同時実行数は、1 つのキューが関数の予約された同時実行のすべてを使用したり、アカウントの同時実行クォータの残りのすべてを使用したりしないようにするために使用できます。最大同時実行数と予約された同時実行は、併用することも、個別に使用することもできます。

注記

最大同時実行数を、関数の予約された同時実行よりも多い数に設定することはできません。最大同時実行数の設定後は、関数の予約された同時実行数を、関数にマップされたすべての Amazon SQS イベントソースの合計最大同時実行数未満に減らさないようにしてください。合計数未満になった場合は、Lambda がメッセージをスロットルする可能性があります。FIFO キューの場合、メッセージグループの数が最大同時実行数の上限になります。

Amazon SQS イベントソースでの最大同時実行数の設定に料金はかかりません。

新規および既存の Amazon SQS イベントソースマッピングに最大同時実行数を設定できます。

Lambda コンソールを使用して最大同時実行数を設定する
  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. 関数の名前を選択します。

  3. [Function overview] (関数の概要) で [SQS] を選択します。選択すると、[Configuration] (設定) タブが開きます。

  4. Amazon SQS トリガーを選択し、[Edit] (編集) を選択します。

  5. [Maximum concurrency] (最大同時実行数) には、2 から 1,000 までの数値を入力します。最大同時実行数をオフにするには、ボックスを空のままにします。

  6. [Save (保存)] を選択します。

AWS Command Line Interface(AWS CLI) を使用して最大同時実行数を設定する

--scaling-config オプション付きの update-event-source-mapping コマンドを使用します。例:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

最大同時実行数をオフにするには、--scaling-config に空の値を入力します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
Lambda API を使用して最大同時実行数を設定する

ScalingConfig オブジェクトを指定した CreateEventSourceMapping アクションまたは UpdateEventSourceMapping アクションを使用します。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API オペレーションを使用できます。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された Amazon SQS キューに、バッチサイズ 5、バッチウィンドウ 60 秒でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

以下の出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

失敗した呼び出しに対するバックオフ戦略

呼び出しが失敗すると、Lambda はバックオフ戦略の実装中に呼び出しの再試行を試みます。バックオフ戦略は、Lambda で発生した障害が関数コード内のエラーによるものか、スロットリングによるものかに応じて若干異なります。

  • 関数コードが原因でエラーが発生する場合、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。呼び出しが引き続き失敗する場合、Lambda は最終的に、再試行することなくメッセージをドロップします。

  • スロットリングが原因で呼び出しが失敗する場合、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。Lambda は、メッセージのタイムスタンプがキューの可視性タイムアウトを超過するまでメッセージを再試行し続けますが、タイムアウトした時点でメッセージをドロップします。

部分的なバッチレスポンスの実装

Lambda 関数がバッチを処理しているときにエラーが発生すると、デフォルトでそのバッチ内のすべてのメッセージが再度キューに表示され、これには Lambda が正常に処理したメッセージも含まれます。その結果、関数が同じメッセージを複数回処理することになる場合があります。

失敗したバッチ内の正常に処理されたメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを部分的なバッチレスポンスと呼びます。部分的なバッチレスポンスをオンにするには、イベントソースマッピングを設定するときに FunctionResponseTypes アクション用に ReportBatchItemFailures を指定します。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

ReportBatchItemFailures がアクティブ化されている場合、Lambda は、関数の呼び出しが失敗したときにメッセージポーリングをスケールダウンしません。一部のメッセージが失敗することが想定され、それらの失敗によってメッセージの処理レートに影響が及ばないようにする場合は、ReportBatchItemFailures を使用します。

注記

部分的なバッチレスポンスを使用する場合は、次の点に注意してください。

  • 関数が例外をスローする場合、バッチ全体が完全な失敗とみなされます。

  • この機能を FIFO キューで使用している場合、関数は最初の失敗後にメッセージの処理を停止し、batchItemFailures で失敗したメッセージと未処理のメッセージのすべてを返します。これは、キュー内のメッセージの順序を維持するのに役立ちます。

部分的なバッチレポートをアクティブ化するには
  1. 部分的なバッチレスポンスを実装するためのベストプラクティスを確認します。

  2. 次のコマンドを実行して、関数用に ReportBatchItemFailures をアクティブ化します。イベントソースマッピングの UUID を取得するには、list-event-source-mappings AWS CLI コマンドを実行します。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 関数コードを更新して、すべての例外をキャッチし、失敗したメッセージを batchItemFailures JSON レスポンスで返します。batchItemFailures レスポンスには、メッセージ ID のリストが itemIdentifier JSON 値として含まれている必要があります。

    例えば、メッセージ ID が id1id2id3id4、および id5 である 5 つのメッセージのバッチがあるとします。関数は、id1id3id5 を正常に処理します。メッセージ id2 および id4 がキューで再び表示されるようにするには、関数が次のレスポンスを返す必要があります。

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

    Python
    例 – batchItemFailures の Python 関数コード
    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Java
    例 – batchItemFailures の Java 関数コード
    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }

失敗したイベントがキューに戻らない場合は、AWS ナレッジセンターの「Lambda 関数 SQS ReportBatchItemFailures をトラブルシューティングするにはどうすればよいですか?」を参照してください。

成功条件と失敗の条件

関数が以下のいずれかを返す場合、Lambda はバッチを完全な成功として扱います。

  • 空の batchItemFailures リスト

  • null の batchItemFailures リスト

  • 空の EventResponse

  • null の EventResponse

関数が以下のいずれかを返す場合、Lambda はバッチを完全な失敗として扱います。

  • 無効な JSON レスポンス

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

  • 存在しないメッセージ ID を持つ itemIdentifier

CloudWatch メトリクス

関数がバッチ項目の失敗を正しく報告しているかどうかを判断するために、Amazon SQS メトリクスの NumberOfMessagesDeleted および ApproximateAgeOfOldestMessage を Amazon CloudWatch でモニタリングできます。

  • NumberOfMessagesDeleted は、キューから削除されたメッセージの数を追跡します。これが 0 になるということは、関数レスポンスが失敗したメッセージを正しく返していないことを示唆しています。

  • ApproximateAgeOfOldestMessage は、最も古いメッセージがキューに残っている期間を追跡します。このメトリクスの急激な増加は、関数が失敗したメッセージを正しく返していないことを示唆している可能性があります。

Amazon SQS 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon SQS に適用されるのは一部のパラメータのみです。

Amazon SQS に適用されるイベントソースパラメータ
パラメータ 必須 デフォルト メモ

BatchSize

N

10

標準キューの場合、最大値は 10,000 です。FIFO キューの場合、最大値は 10 です。

有効

N

true

EventSourceArn

Y

データストリームまたはストリーミングコンシューマーの ARN。

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

FunctionResponseTypes

N

関数がバッチ内の特定の失敗を報告できるようにするには、FunctionResponseTypes に値 ReportBatchItemFailures を含めます。詳細については、部分的なバッチレスポンスの実装 を参照してください。

MaximumBatchingWindowInSeconds

N

0

ScalingConfig

N

Amazon SQS イベントソースの最大同時実行数の設定