メニュー
Amazon Simple Queue Service
開発者ガイド

水平スケーリングとバッチ処理によるスループットの向上

Amazon SQS キューにより、かなり高いスループットを実現できます (1 秒あたり数千通のメッセージ)。 このスループットを実現する鍵は、メッセージのプロデューサーとコンシューマーの水平スケーリングです。さらに、Amazon SQS API でバッチ処理アクションを使用して、最大 10 通のメッセージを一度に送信、受信、または削除できます。水平スケーリングとバッチ処理を組み合わせると、個々のメッセージリクエストに必要な数より少ないスレッド、接続、リクエストで一定のスループットを実現できます。Amazon SQS では、メッセージではなくリクエスト単位で課金されるため、バッチ処理によりコストを大幅に削減することもできます。

この付録では、水平スケーリングとバッチ処理について詳しく説明します。その後、自分で試すことができる簡単な例について手順を追って説明します。さらに、CloudWatch を使用してモニタリング可能な Amazon SQS のスループットメトリクスについても簡単に説明します。

水平スケーリング

Amazon SQS には HTTP リクエストレスポンスプロトコルを通じてアクセスするため、1 回の接続で 1 つのスレッドを処理した場合のスループットは、リクエストのレイテンシー (リクエストの開始からレスポンスの受信までの時間) により低下します。たとえば、Amazon Elastic Compute Cloud (Amazon EC2) ベースのクライアントから同じリージョンにある Amazon SQS へのレイテンシーが平均約 20 ミリ秒の場合、1 回の接続で 1 つのスレッドを処理した場合の最大スループットは 1 秒あたりのオペレーション数が平均で 50 件になります。

水平スケーリングは、全体的なキュースループットを高めるために、メッセージのプロデューサー (SendMessage リクエストを生成) とコンシューマー (ReceiveMessage リクエストと DeleteMessage リクエストを生成) の数を増やすことです。クライアントのスレッド数を増やしたり、クライアントを追加したりするか、その両方を行うことにより、水平的にスケーリングできます。クライアントを多く追加すればするほど、基本的にはキューのスループットが直線的に向上します。たとえば、クライアントの数を 2 倍にした場合、スループットは 2 倍になります。

重要

水平スケーリングを行うときは、使用している Amazon SQS キューに、リクエストの送信とレスポンスの受信を行うメッセージの同時プロデューサーおよびコンシューマー数をサポートできる十分な接続またはスレッドがあることを確認する必要があります。たとえば、デフォルトでは AWS SDK for Java AmazonSQSClient クラスのインスタンスには Amazon SQS への接続が最大 50 件保持されます。追加の同時プロデューサーおよびコンシューマーを作成するには、その制限を調整する必要があります。たとえば、AWS SDK for Java では、次のコード行によって、AmazonSQSClient オブジェクト上の許容可能なプロデューサーおよびコンシューマースレッドの最大数を調整できます。

Copy to clipboard
AmazonSQS sqsClient = new AmazonSQSClient(credentials, new ClientConfiguration().withMaxConnections(producerCount + consumerCount));

SDK for Java 非同期クライアント AmazonSQSAsyncClient の場合、使用可能なスレッドが十分にあることも確認する必要があります。詳細については、使用している SDK ライブラリのドキュメントを参照してください。

バッチ処理

Amazon SQS API のバッチアクション (SendMessageBatch および DeleteMessageBatch) で、一度に 10 個までのメッセージを処理することで、さらにスループットを最適化できます。ReceiveMessage は、一度に 10 件のメッセージを処理できるため、ReceiveMessageBatch アクションはありません。

バッチ処理の基本的な考え方は、サービスの各ラウンドトリップでより多くの処理を実行すること (1 つの SendMessageBatch リクエストで複数のメッセージを送信するなど) と、1 つのメッセージレイテンシー全体を受け入れる (SendMessage リクエストなど) のではなく、バッチ処理オペレーションのレイテンシーをバッチリクエスト内の複数のメッセージに分散することです。各ラウンドトリップがより多くの処理を実行するため、バッチリクエストがスレッドと接続をより効率的に使用するようになり、スループットが向上します。Amazon SQS ではリクエスト単位で課金されるため、少ないリクエストで同じ数のメッセージを処理するとコストが大幅に削減されます。さらに、スレッドと接続が減少することで、クライアント側のリソース使用率が下がるため、小さいホストまたは少ないホストで同じ処理が実行されてクライアント側のコストを削減できます。

バッチ処理を行うと、アプリケーションが少し複雑になります。たとえば、アプリケーションはメッセージを送信前に蓄積する必要があるため、レスポンスの待ち時間が長くなることがあります。ただし、バッチ処理は以下の状況に効果がある可能性があります。

  • アプリケーションが短い時間で多くのメッセージを生成するため、遅延が大幅に長くなることはない。

  • 一般的なメッセージプロデューサーが自身でコントロールしていないイベントに応答してメッセージを送信する必要があるのに対して、メッセージコンシューマーは自身の判断でキューからメッセージを取得する。

重要

バッチ内の個々のメッセージが失敗しても、バッチリクエスト (SendMessageBatch または DeleteMessageBatch) は成功することがあります。バッチリクエストの後、必ず個々のメッセージのエラーがないか確認し、必要に応じて再試行してください。

このセクションで示す例には、簡単なプロデューサー/コンシューマーパターンが実装されています。詳しい例は、https://s3.amazonaws.com/cloudformation-examples/sqs-producer-consumer-sample.tar から無料でダウンロードできます。各テンプレートによってデプロイされるリソースについては、このセクションの後方で説明します。

サンプルコードは、/tmp/sqs-producer-consumer-sample/src のプロビジョニングされたインスタンスにあります。設定された実行のコマンドラインは、/tmp/sqs-producer-consumer-sample/command.log にあります。

メインスレッドにより、指定された時間に 1 KB のメッセージを処理するプロデューサーおよびコンシューマースレッドが多数発生します。この例には、単一オペレーションリクエストを生成するプロデューサーおよびコンシューマーと、バッチ処理リクエストを生成する他のプロデューサーおよびコンシューマーが含まれています。

プログラムでは、メインスレッドがプロデューサースレッドを停止するまで、各プロデューサースレッドがメッセージを送信します。producedCount オブジェクトは、すべてのプロデューサースレッドにより生成されたメッセージ数を追跡します。エラー処理はシンプルです。エラーがある場合、プログラムは run() メソッドを終了します。一時エラーが発生したリクエストは、デフォルトでは AmazonSQSClient により 3 回再試行されるため、このようなエラーが生じることはほとんどありません。必要に応じて再試行回数を設定して、スローされる例外の数を減らすことができます。メッセージプロデューサーの run() は、次のように実装されます。

Copy to clipboard
try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { // By default AmazonSQSClient retries calls 3 times before failing, // so when this rare condition occurs, simply stop. log.error("Producer: " + e.getMessage()); System.exit(1); }

バッチプロデューサーは、ほとんど同じです。注目すべき 1 つ違いは、エラーが発生した個々のバッチエントリを再試行する必要があることです。

Copy to clipboard
SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); }

コンシューマーの run() メソッドは次のとおりです。

Copy to clipboard
while (!stop.get()) { result = sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } }

各コンシューマースレッドは、メインスレッドにより停止されるまでメッセージを受信および削除します。consumedCount オブジェクトは、すべてのコンシューマースレッドによって消費されるメッセージの数を追跡します。カウントは、定期的にログ記録されます。バッチコンシューマーも同様ですが、一度に最大 10 通のメッセージが受信され、DeleteMessage の代わりに DeleteMessageBatch が使用される点が異なります。

例の実行

用意された AWS CloudFormation テンプレートを使用して、3 つの異なる設定 (単一オペレーションリクエストが生成される 1 つのホスト、単一オペレーションリクエストが生成される 2 つのホスト、バッチリクエストが生成される 1 つのホスト) でサンプルコードを実行できます。

重要

サンプル全体は、.tar ファイル 1 つに収められています。各テンプレートによってデプロイされるリソースについては、このセクションの後方で説明します。

サンプルコードは、/tmp/sqs-producer-consumer-sample/src のプロビジョニングされたインスタンスにあります。設定された実行のコマンドラインは、/tmp/sqs-producer-consumer-sample/command.log にあります。

デフォルトの時間 (20 分) は、ボリュームメトリクスの 5 分間の CloudWatch データポイントが 3 つまたは 4 つ生成されるように設定されています。各実行の Amazon EC2 コストは、m1.large インスタンスのコストです。Amazon SQS コストは各サンプルの API 呼び出しレートによって異なり、約 38,000 API 呼び出し/分 (バッチ処理サンプルの場合) から 380,000 API 呼び出し/分 (2 つのホストの単一 API サンプルの場合) の間になります。たとえば、1 つのホストで単一 API サンプルを実行すると、m1.large のコストは約 1 インスタンス時間 (大規模な標準オンデマンドインスタンス。2012 年 7 月時点で $0.32) で、デフォルト時間が 20 分の Amazon SQS オペレーションのコストは 20 分 x 190,000 API 呼び出し/分 x $1 / 1,000,000 API 呼び出し = $3.80 となります (2012 年 7 月時点。最新の価格は確認してください)。

米国東部(バージニア北部) 以外のリージョンに AWS CloudFormation スタックをデプロイする場合、AWS CloudFormation コンソールのリージョンボックスで、リージョンを選択します。

の例を実行するには

  1. 以下のリンクのうち、起動するスタックに対応するリンクを選択します。

  2. プロンプトが表示されたら、AWS マネジメントコンソールにサインインします。

  3. [Create Stack] ウィザードの [Select Template] ページで、[Continue] を選択します。

  4. [Specify Parameters] ページで、実行の完了時に Amazon EC2 インスタンスを自動的に終了するかどうかにかかわらず、プログラムの実行時間を指定し、サンプルを実行しているインスタンスにアクセスできるように Amazon EC2 キーペアを指定します。例を示します。

  5. [I acknowledge that this template may create IAM resources] チェックボックスをオンにします。プロデューサー/コンシューマープログラムがキューにアクセスできるように、すべてのテンプレートにより AWS Identity and Access Management (IAM) ユーザーが作成されます。

  6. すべての設定が正しいことを確認したら、[Continue] を選択します。

  7. [Review] ページで、設定を見直します。希望どおりであれば、[Continue] を選択します。それ以外の場合は、[Back] をクリックし、必要に応じて変更します。

  8. ウィザードの最後のページで、[Close] を選択します。スタックのデプロイには数分かかる場合があります。

スタックのデプロイの進行状況を確認するには、AWS CloudFormation コンソールでサンプルスタックを選択します。下部のペインで、[Events] タブを選択します。スタックが作成されると、5 分以内にサンプルの実行が開始されます。実行されると、Amazon SQS コンソールにキューが表示されます。

キューアクティビティをモニタリングするには、次の操作を実行します。

  • クライアントインスタンスにアクセスしてそのログファイル (/tmp/sqs-producer-consumer-sample/output.log) を開き、これまでに生成および消費されたメッセージの集計を確認します。この集計は、1 秒ごとに更新されます。

  • Amazon SQS コンソールで、[Message Available] と [Messages in Flight] の数の変化を観察します。

また、キューが開始されてから最大 15 分の遅延が経過した後、このトピックの後方で説明するように CloudWatch でキューをモニタリングできます。

テンプレートとサンプルには、リソースの過剰使用を防ぐ安全装置が付いていますが、サンプルの実行が完了したら AWS CloudFormation スタックを削除することをお勧めします。削除するには、Amazon SQS コンソールで、削除するスタックを選択し、次に [Delete Stack] を選択します。リソースがすべて削除されると、CloudWatch メトリクスがすべてゼロになります。

サンプル実行からのボリュームメトリクスのモニタリング

Amazon SQS は、送信、受信、削除されたメッセージのボリュームメトリクスを自動的に生成します。これらのメトリクスと他のメトリクスには、CloudWatch コンソールからアクセスできます。メトリクスを参照可能になるまで、キューが開始してから最大 15 分かかる場合があります。検索結果セットを管理するには、[Search] を選択し、モニタリングするキューとメトリックスに対応するチェックボックスをオンにします。

3 つのサンプルを連続して実行した場合の NumberOfMessageSent メトリクスを次に示します。結果は少し異なる場合がありますが、結果の質は似たようなものになります。

  • NumberOfMessagesReceived および NumberOfMessagesDeleted メトリクスには、同じパターンが表示されますが、無駄を省くためこのグラフでは省略しました。

  • 最初のサンプル (1 つの m1.large にシングルオペレーション API) では、5 分間に約 210,000 通のメッセージ、つまり 1 秒あたり約 700 通のメッセージが処理され、受信および削除オペレーションのスループットが同じです。

  • 2 つ目の例 (2 つの m1.large インスタンスにシングルオペレーション API) は、スループットがほぼ 2 倍になります。5 分間に約 440,000 通のメッセージ、つまり 1 秒あたり約 1,450 通のメッセージが処理され、受信および削除オペレーションのスループットが同じです。

  • 最後のサンプル (1 つの m1.large にバッチ処理 API) では、5 分間に 800,000 通以上のメッセージ、つまり 1 秒あたり約 2,500 通のメッセージが処理され、受信および削除されたメッセージのスループットが同じです。バッチサイズが 10 の場合、これらのメッセージはかなり少ないリクエストで処理されるため、コストが低くなります。