透過 Amazon SQS 使用水平擴展和動作批次處理來增加輸送量 - Amazon Simple Queue Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

透過 Amazon SQS 使用水平擴展和動作批次處理來增加輸送量

Amazon SQS 佇列可提供極高的輸送量。如需輸送量配額的詳細資訊,請參閱 Amazon SQS 訊息配額

若要達到高傳輸量,必須水平擴展訊息生產者和消費者 (增加更多生產者和消費者)。

水平擴展

由於您是透過 HTTP 請求-回應協定來存取 Amazon SQS,請求延遲 (自開始請求至收到回應的時間間隔) 會限制您透過單一連線使用單一執行緒所能達到的輸送量。例如,若 Amazon EC2 型的用戶端到 Amazon SQS 的延遲度在相同區域的平均值為 20 毫秒,透過單一連線使用單一執行緒可達到的最大輸送量平均為 50 TPS。

水平擴展意指增加訊息生產者 (提出 SendMessage 請求) 及消費者 (提出 ReceiveMessageDeleteMessage 請求) 的數目,藉以提高佇列整體的傳輸量。水平擴展的方式有三種:

  • 增加每個用戶端的執行緒數目

  • 增加更多用戶端

  • 增加每個用戶端的執行緒數目並增加更多用戶端

增加更多用戶端之後,基本上佇列傳輸量會獲得線性增益。例如,若用戶端數目加倍,傳輸量也會翻倍。

注意

進行水平擴展時,您必須確定您的 Amazon SQS 用戶端有足夠的連線和執行緒,能支援傳送請求和接收回應的並存訊息生產者與消費者的數目。例如,依預設, AWS SDK for Java AmazonSQSClient類別的執行個體最多可維持 50 個與 Amazon SQS 的連線。若要建立更多的並存生產者和消費者,必須調整 AmazonSQSClientBuilder 物件所允許的生產者和消費者執行緒數目上限,例如:

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

對於 AmazonSQSAsyncClient,您還必須確保有足夠的執行緒可用。

此示例僅適用於 Java 版本 1.x。

動作批次處理

批次處理可在每次往返服務期間處理更多工作 (例如,透過單次 SendMessageBatch 請求傳送多則訊息)。Amazon SQS 批次動作包括 SendMessageBatchDeleteMessageBatchChangeMessageVisibilityBatch。若要在不變動生產者或消費者的情況下利用批次處理功能,您可以使用 Amazon SQS 緩衝非同步用戶端

注意

由於 ReceiveMessage 一次可處理 10 則訊息,也就不必有 ReceiveMessageBatch 動作。

批次處理會將批次動作的延遲度分散到批次請求中的多則訊息,而非接受單則訊息 (例如 SendMessage 請求) 的所有延遲。由於每次往返所執行的工作較多,批次請求可以有效率地運用執行緒和連線,進而提高傳輸量。

您可以結合批次處理與水平擴展來提供傳輸量,所需的執行緒、連線和請求的數目會比單獨的訊息請求所需的量更少。您可以使用批次的 Amazon SQS 動作一次傳送、接收、刪除多達 10 則訊息。由於 Amazon SQS 收費是以請求數為準,批次處理可大幅降低您的成本。

批次處理可能會給您的應用程式帶來一些複雜性 (例如,您的應用程式必須將訊息累積起來後再傳送出去,或者有時候必須花更久的時間等待回應)。但是,批次處理在以下情況仍然頗具效益:

  • 應用程式會在短時間內產生許多訊息,所以絕不會延遲太久。

  • 訊息消費者自行從佇列擷取訊息,有別於典型的訊息生產者需要傳送訊息來回應其無法控制的事件。

重要

即使批次中的個別訊息失敗,批次請求也可能會成功。提出批次請求之後,務必確認個別訊息是否出現失敗情況,必要時重試動作。

單次操作和批次請求實際可行的 Java 範例

必要條件

新增 aws-java-sdk-sqs.jaraws-java-sdk-ec2.jar,以及 commons-logging.jar 套件到您的 Java 建置類別路徑。以下範例顯示 Maven 專案 pom.xml 檔案中的此等依存關係。

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer. 爪哇

以下 Java 程式碼範例實作單純的生產者-消費者模式。主要的執行緒會產生數個生產者和消費者執行緒,負責在指定時間處理 1 KB 的訊息。此範例包括提出單次操作請求的生產者和消費者,以及提出批次請求的生產者和消費者。

/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

監控範本執行階段的數量指標

Amazon SQS 會自動為傳送、接收及刪除的訊息產生數量指標。您可以透過佇列的 [監視] 索引標籤或CloudWatch 主控台存取這些指標和其他指標。

注意

可能要在佇列開始執行達 15 分鐘後方可取得指標。