Amazon SQS でのクライアント側のバッファリングとリクエストのバッチ処理の有効化 - Amazon Simple Queue Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon SQS でのクライアント側のバッファリングとリクエストのバッチ処理の有効化

AWS SDK for Javaを含むAmazonSQSBufferedAsyncClientAmazon SQSにアクセスするもの。このクライアントでは、クライアント側のバッファリングを使用してリクエストのバッチを簡単に行うことができます。クライアントから行われた呼び出しは、最初にバッファされ、バッチリクエストとして Amazon SQS に送信されます。

クライアント側のバッファリングは最大10個のリクエストをバッファリングし、バッチリクエストとして送信できるため、Amazon SQSの利用コストを削減して、送信リクエストの数を減らすことができます。バッファリングは同期および非同期コールの両方をAmazonSQSBufferedAsyncClient バッファします。バッチ処理されたリクエストと ロングポーリングのサポートによって、スループットを向上させることもできます。詳細については、「Amazon SQS での水平スケーリングとアクションのバッチ処理を使用したスループットの向上」を参照してください。

AmazonSQSBufferedAsyncClientAmazonSQSAsyncClientと同じインターフェイスを実行するため、AmazonSQSAsyncClient から AmazonSQSBufferedAsyncClient に移行するには通常既存のコードを最小限変更するだけです。

注記

Amazon SQS バッファリング非同期クライアントは現在 FIFOキューをサポートしていません。

AmazonSQSBufferedAsyncClient の使用

開始する前に、「Amazon SQSのセットアップ」のステップを完了します。

AWS SDK for Java 1.x

AWS SDK for Java 1.x では、次の例AmazonSQSBufferedAsyncClientに基づいて新しい を作成できます。

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

新規作成したらAmazonSQSBufferedAsyncClient、これを使用してAmazon SQSに複数のリクエストを送信できます (AmazonSQSAsyncClient と同様です)。次に例を示します。

final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

AmazonSQSBufferedAsyncClient の設定

AmazonSQSBufferedAsyncClient は、ほとんどのユースケースに合うように事前に設定されています。たとえば、AmazonSQSBufferedAsyncClient をさらに設定できます。'

  1. 必要な設定パラメータを使用して、QueueBufferConfig クラスのインスタンスを作成します。

  2. AmazonSQSBufferedAsyncClient コンストラクタにインスタンスを指定します。

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
QueueBufferConfig の設定パラメータ
パラメータ デフォルト値 説明
longPoll true

longPolltrue に設定されている場合、AmazonSQSBufferedAsyncClient はメッセージの処理時にロングポーリングを使用しようとします。

longPollWaitTimeoutSeconds 20 s

空の受信結果を返すまでに、キュー内へのメッセージの出現をサーバーが待機するのを ReceiveMessage 呼び出しがブロックする最大秒数。

注記

ロングポーリングが無効になっている場合、この設定に効果はありません。

maxBatchOpenMs 200ms

送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。

設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。

このパラメータを 0 に設定した場合、送信されたリクエストは他のリクエストを待機しないため、バッチ処理が事実上無効になります。

maxBatchSize バッチあたり 10 個のリクエスト

1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。

注記

バッチあたり 10 個のリクエストはAmazon SQSの最大許容値です。

maxBatchSizeBytes 256 KiB

クライアントがAmazon SQSに送信しようとするメッセージバッチの最大サイズ、バイト単位。

注記

256 KiB は Amazon SQS の最大許容値です。

maxDoneReceiveBatches 10 個のバッチ

AmazonSQSBufferedAsyncClient がプリフェッチし、クライアント側に保存する受信バッチの最大数。

設定を大きくすればするほど、Amazon SQSを呼び出さなくても多くの受信リクエストを満たすことができます (ただし、プリフェッチされるメッセージが多くなるほど、バッファにとどまる時間が長くなるため、それ自体の可視性タイムアウトが発生する可能性があります)。

注記

0 は、すべてのメッセージのプリフェッチが無効になっていて、メッセージはオンデマンドでのみ消費されることを示します。

maxInflightOutboundBatches 5 個のバッチ

同時に処理できるアクティブな送信バッチの最大数。

設定を大きくすればするほど、送信バッチの送信速度が速くなり (CPU や帯域幅などの他のクォータの影響を受けます)、AmazonSQSBufferedAsyncClient により消費されるスレッドが増えます。

maxInflightReceiveBatches 10 個のバッチ

同時に処理できるアクティブな受信バッチの最大数。

設定を大きくすればするほど、受信するメッセージが増え (CPU や帯域幅などの他のクォータの影響を受けます)、AmazonSQSBufferedAsyncClient により消費されるスレッドが増えます。

注記

0 は、すべてのメッセージのプリフェッチが無効になっていて、メッセージはオンデマンドでのみ消費されることを示します。

visibilityTimeoutSeconds -1

このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。

注記

-1 は、キューのデフォルト設定が選択されていることを示します。

可視性タイムアウトを 0 に設定することはできません。

AWS SDK for Java 2.x

AWS SDK for Java 2.x では、次の例SqsAsyncBatchManagerに基づいて新しい を作成できます。

// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();

新規作成したらSqsAsyncBatchManager、これを使用してAmazon SQSに複数のリクエストを送信できます (SqsAsyncClient と同様です)。次に例を示します。

final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);

SqsAsyncBatchManager の設定

SqsAsyncBatchManager は、ほとんどのユースケースに合うように事前に設定されています。たとえば、SqsAsyncBatchManager をさらに設定できます。'

を使用したカスタム設定の作成SqsAsyncBatchManager.Builder

SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
BatchOverrideConfiguration 個のパラメータ
パラメータ デフォルト値 説明
maxBatchSize

バッチあたり 10 個のリクエスト

1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。

注記

Amazon SQS の最大許容値は、バッチあたり 10 リクエストです。

sendRequestFrequency

200ms

送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。

設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。

このパラメータを 0 に設定した場合、送信されたリクエストは他のリクエストを待機しないため、バッチ処理が事実上無効になります。

receiveMessageVisibilityTimeout

-1

このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。

注記

1 は、キューのデフォルト設定が選択されていることを示します。可視性タイムアウトを 0 に設定することはできません。

receiveMessageMinWaitDuration

50 ミリ秒

使用可能なメッセージが取得されるまでreceiveMessage呼び出しが待機する最小時間 (ミリ秒単位)。設定が高いほど、同じ数のリクエストを実行するために必要なバッチが少なくなります。