Aumentar el rendimiento mediante el escalado horizontal y el procesamiento por lotes de acciones con Amazon SQS - Amazon Simple Queue Service

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Aumentar el rendimiento mediante el escalado horizontal y el procesamiento por lotes de acciones con Amazon SQS

Las colas de Amazon SQS pueden ofrecer un rendimiento muy elevado. Para obtener información sobre las cuotas de rendimiento, consulte Cuotas de SQS mensajes de Amazon.

Para conseguir un desempeño elevado, debe escalar horizontalmente los productores y los consumidores de mensajes (añadir más productores y consumidores).

Escalado horizontal

Dado que se accede a Amazon SQS a través de un protocolo HTTP de solicitud-respuesta, la latencia de solicitudes (el intervalo de tiempo que transcurre entre que se inicia una solicitud y se recibe una respuesta) limita el rendimiento que se puede alcanzar con un solo subproceso a través de una única conexión. Por ejemplo, si la latencia desde un cliente basado en Amazon EC2 hasta Amazon SQS en la misma región es de una media de 20 ms, el rendimiento máximo desde un único subproceso a través de una única conexión será de una media de 50 transacciones por segundo.

El escalado horizontal supone aumentar el número de productores de mensajes (que realizan solicitudes SendMessage) y consumidores de mensajes (que realizan solicitudes ReceiveMessage y DeleteMessage) para aumentar el desempeño general de la cola. Existen tres métodos para realizar el escalado horizontal:

  • Aumentar el número de subprocesos por cliente

  • Añadir más clientes

  • Aumentar el número de subprocesos por cliente y añadir más clientes

Cuando se añaden más clientes, se consiguen ganancias prácticamente lineales en el desempeño de la cola. Por ejemplo, si se duplica el número de clientes, también se obtiene el doble de desempeño.

nota

A medida que realice el escalado horizontal, debe asegurarse de que el cliente de Amazon SQS tiene suficientes conexiones o subprocesos para admitir el número de productores de mensajes simultáneos y consumidores que envían solicitudes y reciben respuestas. Por ejemplo, de forma predeterminada, las instancias de la AWS SDK for Java AmazonSQSClient clase mantienen como máximo 50 conexiones a Amazon SQS. Para crear productores y consumidores adicionales simultáneos, debe ajustar el número máximo de subprocesos de productores y consumidores admisibles en un objeto AmazonSQSClientBuilder; por ejemplo:

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

Para AmazonSQSAsyncClient, también debe asegurarse de que haya suficientes subprocesos disponibles.

Este ejemplo solo funciona para la versión 1.x de Java.

Agrupación en lotes de acciones

La agrupación por lotes realiza más trabajo durante cada ciclo de ida y vuelta al servicio (por ejemplo, al enviar varios mensajes en una única solicitud SendMessageBatch). Las acciones de procesamiento por lotes de Amazon SQS son SendMessageBatch, DeleteMessageBatch y ChangeMessageVisibilityBatch. Para aprovechar el procesamiento por lotes sin modificar los productores ni los consumidores, puede utilizar el Cliente asincrónico en búfer de Amazon SQS.

nota

Debido a que ReceiveMessage puede procesar 10 mensajes a la vez, no hay ninguna acción ReceiveMessageBatch.

La agrupación en lotes distribuye la latencia de la acción por lotes entre varios mensajes en una solicitud por lotes, en lugar de aceptar toda la latencia para un único mensaje (por ejemplo, una solicitud SendMessage). Como en cada ciclo de ida y vuelta se realiza más trabajo, las solicitudes por lotes hacen un uso más eficaz de los subprocesos y las conexiones, por lo que se mejora el desempeño.

Puede combinar la agrupación en lotes con el escalado horizontal para proporcionar un desempeño con menos subprocesos, conexiones y solicitudes de los que serían necesarios en el caso de utilizar solicitudes de mensajes individuales. Puede utilizar acciones de Amazon SQS por lotes para enviar, recibir o eliminar hasta diez mensajes a la vez. Dado que el uso de Amazon SQS se factura por solicitudes, el procesamiento por lotes puede reducir significativamente los costos.

La agrupación en lotes puede introducir cierta complejidad en una aplicación (por ejemplo, la aplicación debe acumular los mensajes antes de enviarlos, o a veces debe esperar más para recibir una respuesta). Sin embargo, la agrupación en lotes puede resultar eficaz en los casos siguientes:

  • Cuando la aplicación genera muchos mensajes en poco tiempo, por lo que el retraso nunca es muy largo.

  • Cuando un consumidor de mensajes busca mensajes en una cola a discreción, a diferencia de los productores de mensajes típicos que tienen que enviar mensajes como respuesta a eventos que no controlan.

importante

Una solicitud por lotes puede realizarse correctamente aunque se hayan producido errores en mensajes individuales del lote. Después de una solicitud por lotes, compruebe siempre si hay errores en mensajes individuales y vuelva a intentar la acción si es necesario.

Ejemplo funcional en Java de solicitudes de una sola operación y por lotes

Requisitos previos

Añada los paquetes aws-java-sdk-sqs.jar, aws-java-sdk-ec2.jar y commons-logging.jar a la ruta de clases de compilación Java. Los siguientes ejemplos muestran estas dependencias en el archivo pom.xml de un proyecto Maven.

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer.java

En el siguiente ejemplo de código Java se implementa un patrón productor-consumidor sencillo. El subproceso principal genera una serie de subprocesos productores y consumidores que procesan mensajes de 1 KB durante el tiempo especificado. Este ejemplo incluye productores y consumidores que realizan solicitudes de una única operación y otros que realizan solicitudes por lotes.

/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

Monitoreo de las métricas de volumen de la ejecución del ejemplo

Amazon SQS genera automáticamente métricas de volumen para los mensajes enviados, recibidos y eliminados. Puedes acceder a esas métricas y a otras a través de la pestaña Supervisión de tu cola o en la CloudWatch consola.

nota

Las métricas pueden tardar en estar disponibles hasta 15 minutos después del inicio de la cola.