Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Augmenter le débit grâce à la mise à l'échelle horizontale et au traitement par lots d'actions avec Amazon SQS
Les files d'attente Amazon SQS peuvent fournir un débit très élevé. Pour plus d'informations sur les quotas de débit, consultez Quotas de SQS messages Amazon.
Pour atteindre un débit élevé, vous devez effectuer une mise à l'échelle horizontale des producteurs et consommateurs de messages (ajouter des producteurs et des consommateurs supplémentaires).
Rubriques
Mise à l'échelle horizontale
Vous accédez à Amazon SQS via un protocole de demande-réponse HTTP. Par conséquent, la latence de la demande (l'intervalle de temps entre la création d'une requête et la réception de la réponse) limite le débit que vous pouvez obtenir d'un seul thread avec une connexion unique. Par exemple, si la latence d'un client basé sur Amazon EC2 envoyant des demandes vers Amazon SQS dans la même région avoisine les 20 ms, le débit maximal d'un thread unique sur une seule connexion est en moyenne de 50 TPS.
La mise à l'échelle horizontale consiste à augmenter le nombre de producteurs de messages (émettant des requêtes SendMessage
) et de consommateurs de messages (émettant des requêtes ReceiveMessage
et DeleteMessage
) afin d'augmenter le débit global de votre file d'attente. Vous pouvez mettre à l'échelle horizontalement de trois manières :
-
Augmenter le nombre de threads par client
-
Ajouter des clients
-
Augmentez le nombre de threads par client et ajouter d'autres clients
Lorsque vous ajoutez des clients, vous créez essentiellement un gain de débit linéaire pour votre file d'attente. Ainsi, si vous doublez le nombre de clients, vous obtiendrez deux fois plus de débit.
Note
Lorsque vous procédez à une mise à l'échelle horizontale, assurez-vous que votre client Amazon SQS dispose de suffisamment de connexions ou de threads pour prendre en charge le nombre de producteurs et de consommateurs de messages qui envoient des requêtes et reçoivent des réponses simultanément. Par exemple, par défaut, les instances de la AWS SDK for Java AmazonSQSClient
classe conservent au maximum 50 connexions à Amazon SQS. Pour créer des producteurs et consommateurs simultanés supplémentaires, vous devez modifier le nombre maximum de threads de producteurs et de consommateurs autorisés sur un objet AmazonSQSClientBuilder
, par exemple :
final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();
Pour AmazonSQSAsyncClient
, vous devez également veiller à ce que le nombre de threads disponibles soit suffisant.
Cet exemple ne fonctionne que pour Java v. 1.x.
Traitement par lots des actions
Le traitement par lots effectue davantage de travail au cours de chaque aller-retour vers le service (par exemple, lorsque vous envoyez plusieurs messages avec une seule requête SendMessageBatch
). Les actions par lots Amazon SQS sont SendMessageBatch
, DeleteMessageBatch
et ChangeMessageVisibilityBatch
. Pour profiter du traitement par lots sans modifier vos producteurs ou consommateurs, vous pouvez utiliser le Client asynchrone en mémoire tampon Amazon SQS.
Note
Dans la mesure où ReceiveMessage
peut traiter 10 messages simultanément, il n'y a pas d'action ReceiveMessageBatch
.
Le traitement par lots répartit la latence de l'action sur plusieurs messages dans une demande par lots, au lieu d'accepter la totalité de la latence pour un seul message (par exemple, une demande SendMessage
). Dans la mesure où chaque échange avec le service inclut davantage de tâches à traiter, les requêtes par lots assurent une utilisation plus efficace des threads et connexions, ce qui améliore le débit.
Vous pouvez combiner la mise à l'échelle horizontale et le traitement par lots afin de proposer un débit avec moins de threads, de connexions et de requêtes que pour les requêtes de message individuelles. Vous pouvez utiliser des actions Amazon SQS par lots pour envoyer, recevoir ou supprimer jusqu'à 10 messages à la fois. Dans la mesure où Amazon SQS facture par demande, le traitement par lots peut vous aider à réduire considérablement vos coûts.
Le traitement par lots peut ajouter un peu de complexité pour votre application (par exemple, votre application doit accumuler les messages avant de les envoyer, ou doit parfois attendre plus longtemps pour une réponse). Il reste toutefois efficace dans les cas suivants :
-
Votre application génère de nombreux messages en peu de temps, si bien que le délai d'attente n'est jamais très long.
-
Un consommateur de messages récupère les messages dans une file d'attente à sa discrétion, contrairement à un système où des producteurs de messages classiques ont besoin d'envoyer des messages en réponse à des événements qu'ils ne contrôlent pas.
Important
Une requête de traitement par lots peut aboutir même si des messages individuels inclus dans cette requête n'ont pas pu être traités. Après une requête de traitement par lot, recherchez toujours d'éventuelles erreurs concernant des messages individuels et, le cas échéant, relancez l'action.
Exemple d'utilisation de Java pour les requêtes en une seule opération et par lots
Prérequis
Ajoutez les packages aws-java-sdk-sqs.jar
, aws-java-sdk-ec2.jar
et commons-logging.jar
au chemin de classe de votre version Java. L'exemple suivant illustre ces dépendances dans un fichier pom.xml
de projet Maven.
<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>
LATEST
</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST
</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST
</version> </dependency> </dependencies>
SimpleProducerConsumer.java
L'exemple de code Java suivant implémente un schéma simple consommateur-producteur. Le thread principal génère un certain nombre de threads producteurs et consommateurs qui traitent des messages de 1 Ko pendant une durée spécifiée. Cet exemple inclut des producteurs et des consommateurs qui émettent des requêtes d'opérations simples, et d'autres qui créent des requêtes de traitement par lot.
/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }
Surveillance des métriques de volume pour l'exemple exécuté
Amazon SQS génère automatiquement des métriques de volume pour les messages envoyés, reçus et supprimés. Vous pouvez accéder à ces statistiques et à d'autres via l'onglet Surveillance de votre file d'attente ou sur la CloudWatch console
Note
Après le démarrage de la file d'attente, il faut patienter jusqu'à 15 minutes pour que ces métriques soient disponibles.