Amazon SQS での Lambda の使用 - AWS Lambda

Amazon SQS での Lambda の使用

Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理するには、Lambda 関数を使用することができます。Lambda イベントソースマッピングは、標準のキューおよびファーストイン、ファーストアウト (FIFO) キューをサポートしています。Amazon SQS を使用すると、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクを任せることができます。

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。関数が正常にバッチを処理すると、Lambda はキューからそのメッセージを削除します。以下の例では、2 つのメッセージのバッチのイベントを示しています。

例 Amazon SQS メッセージイベント (標準キュー)

{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

デフォルトでは、Lambda はキュー内の最大 10 個のメッセージを一度にポーリングし、そのバッチを関数に送信します。少数のレコードで関数を呼び出さないようにするには、バッチウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、バッチ処理ウィンドウの期限が切れる、呼び出しペイロードサイズのクォータに到達する、または設定された最大バッチサイズに到達するまで、SQS 標準キューからのメッセージのポーリングを継続します。

注記

バッチウィンドウを使用していて、SQS キューのトラフィックがきわめて少ない場合、Lambda は関数を呼び出す前に最大 20 秒間待機することがあります。これは、バッチウィンドウを 20 秒未満に設定した場合であっても同様です。

FIFO キューの場合、レコードには、重複除外と順序付けに関連する追加属性が含まれます。

例 Amazon SQS メッセージイベント (FIFO キュー)

{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Lambda がバッチを読み取ると、メッセージはキューに留まりますが、キューの可視性タイムアウトの期間中は非表示になります。関数が正常にバッチを処理すると、Lambda はそのメッセージをキューから削除します。デフォルトで、関数がバッチを処理しているときにエラーが発生すると、そのバッチ内のすべてのメッセージが再びキューに表示されます。このため、関数コードは、意図しない副次的影響を及ぼすことなく同じメッセージを複数回処理できるようにする必要があります。バッチアイテムの失敗を関数のレスポンスに含めることで、この再処理動作を変更できます。

スケーリングと処理

標準キューの場合、Lambda はロングポーリングを使用して、キューがアクティブになるまでキューをポーリングします。メッセージが利用可能な場合、Lambda は最大 5 個のバッチを読み込み、それらを関数に送信します。メッセージがまだ利用可能な場合、Lambda はバッチを読み込むプロセスの数を 1 分あたり最大 60 インスタンスまで増やします。イベントソースマッピングによって同時に処理できるバッチの最大数は 1,000 です。

FIFO キューの場合、Lambda は、受信した順序でメッセージを関数に送信します。FIFO キューにメッセージを送信する場合、メッセージグループ ID を指定します。Amazon SQS は、同じグループ内のメッセージが Lambda に順番に配信されるようにします。Lambda はメッセージをグループにソートし、グループに対して一度に 1 つのバッチのみを送信します。関数がエラーを返す場合、その関数は、Lambda が同じグループから追加のメッセージを受信する前に、対象メッセージですべての再試行を試みます。

関数は、アクティブなメッセージグループの数に並行してスケールできます。詳細については、AWS コンピューティングブログの「イベントソースとしての SQS FIFO」を参照してください。

Lambda で使用するキューの設定

SQS キューを作成して、Lambda 関数のイベントソースとして機能できるようにします。次に、Lambda 関数がイベントの各バッチを処理できるよう、またスケールアップ時に Lambda がスロットリングエラーに反応して再試行できるように、時間を見越してキューで設定します。

関数がレコードの各バッチを処理するために十分な時間を取るため、ソースキューの可視性タイムアウトは、関数に設定したタイムアウトの少なくとも 6 倍にに設定してください。追加の時間は、関数が前のバッチの処理中にスロットリングされた場合に、Lambda が再試行することを可能にします。

関数がメッセージの処理に何回も失敗する場合、Amazon SQS はこのメッセージをデッドレターキューに送信できます。関数がエラーを返すと、Lambda はそのメッセージをキューに残します。可視性タイムアウトが発生すると、Lambda はメッセージをもう一度受信します。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

注記

Lambda 関数ではなく、ソースキューのデッドレターキューを設定するようにしてください。関数で設定したデッドレターキューは、イベントソースキューではなく、関数の非同期呼び出しキューに使用されます。

関数からエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージを配信不能キューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの maxReceiveCount5 以上に設定します。

実行ロールのアクセス許可

Lambda では、Amazon SQS キューでメッセージを管理するために以下のアクセス権限が必要です。これを関数の実行ロールに追加します。

イベントソースとしてキューを設定する

イベントソースマッピングを作成し、キューから Lambda 関数に項目を送信するように Lambda に通知します。1 つの関数で複数のキューの項目を処理するには、複数のイベントソースマッピングを作成します。Lambda がターゲットの関数を呼び出すと、このイベントには設定可能なバッチサイズまでの複数の項目が含まれている可能性があります。

Lambda コンソールで Amazon SQS から読み取るように関数を設定するには、SQS トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. 関数の名前を選択します。

  3. [Function overview] (関数の概要) で [Add trigger] (トリガーを追加) をクリックします。

  4. SQS トリガータイプを選択します。

  5. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda は、Amazon SQS イベントソースの以下のオプションをサポートしています。

イベントソースオプション

  • SQS キュー - レコードの読み取り元である Amazon SQS キュー。

  • バッチサイズ - 各バッチで関数に送信されるレコードの数。標準キューの場合、最大 10,000 レコードまで可能です。FIFO キューの場合、最大値は 10 です。バッチサイズが 10 を超える場合は、MaximumBatchingWindowInSeconds パラメータも 1 秒以上に設定します。Lambda は、イベントの合計サイズが同期呼び出しの呼び出しペイロードサイズのクォータ (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。

    Lambda と Amazon SQS の両方が、レコードごとにメタデータを生成します。この追加のメタデータは合計ペイロードサイズに計上され、1 つのバッチで送信されるレコードの総数が設定されたバッチサイズよりも少なくなる可能性があります。Amazon SQS が送信するメタデータフィールドは可変長にすることができます。Amazon SQS メタデータフィールドの詳細については、Amazon Simple Queue Service API リファレンスReceiveMessage API 操作のドキュメントを参照してください。

  • Batch window (バッチウィンドウ) - 関数を呼び出す前にレコードを収集する最大時間 (秒数)。これが適用されるのは標準キューのみです。

    0 秒を超えるバッチウィンドウを使用している場合は、キューの可視性タイムアウトの処理時間の増加を考慮する必要があります。キューの可視性タイムアウトは、関数タイムアウトの 6 倍に MaximumBatchingWindowInSeconds の値を加えた時間に設定することをお勧めします。これによりスロットリングエラーが発生した場合に Lambda 関数がイベントの各バッチを処理し、再試行する時間が許容されます。

    注記

    バッチウィンドウが 0 より大きく、かつ (batch window) + (function timeout) > (queue visibility timeout) である場合、有効なキューの可視性タイムアウトは (batch window) + (function timeout) + 30s になります。

    Lambda は、一度に最大 5 つのバッチを処理します。これは、一度にメッセージのバッチ処理と処理を同時に実行できるワーカーが最大 5 つあることを意味します。各ワーカーは、メッセージの現在のバッチに対して個別の Lambda 呼び出しを表示します。

  • Enabled (有効) – イベントソースマッピングの状態。イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには、false に設定します。

注記

Amazon SQS には、リクエストに対する期限なしの無料利用枠があります。無料利用枠を超えると、Amazon SQS では 100 万件のリクエストごとに料金が発生します。イベントソースマッピングがアクティブな間、Lambda はアイテムを取得するためのリクエストをキューに送信します。料金の詳細については、「Amazon SQS の料金」を参照してください。

イベントソース設定を後ほど管理するには、Lambda コンソールのデザイナーで [SQS] トリガーを選択します。

バッチの項目全体を処理するために十分な時間で、関数のタイムアウト時間を設定します。項目の処理に長時間かかる場合には、より少ないバッチサイズを選択します。バッチサイズを大きくするとワークロードの効率を向上させることができ、非常に高速になるか、多くのコストがかかります。ただし、関数がエラーを返した場合、バッチ内のすべての項目がキューに戻ります。関数で [reserved concurrency] (予約された同時実行数) を設定すると、同時実行数を 5 以上に設定した場合に、Lambda が関数を呼び出したときにスロットリングエラーが発生する可能性が少なくなります。スロットリングエラーの可能性を排除するには、予約された同時実行数の値を 1,000 に設定します。これは Amazon SQS イベントソースの最大同時実行数です。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API オペレーションを使用できます。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された Amazon SQS キューに、バッチサイズ 5、バッチウィンドウ 60 秒でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

次のような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

バッチ項目の失敗の報告

Lambda 関数がバッチを処理しているときにエラーが発生すると、デフォルトでそのバッチ内のすべてのメッセージが再度キューに表示され、これには Lambda が正常に処理したメッセージも含まれます。その結果、関数が同じメッセージを複数回処理することになる場合があります。

失敗したバッチ内のすべてのメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを行うには、イベントソースマッピングを設定するときに、値 ReportBatchItemFailuresFunctionResponseTypes リストに含めます。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

レポートの構文

イベントソースマッピング設定に ReportBatchItemFailures を含めると、失敗したメッセージ ID のリストを関数レスポンスで返すことができます。例えば、メッセージ ID が id1id2id3id4、および id5 である 5 つのメッセージのバッチがあるとします。関数は、id1id3id5 を正常に処理します。メッセージ id2id4 がキューに再び表示されるようにする場合、レスポンスの構文は次のようになります。

{ "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

バッチ内の失敗したメッセージ ID のリストを返すために、SQSBatchResponse クラスオブジェクトを使用するか、独自のカスタムクラスを作成できます。以下は、SQSBatchResponse オブジェクトを使用するレスポンスの例です。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }

この機能を使用するには、関数がエラーを正常に処理する必要があります。関数ロジックがすべての例外をキャッチし、関数レスポンスの batchItemFailures で失敗につながるメッセージを報告するようにします。関数が例外をスローする場合、バッチ全体が完全な失敗とみなされます。

注記

この機能を FIFO キューで使用している場合、関数は最初の失敗後にメッセージの処理を停止し、batchItemFailures で失敗したメッセージと未処理のメッセージのすべてを返します。これは、キュー内のメッセージの順序を維持するのに役立ちます。

成功条件と失敗の条件

関数が以下のいずれかを返す場合、Lambda はバッチを完全な成功として扱います。

  • 空の batchItemFailures リスト

  • null の batchItemFailures リスト

  • 空の EventResponse

  • null の EventResponse

関数が以下のいずれかを返す場合、Lambda はバッチを完全な失敗として扱います。

  • 無効な JSON レスポンス

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

  • 存在しないメッセージ ID を持つ itemIdentifier

CloudWatch のメトリクス

関数がバッチ項目の失敗を正しく報告しているかどうかを判断するために、Amazon SQS メトリクスの NumberOfMessagesDeleted および ApproximateAgeOfOldestMessage を Amazon CloudWatch でモニタリングできます。

  • NumberOfMessagesDeleted は、キューから削除されたメッセージの数を追跡します。これが 0 になるということは、関数レスポンスが失敗したメッセージを正しく返していないことを示唆しています。

  • ApproximateAgeOfOldestMessage は、最も古いメッセージがキューに残っている期間を追跡します。このメトリクスの急激な増加は、関数が失敗したメッセージを正しく返していないことを示唆している可能性があります。

Amazon SQS 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon SQS に適用されるのは一部のパラメータのみです。

Amazon SQS に適用されるイベントソースパラメータ
パラメータ 必須 デフォルト メモ

BatchSize

N

10

標準キューの場合、最大値は 10,000 です。FIFO キューの場合、最大値は 10 です。

有効

N

true

EventSourceArn

Y

データストリームまたはストリーミングコンシューマーの ARN。

FunctionName

Y

FunctionResponseTypes

N

関数がバッチ内の特定の失敗を報告できるようにするには、FunctionResponseTypes に値 ReportBatchItemFailures を含めます。詳細については、「バッチ項目の失敗の報告」を参照してください。

MaximumBatchingWindowInSeconds

N

0