EventBridge Pipes のソースとしての Amazon Simple Queue Service - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EventBridge Pipes のソースとしての Amazon Simple Queue Service

EventBridge Pipes を使用して、Amazon SQSキューからレコードを受信できます。その後、必要に応じてこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先に送信できます。

パイプを使用して Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理できます。 EventBridge パイプは標準キュー先入れ先出し (FIFO) キューをサポートします。Amazon ではSQS、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクをオフロードできます。

EventBridge はキューをポーリングし、キューメッセージを含むイベントでパイプを同期的に呼び出します。 はメッセージをバッチで EventBridge 読み取り、バッチごとにパイプを 1 回呼び出します。パイプがバッチを正常に処理すると、 はキューからそのメッセージ EventBridge を削除します。

デフォルトでは、 はキューに最大 10 個のメッセージを同時に EventBridge ポーリングし、そのバッチをパイプに送信します。少数のレコードでパイプを呼び出さないようにするには、バッチウィンドウを設定して、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。パイプを呼び出す前に、 は次のいずれかが発生するまで Amazon SQS標準キューからのメッセージのポーリング EventBridge を続行します。

  • バッチウィンドウの有効期限が切れる。

  • 呼び出しペイロードのサイズがクォータに達する。

  • 最大バッチサイズに達する。

注記

バッチウィンドウを使用していて、Amazon SQSキューのトラフィックが少ない場合は、パイプを呼び出す前に最大 20 秒待つ EventBridge ことができます。これは、バッチウィンドウを 20 秒未満に設定した場合であっても同様です。FIFO キューの場合、レコードには重複排除とシーケンスに関連する追加の属性が含まれます。

がバッチ EventBridge を読み取ると、メッセージはキューに残りますが、キューの可視性タイムアウト の間は非表示になります。パイプがバッチを正常に処理すると、 はキューからメッセージ EventBridge を削除します。デフォルトで、パイプがバッチを処理しているときにエラーが発生すると、そのバッチ内のすべてのメッセージが再びキューに表示されます。このため、パイプコードは、意図しない副次的影響を及ぼすことなく同じメッセージを複数回処理できるようにする必要があります。バッチアイテムの失敗をパイプのレスポンスに含めることで、この再処理動作を変更できます。以下の例では、2 つのメッセージのバッチのイベントを示しています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes でのイベントフィルタリング」を参照してください。

スタンダードキュー

[ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ]

FIFO キュー

[ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ]

スケーリングと処理

標準キューの場合、キューがアクティブになるまで、 はロングポーリング EventBridge を使用してキューをポーリングします。メッセージが利用可能になると、 は最大 5 つのバッチを EventBridge 読み取り、パイプに送信します。メッセージがまだ利用可能な場合、バッチを読み取るプロセスの数を 1 分あたり最大 300 インスタンス EventBridge 増やします。パイプによって同時に処理できるバッチの最大数は 1,000 です。

FIFO キューの場合、 EventBridge はパイプにメッセージを受信順に送信します。FIFO キューにメッセージを送信するときは、メッセージグループ ID を指定します。Amazon は EventBridge、同じグループのメッセージを に順番に配信することSQSを容易にします。 は受信したメッセージをグループに EventBridge ソートし、グループに対して一度に 1 つのバッチのみを送信します。パイプがエラーを返すと、パイプは影響を受けるメッセージに対してすべての再試行を試行し、 は同じグループから追加のメッセージ EventBridge を受信します。

EventBridge Pipes で使用するキューの設定

パイプのソースとして機能する Amazon SQSキューを作成します。次に、パイプがイベントの各バッチを処理し、スケールアップ時にスロットリングエラーに応答して が再試行する時間を許可する EventBridge ようにキューを設定します。

パイプがレコードの各バッチを処理するために十分な時間を取るため、ソースキューの可視性タイムアウトは、パイプのエンリッチメントとターゲットコンポーネントのランタイムを組み合わせた時間の少なくとも 6 倍に設定してください。余分な時間により EventBridge 、前のバッチの処理中にパイプがスロットリングされた場合、 は再試行できます。

パイプがメッセージを複数回処理できない場合、Amazon SQSはデッドレターキュー に送信できます。 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.htmlパイプがエラーを返すと、 はそれをキューに EventBridge 保持します。可視性タイムアウトが発生すると、 EventBridge はメッセージを再度受け取ります。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

注記

パイプではなく、ソースキューのデッドレターキューを設定するようにしてください。パイプで設定したデッドレターキューは、ソースキューではなく、パイプの非同期呼び出しキューに使用されます。

パイプからエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージをデッドレターキューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの maxReceiveCount5 以上に設定します。

バッチ項目の失敗の報告

がソースからのストリーミングデータを EventBridge 消費して処理する場合、デフォルトではバッチのシーケンス番号が最も高いチェックポイントになりますが、バッチが完全に成功した場合に限ります。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「部分的なバッチ処理失敗」を参照してください。

成功条件と失敗の条件

次のいずれかを返すと、 はバッチを完全に成功として EventBridge 処理します。

  • 空の batchItemFailure リスト

  • null の batchItemFailure リスト

  • 空の EventResponse

  • null の EventResponse

次のいずれかを返すと、 はバッチを完全に失敗として EventBridge 処理します。

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

EventBridge は、再試行戦略に基づいて失敗を再試行します。