Aumento del throughput utilizzando la scalabilità orizzontale e l'action batching con Amazon SQS - Amazon Simple Queue Service

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Aumento del throughput utilizzando la scalabilità orizzontale e l'action batching con Amazon SQS

Le code Amazon SQS possono fornire un throughput molto elevato. Per ulteriori informazioni sulle quote di velocità di trasmissione effettiva, vedere Quote di messaggi Amazon SQS.

Per ottenere un throughput elevato, devi dimensionare orizzontalmente i produttori e i consumatori dei messaggi (ovvero aggiungere ulteriori produttori e consumatori).

Dimensionamento orizzontale

Poiché accedi ad Amazon SQS attraverso un protocollo di richieste/risposte HTTP, la latenza della richiesta (l'intervallo di tempo compreso tra l'avvio di una richiesta e la ricezione di una risposta) limita il throughput che puoi ottenere da un singolo thread su un'unica connessione. Ad esempio, se la latenza da un client basato su Amazon EC2 ad Amazon SQS nella stessa regione è in media di 20 ms, la velocità di trasmissione effettiva massima da un singolo thread su un'unica connessione è in media di 50 TPS.

Per dimensionamento orizzontale si intende aumentare il numero di produttori di messaggi (che generano richieste SendMessage) e di consumatori di messaggi (che generano richieste ReceiveMessage e DeleteMessage) per aumentare il throughput complessivo della coda. È possibile dimensionare orizzontalmente in tre modi:

  • Aumentare il numero di thread per client

  • Aggiungi altri client

  • Aumentare il numero di thread per client e aggiungere altri client

Aggiungendo più client dovresti ottenere essenzialmente guadagni lineari nel throughput della coda. Ad esempio, se raddoppi il numero di clienti, puoi ottenere il doppio del throughput.

Nota

Quando si ridimensiona in orizzontale, è necessario assicurarsi che il client Amazon SQS disponga di connessioni o thread sufficienti per supportare il numero di produttori e consumatori di messaggi simultanei che inviano richieste e ricevono risposte. Ad esempio, per impostazione predefinita, le istanze della AWS SDK for Java AmazonSQSClient classe mantengono al massimo 50 connessioni ad Amazon SQS. Per creare produttori e consumatori simultanei aggiuntivi, devi modificare il numero massimo di thread produttore e consumatore consentiti su un oggetto AmazonSQSClientBuilder, ad esempio:

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

Per AmazonSQSAsyncClient, devi inoltre accertarti che sia disponibile un numero sufficiente di thread.

Questo esempio funziona solo per Java v. 1.x.

Raggruppare le operazioni

Il Raggruppamento esegue più lavoro durante ogni round trip al servizio (ad esempio, quando invii più messaggi con una singola richiesta SendMessageBatch). Le operazioni Amazon SQS in batch sono SendMessageBatch, DeleteMessageBatch e ChangeMessageVisibilityBatch. Per usufruire del batching senza modificare produttori e consumatori, utilizza Amazon SQS Buffered Asynchronous Client.

Nota

Poiché ReceiveMessage può elaborare 10 messaggi alla volta, non c'è nessuna azione ReceiveMessageBatch.

Il raggruppamento distribuisce la latenza dell'operazione in batch su più messaggi in una richiesta batch, invece di accettare l'intera latenza per un solo messaggio (per esempio, una richiesta SendMessage). Poiché ogni round trip trasporta più lavoro, le richieste in batch fanno un utilizzo più efficiente di thread e connessioni , migliorano così il throughput.

Puoi abbinare il batching al dimensionamento orizzontale per offrire un throughput con un numero minore di thread, connessioni e richieste rispetto a quelli necessari per richieste di messaggi individuali. Puoi utilizzare le operazioni in batch di Amazon SQS per inviare, ricevere o eliminare fino a 10 messaggi alla volta. Poiché Amazon SQS addebita un costo per richiesta, il raggruppamento è in grado di ridurre in modo significativo i costi.

Il raggruppamento può introdurre alcune complessità per la tua applicazione (ad esempio, l'applicazione deve accumulare i messaggi prima di inviarli oppure talvolta è necessario attendere di più per una risposta). Tuttavia, il raggruppamento può essere ancora efficace nei seguenti casi:

  • La tua applicazione genera una notevole quantità di messaggi in un breve periodo di tempo, pertanto il ritardo non è mai molto lungo.

  • Il consumatore di un messaggio recupera i messaggi da una coda a sua discrezione, a differenza dei tipici produttori di messaggi che devono inviare messaggi in risposta a eventi che non controllano.

Importante

Una richiesta di batch potrebbe avere esito positivo anche se i singoli messaggi nel batch hanno avuto esito negativo. Dopo una richiesta di batch, verifica sempre la presenza di errori di messaggio individuali e, se necessario, prova nuovamente l'operazione.

Esempio di utilizzo di Java Working per richieste con operazioni singole e in batch

Prerequisiti

Aggiungi i pacchetti aws-java-sdk-sqs.jar, aws-java-sdk-ec2.jare commons-logging.jar al percorso di classe build Java. L'esempio seguente mostra queste dipendenze in un file pom.xml di progetto Maven.

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer.java

Il seguenti esempio di codice Java implementa un semplice modello produttore-consumatore. Il thread principale genera dinamicamente diversi thread produttore e consumatore che elaborano messaggi di 1 KB per un periodo di tempo specificato. Questo esempio include produttori e consumatori che effettuano richieste con operazioni singole e altri che effettuano richieste in batch.

/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

Monitoraggio delle metriche di volume dall'esecuzione di esempio

Amazon SQS genera automaticamente i parametri di volume per i messaggi inviati, ricevuti ed eliminati. Puoi accedere a queste metriche e ad altre tramite la scheda Monitoraggio della coda o sulla CloudWatch console.

Nota

Le metriche possono diventare disponibili fino a 15 minuti dopo l'avvio della coda.