Amazon SQS で AWS Lambda を使用する - AWS Lambda

Amazon SQS で AWS Lambda を使用する

Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理するときは、AWS Lambda 関数を使用します。Lambda イベントソースマッピングは、標準のキューおよびファーストイン、ファーストアウト (FIFO) キューをサポートしています。Amazon SQS を使用すると、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクを任せることができます。

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。関数が正常にバッチを処理すると、Lambda はキューからそのメッセージを削除します。以下の例では、2 つのメッセージのバッチのイベントを示しています。

例 Amazon SQS メッセージイベント (標準キュー)

{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

デフォルトでは、SQS キューでレコードが使用可能になるとすぐに、Lambda が関数を呼び出します。Lambda はキュー内の最大 10 個のメッセージを一度にポーリングし、そのバッチを関数に送信します。少数のレコードで関数を呼び出さないようにするには、バッチウィンドウを設定して、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda はバッチウィンドウの有効期限が切れるか、ペイロードの上限に達するか、バッチサイズ全体に達するまで SQS 標準キューからのメッセージのポーリングを続けます。

FIFO キューの場合、レコードには、重複除外と順序付けに関連する追加属性が含まれます。

例 Amazon SQS メッセージイベント (FIFO キュー)

{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Lambda がバッチを読み取る際、メッセージはキューに留まりますが、キューの可視性タイムアウトの間は非表示になります。関数が正常にバッチを処理すると、Lambda はそのメッセージをキューから削除します。関数がスロットリングされた、エラーを返した、または応答しなかった場合は、メッセージが再び表示されるようになります。失敗したバッチのすべてのメッセージはキューに戻るため、関数コードが副作用なしで同じメッセージを複数回処理できることが必要となります。

スケーリングと処理

標準キューの場合、Lambda はロングポーリングを使用して、キューがアクティブになるまでキューをポーリングします。メッセージが利用可能な場合、Lambda はバッチを最大 5 個まで読み込み、それらを関数に送信します。メッセージがまだ利用可能な場合、Lambda はバッチを読み込むプロセスの数を 1 分あたり最大 60 インスタンスまで増やします。イベントソースマッピングによって同時に処理できるバッチの最大数は 1000 です。

FIFO キューの場合、Lambda は、受信した順序でメッセージを関数に送信します。FIFO キューにメッセージを送信する場合、メッセージグループ ID を指定します。Amazon SQS は、同じグループ内のメッセージが Lambda に順番に配信されるようにします。Lambda はメッセージをグループにソートし、グループに対して一度に 1 つのバッチのみを送信します。関数からエラーが返されると、Lambda は同じグループから追加のメッセージを受信する前に、影響を受けたメッセージのすべての再試行を行います。

関数は、アクティブなメッセージグループの数に並行してスケールできます。詳細については、AWS コンピューティングブログの「イベントソースとしての SQS FIFO」を参照してください。

Lambda で使用するキューの設定

SQS キューを作成して、Lambda 関数のイベントソースとして機能できるようにします。次に、Lambda 関数がイベントの各バッチを処理できるよう、またスケールアップ時に Lambda がスロットリングエラーに反応して再試行できるように、時間を見越してキューで設定します。

レコードの各バッチを処理する関数時間を許可するには、ソースキューの可視性タイムアウトを、関数で設定したタイムアウトの少なくとも 6 倍に設定します。関数が前のバッチを処理している間に関数の実行がスロットリングされたときのために、追加時間が Lambda の再試行のために与えられます。

メッセージが何回も処理に失敗する場合、Amazon SQS はこのメッセージをデッドレターキューに送信できます。関数がエラーを返すと、Lambda はそのメッセージをキューに残します。可視性タイムアウトが発生すると、Lambda はメッセージをもう一度受信します。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

注記

Lambda 関数ではなく、ソースキューのデッドレターキューを設定するようにしてください。関数で設定したデッドレターキューは、イベントソースキューではなく、関数の非同期呼び出しキューに使用されます。

関数からエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージを配信不能キューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの maxReceiveCount5 以上に設定します。

実行ロールのアクセス許可

Lambda では、Amazon SQS キューでメッセージを管理するために以下のアクセス権限が必要です。これを関数の実行ロールに追加します。

詳細については、「AWS Lambda 実行ロール」を参照してください。

イベントソースとしてキューを設定する

イベントソースマッピングを作成し、キューから Lambda 関数に項目を送信するように Lambda に通知します。1 つの関数で複数のキューの項目を処理するには、複数のイベントソースマッピングを作成します。Lambda がターゲットの関数を呼び出すと、このイベントには設定可能なバッチサイズまでの複数の項目が含まれている可能性があります。

Lambda コンソールで Amazon SQS から読み取るように関数を設定するには、SQS トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソールで [Functions (関数)] ページを開きます。

  2. 関数を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. トリガーのタイプを選択します。

  5. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda は、Amazon SQS イベントソースの以下のオプションをサポートしています。

イベントソースオプション

  • SQS キュー - レコードの読み取り元である Amazon SQS キュー。

  • バッチサイズ - 各バッチで関数に送信されるレコードの数。標準キューの場合、最大 10000 レコードまで可能です。FIFO キューの最大値は 10 です。バッチサイズが 10 を超える場合は、MaximumBatchingWindowInSeconds パラメータも 1 秒以上に設定します。Lambda は、イベントの合計サイズが同期呼び出しのペイロード上限 (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。

    メタデータは、レコードごとに Lambda と Amazon SQS の両方により生成されます。この追加のメタデータは、合計ペイロードサイズにカウントされ、バッチで送信されるレコードの総数が、設定されたバッチサイズよりも小さくなる可能性があります。Amazon SQS によって送信されるメタデータフィールドの長さは可変です。Amazon SQS メタデータフィールドの詳細については、Amazon Simple Queue Service API リファレンスReceiveMessage ドキュメントを参照してください。

  • バッチウィンドウ - 関数を呼び出す前にレコードを収集する最大時間(秒数)を指定します。標準キューにのみ適用されます。

    0 秒を超えるバッチウィンドウを使用している場合は、キューの可視性タイムアウトの処理時間の増加を考慮する必要があります。キューの可視性タイムアウトを、関数タイムアウトの 6 倍に MaximumBatchingWindowInSeconds を加えた値に設定することを奨励します。これによりスロットリングエラーが発生した場合に Lambda 関数がイベントの各バッチを処理し、再試行する時間が許容されます。

    注記

    バッチウィンドウが 0 より大きく (batch window) + (function timeout) > (queue visibility timeout) の場合、有効なキューの可視性タイムアウトは (batch window) + (function timeout) + 30s になります。

    Lambda は、一度に最大 5 つのバッチを処理します。つまり、一度にメッセージのバッチ処理と処理を同時に実行できるワーカーは最大 5 つです。各ワーカーは、メッセージの現在のバッチに対して個別の Lambda 呼び出しを表示します。

  • 有効 - イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには、false に設定します。

注記

Amazon SQS には、リクエストに対する期限なしの無料利用枠があります。無料利用枠を超えると、Amazon SQS では 100 万件のリクエストごとに料金が発生します。イベントソースマッピングがアクティブな間、Lambda はアイテムを取得するためのリクエストをキューに送信します。料金の詳細については、Amazon Simple Queue Service の料金を参照してください。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

バッチの項目全体を処理するために十分な時間で、関数のタイムアウト時間を設定します。項目の処理に長時間かかる場合には、より少ないバッチサイズを選択します。バッチサイズを大きくするとワークロードの効率を向上させることができ、非常に高速になるか、多くのコストがかかります。ただし、関数がエラーを返した場合、バッチ内のすべての項目がキューに戻ります。関数で予約された同時実行数を設定すると、同時実行数を 5 以上に設定した場合に、Lambda が関数を呼び出したときにスロットリングエラーが発生する可能性が少なくなります。スロットリングエラーの可能性を排除するには、予約された同時実行数を 1000 に設定します。これは Amazon SQS イベントソースの同時実行の最大数です。

イベントソースマッピング API

イベントソースを AWS CLI または AWSSDK を使って管理するには、次の API オペレーションを使用します。

次の例では、AWS CLI を使用して、関数 my-function を、Amazon リソースネーム (ARN) により指定された Amazon SQS キューに、バッチサイズ 5、バッチウィンドウ 60 秒でマップします。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

次のような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }