メニュー
Amazon Simple Queue Service
開発者ガイド

水平スケーリングとバッチ処理によるスループットの向上

Amazon SQS キューにより、かなり高いスループットを実現できます (1 秒あたり数千通のメッセージ)。 このスループットを達成するには、メッセージのプロデューサーとコンシューマーを水平にスケーリングする必要があります (プロデューサーとコンシューマーを追加します)。

水平スケーリングに加えて、バッチ処理は個々のメッセージリクエストに必要な数より少ないスレッド、接続、リクエストで一定のスループットを実現できます。バッチ処理された Amazon SQS API アクションを使用して、最大 10 通のメッセージを一度に送信、受信、または削除できます。Amazon SQS では、メッセージではなくリクエスト単位で課金されるため、バッチ処理によりコストを大幅に削減することもできます。

水平スケーリング

Amazon SQS には HTTP リクエストレスポンスプロトコルを通じてアクセスするため、1 回の接続で 1 つのスレッドを処理した場合のスループットは、リクエストのレイテンシー (リクエストの開始からレスポンスの受信までの時間) により低下します。たとえば、Amazon Elastic Compute Cloud (Amazon EC2) ベースのクライアントから同じリージョンにある Amazon SQS へのレイテンシーが平均約 20 ミリ秒の場合、1 回の接続で 1 つのスレッドを処理した場合の最大スループットは 1 秒あたりのオペレーション数が平均で 50 になります。

水平スケーリングは、全体的なキュースループットを高めるために、メッセージのプロデューサー (SendMessage リクエストを生成) とコンシューマー (ReceiveMessage リクエストと DeleteMessage リクエストを生成) の数を増やすことです。クライアントのスレッド数を増やしたり、クライアントを追加したりするか、その両方を行うことにより、水平的にスケーリングできます。クライアントを多く追加すればするほど、基本的にはキューのスループットが直線的に向上します。たとえば、クライアントの数を 2 倍にすると、スループットは倍増する場合があります。

重要

水平スケーリングを行うときは、使用している Amazon SQS キューの接続またはスレッドで、リクエストの送信とレスポンスの受信を並列して行うメッセージプロデューサーとメッセージコンシューマーの数を十分にサポートできることを確認する必要があります。たとえば、デフォルトでは AWS SDK for Java AmazonSQSClient クラスのインスタンスには Amazon SQS への接続が最大 50 件保持されます。追加の同時プロデューサーおよびコンシューマーを作成するには、その制限を調整する必要があります。たとえば、AWS SDK for Java では、次のコード行によって、AmazonSQSClient オブジェクト上の許容可能なプロデューサーおよびコンシューマースレッドの最大数を調整できます。

AmazonSQS sqsClient = new AmazonSQSClient(credentials, new ClientConfiguration().withMaxConnections(producerCount + consumerCount));

SDK for Java 非同期クライアント AmazonSQSAsyncClient の場合、使用可能なスレッドが十分にあることも確認する必要があります。詳細については、使用している SDK ライブラリのドキュメントを参照してください。

バッチ処理

Amazon SQS API のバッチアクション (SendMessageBatch および DeleteMessageBatch) で、一度に 10 個までのメッセージを処理することで、さらにスループットを最適化できます。ReceiveMessage は、一度に 10 件のメッセージを処理できるため、ReceiveMessageBatch アクションはありません。

バッチ処理の基本的な考え方は、サービスの各ラウンドトリップでより多くの処理を実行すること (1 つの SendMessageBatch リクエストで複数のメッセージを送信するなど) と、1 つのメッセージレイテンシー全体を受け入れる (SendMessage リクエストなど) のではなく、バッチ処理オペレーションのレイテンシーをバッチリクエスト内の複数のメッセージに分散することです。各ラウンドトリップがより多くの処理を実行するため、バッチリクエストがスレッドと接続をより効率的に使用するようになり、スループットが向上します。Amazon SQS ではリクエスト単位で課金されるため、少ないリクエストで同じ数のメッセージを処理するとコストが大幅に削減されます。さらに、スレッドと接続が減少することで、クライアント側のリソース使用率が下がるため、小さいホストまたは少ないホストで同じ処理が実行されてクライアント側のコストを削減できます。

バッチ処理を行うと、アプリケーションが少し複雑になります。たとえば、アプリケーションはメッセージを蓄積してから送信するため、レスポンスの待ち時間が長くなることがあります。ただし、以下のようにバッチ処理が効果的な場合があります。

  • アプリケーションが短い時間で多くのメッセージを生成するため、遅延が大幅に長くなることはない。

  • 一般的なメッセージプロデューサーが自身でコントロールしていないイベントに応答してメッセージを送信する必要があるのに対して、メッセージコンシューマーは自身の判断でキューからメッセージを取得する。

重要

バッチ内の個々のメッセージが失敗しても、バッチリクエスト (SendMessageBatch または DeleteMessageBatch) は成功することがあります。バッチリクエストの後、必ず個々のメッセージのエラーがないか確認し、必要に応じて再試行してください。

Amazon SQS のバッファされた非同期クライアント を使用すると、プロデューサーおよびコンシューマーを変更することなく、バッチ処理を活用できます。

このセクションで示す例には、簡単なプロデューサー/コンシューマーパターンが実装されています。詳しい例は、https://s3.amazonaws.com/cloudformation-examples/sqs-producer-consumer-sample.tar から無料でダウンロードできます。各テンプレートによってデプロイされるリソースについては、このセクションの後方で説明します。

サンプルコードは、/tmp/sqs-producer-consumer-sample/src のプロビジョニングされたインスタンスにあります。設定された実行のコマンドラインは、/tmp/sqs-producer-consumer-sample/command.log にあります。

メインスレッドにより、指定された時間に 1 KB のメッセージを処理するプロデューサーおよびコンシューマースレッドが多数発生します。この例には、単一オペレーションリクエストを生成するプロデューサーおよびコンシューマーと、バッチ処理リクエストを生成する他のプロデューサーおよびコンシューマーが含まれています。

プログラムでは、メインスレッドがプロデューサースレッドを停止するまで、各プロデューサースレッドがメッセージを送信します。producedCount オブジェクトは、すべてのプロデューサースレッドにより生成されたメッセージ数を追跡します。エラー処理はシンプルです。エラーがある場合、プログラムは run() メソッドを終了します。一時エラーが発生したリクエストは、デフォルトでは AmazonSQSClient により 3 回再試行されるため、このようなエラーが生じることはほとんどありません。必要に応じて再試行回数を設定して、スローされる例外の数を減らすことができます。メッセージプロデューサーの run() は、次のように実装されます。

try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { // By default AmazonSQSClient retries calls 3 times before failing, // so when this rare condition occurs, simply stop. log.error("Producer: " + e.getMessage()); System.exit(1); }

バッチプロデューサーは、ほとんど同じです。注目すべき 1 つ違いは、エラーが発生した個々のバッチエントリを再試行する必要があることです。

SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); }

コンシューマーの run() メソッドは次のとおりです。

while (!stop.get()) { result = sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } }

各コンシューマースレッドは、メインスレッドにより停止されるまでメッセージを受信および削除します。consumedCount オブジェクトは、すべてのコンシューマースレッドによって消費されるメッセージの数を追跡します。カウントは、定期的にログ記録されます。バッチコンシューマーも同様ですが、一度に最大 10 通のメッセージが受信され、DeleteMessage の代わりに DeleteMessageBatch が使用される点が異なります。

例の実行

用意された AWS CloudFormation テンプレートを使用して、3 つの異なる設定 (単一オペレーションリクエストが生成される 1 つのホスト、単一オペレーションリクエストが生成される 2 つのホスト、バッチリクエストが生成される 1 つのホスト) でサンプルコードを実行できます。

重要

サンプル全体は、.tar ファイル 1 つに収められています。各テンプレートによってデプロイされるリソースについては、このセクションの後方で説明します。

サンプルのコードは、/tmp/sqs-producer-consumer-sample/src のプロビジョニング済みインスタンスにあります。 設定された実行のコマンドラインは、/tmp/sqs-producer-consumer-sample/command.log にあります。

デフォルトの時間 (20 分) は、ボリュームメトリクスの 5 分間の CloudWatch データポイントが 3 つまたは 4 つ生成されるように設定されています。各実行の Amazon EC2 コストは m1.large インスタンスのコストです。Amazon SQS コストは各サンプルの API 呼び出しレートによって異なり、約 38,000 API コール/分 (バッチ処理サンプルの場合) から 380,000 API コール/分 (ダブルホストの単一 API サンプルの場合) までの間になります。

米国東部(バージニア北部) 以外のリージョンに AWS CloudFormation スタックをデプロイする場合、AWS CloudFormation コンソールのリージョンボックスで、リージョンを選択します。

の例を実行するには

  1. 以下のリンクのうち、起動するスタックに対応するリンクを選択します。

    • 単一オペレーション API、1 つのホスト : このサンプルテンプレートでは、単一オペレーション形式の Amazon SQS API リクエストとして SendMessageReceiveMessageDeleteMessage を使用しています。Amazon EC2 の 1 つの m1.large インスタンスは、16 のプロデューサースレッドと 32 のコンシューマースレッドを実行します。テンプレートを表示

    • 単一オペレーション API、2 つのホスト: このサンプルテンプレートでは、単一オペレーション形式の Amazon SQS API リクエストを使用していますが、Amazon EC2 の 1 つの m1.large インスタンスの代わりに、それぞれ 16 のプロデューサースレッドと 32 のコンシューマースレッドを実行する 2 つのインスタンスが使用されるため、合計 32 のプロデューサーと 64 のコンシューマーが実行されます。これは、プロデューサーとコンシューマーの数に比例してスループットが向上する Amazon SQS の伸縮性を示しています。テンプレートを表示

    • バッチ API、1 つのホスト: このサンプルテンプレートでは、バッチ形式の Amazon SQS API リクエストを Amazon EC2 の 1 つの m1.large インスタンスで使用し、12 のプロデューサースレッドと 20 のコンシューマースレッドを実行します。テンプレートを表示

  2. プロンプトが表示されたら、AWS マネジメントコンソールにサインインします。

  3. [Create Stack] ウィザードの [Select Template] ページで、[Continue] を選択します。

  4. [Specify Parameters] ページで、実行の完了時に Amazon EC2 インスタンスを自動的に終了するかどうかにかかわらず、プログラムの実行時間を指定し、サンプルを実行しているインスタンスにアクセスできるように Amazon EC2 キーペアを指定します。

  5. [I acknowledge that this template may create IAM resources] チェックボックスをオンにします。プロデューサー/コンシューマープログラムがキューにアクセスできるように、すべてのテンプレートにより AWS Identity and Access Management (IAM) ユーザーが作成されます。

  6. すべての設定が正しいことを確認したら、[Continue] を選択します。

  7. [Review] ページで、設定を見直します。希望どおりであれば、[Continue] を選択します。それ以外の場合は、[Back] をクリックし、必要に応じて変更します。

  8. ウィザードの最後のページで、[Close] を選択します。スタックのデプロイには数分かかる場合があります。

スタックのデプロイの進行状況を確認するには、AWS CloudFormation コンソールでサンプルスタックを選択します。下部のペインで、[Events] タブを選択します。スタックが作成されると、5 分以内にサンプルの実行が開始されます。実行されると、Amazon SQS コンソールにキューが表示されます。

キューアクティビティをモニタリングするには、次の操作を実行します。

  • クライアントインスタンスにアクセスしてそのログファイル /tmp/sqs-producer-consumer-sample/output.log を開き、これまでに生成および消費されたメッセージの集計を確認します。この集計は、1 秒ごとに更新されます。

  • Amazon SQS コンソールで、[Message Available] と [Messages in Flight] の数の変化を観察します。

また、キューが開始されてから最大 15 分の遅延が経過した後、このトピックの後方で説明するように CloudWatch でキューをモニタリングできます。

テンプレートとサンプルには、リソースの過剰使用を防ぐ安全装置が付いていますが、サンプルの実行が完了したら AWS CloudFormation スタックを削除することをお勧めします。削除するには、Amazon SQS コンソールで、削除するスタックを選択し、次に [Delete Stack] を選択します。すべてのリソースが削除されると、すべての CloudWatch メトリクスがゼロになります。

サンプル実行からのボリュームメトリクスのモニタリング

Amazon SQS は、送信、受信、削除されたメッセージのボリュームメトリクスを自動的に生成します。これらのメトリクスと他のメトリクスには、CloudWatch コンソールからアクセスできます。メトリクスを参照可能になるまで、キューが開始してから最大 15 分かかる場合があります。検索結果セットを管理するには、[Search] を選択し、モニタリングするキューとメトリクスに対応するチェックボックスをオンにします。

3 つのサンプルを連続して実行した場合の NumberOfMessageSent メトリクスを次に示します。結果は少し異なる場合がありますが、結果の質は似たようなものになります。

  • NumberOfMessagesReceived および NumberOfMessagesDeleted メトリクスには、同じパターンが表示されますが、無駄を省くためこのグラフでは省略しました。

  • 最初のサンプル (単一オペレーション API、1 つの m1.large インスタンス) では、5 分間に約 210,000 通のメッセージ、つまり 1 秒あたり約 700 通のメッセージが処理され、受信および削除オペレーションのスループットが同じです。

  • 2 つ目のサンプル (単一オペレーション API、2 つの m1.large インスタンス) は、スループットがほぼ 2 倍になります。5 分間に約 440,000 通のメッセージ、つまり 1 秒あたり約 1,450 通のメッセージが処理され、受信および削除オペレーションのスループットが同じです。

  • 最後のサンプル (バッチ API、1 つの m1.large インスタンス) では、5 分間に 800,000 通以上のメッセージ、つまり 1 秒あたり約 2,500 通のメッセージが処理され、受信および削除されたメッセージのスループットが同じです。バッチサイズが 10 の場合、これらのメッセージはかなり少ないリクエストで処理されるため、コストが低くなります。