수평적 조정과 작업 일괄 처리를 사용하여 처리량 증대 - Amazon Simple Queue Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

수평적 조정과 작업 일괄 처리를 사용하여 처리량 증대

Amazon SQS 대기열은 매우 높은 처리량을 제공할 수 있습니다. 처리량 할당량에 대한 자세한 내용은 메시지 관련 할당량 섹션을 참조하세요.

높은 처리량을 달성하려면 메시지 생성자와 소비자를 수평적으로 조정해야 합니다(더 많은 생성자와 소비자 추가).

수평적 조정

HTTP 요청-응답 프로토콜을 통해 Amazon SQS에 액세스하기 때문에 요청 지연 시간(요청을 시작해서 응답을 받기까지의 시간 간격)은 단일 연결을 사용하여 단일 스레드에서 확보할 수 있는 처리량을 제한합니다. 예를 들어, 동일한 리전에서 Amazon EC2 기반 클라이언트부터 Amazon SQS까지 지연 시간이 평균 20ms인 경우 단일 연결을 통한 단일 스레드의 최대 처리량은 평균 50TPS입니다.

수평적 조정은 메시지 생성자(SendMessage 요청 전송)와 소비자(ReceiveMessageDeleteMessage 요청 전송) 수를 늘리는 것으로 전체 대기열 처리량을 높여줍니다. 다음 세 가지 방법으로 수평 확장할 수 있습니다.

  • 클라이언트당 스레드 수 늘리기

  • 클라이언트 추가

  • 클라이언트당 스레드 수를 늘리고 클라이언트 추가

클라이언트를 더 추가하면 대기열 처리량이 선형적으로 증가합니다. 예를 들어 클라이언트 개수를 2배 늘리면 처리량도 2배가 됩니다.

참고

수평적으로 확장할 경우 요청을 전송하고 응답을 수신하는 동시 메시지 생산자와 소비자의 수를 지원할 수 있도록 Amazon SQS 클라이언트의 연결이나 스레드가 충분한지 확인해야 합니다. 예를 들어, 기본적으로 AWS SDK for Java AmazonSQSClient 클래스의 인스턴스는 Amazon SQS와의 연결을 최대 50개 유지합니다. 동시 생성자와 소비자를 추가로 만들려면 AmazonSQSClientBuilder 객체에 있는 허용되는 생성자 및 소비자 스레드의 최대 개수를 조정해야 합니다. 예를 들면 다음과 같습니다.

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

AmazonSQSAsyncClient의 경우 충분한 스레드를 사용할 수 있는지도 확인해야 합니다.

이 예제는 Java v. 1.x에서만 작동합니다.

작업 일괄 처리

일괄 처리는 서비스까지 왕복할 때마다 더 많은 작업을 수행합니다(예: SendMessageBatch 요청 하나로 여러 메시지를 전송하는 경우). Amazon SQS 배치 작업은 SendMessageBatch, DeleteMessageBatchChangeMessageVisibilityBatch입니다. 생산자 또는 소비자를 변경하지 않고 배치 처리를 활용하려면 Amazon SQS의 버퍼링된 비동기식 클라이언트를 사용하면 됩니다.

참고

ReceiveMessage는 한 번에 메시지를 10개 처리할 수 있으므로 ReceiveMessageBatch 작업이 없습니다.

일괄 처리는 메시지 하나의 전체 지연 시간을 수락하는 것이 아니라, 배치 작업의 지연 시간을 배치 요청의 여러 메시지로 분산시키는 것입니다(예: SendMessage 요청). 각 왕복마다 더 많은 작업을 수행하기 때문에 배치 요청은 스레드와 연결을 보다 효율적으로 사용하므로 처리량이 개선됩니다.

일괄 처리와 수평적 조정을 함께 사용하면 개별 메시지 요청보다 스레드, 연결 및 요청 수가 적은 처리량을 얻습니다. 배치 처리된 Amazon SQS 작업을 사용하여 한 번에 최대 10개 메시지를 전송, 수신 또는 삭제할 수 있습니다. Amazon SQS는 요청별로 요금을 부과하기 때문에 배치 처리 시 비용을 대폭 절감할 수 있습니다.

일괄 처리를 사용하면 애플리케이션에 어느 정도 복잡성이 수반됩니다(예를 들어 애플리케이션이 메시지를 누적한 후 메시지를 전송하고 때로는 응답을 받기까지 대기 시간이 길어짐). 그러나 다음과 같은 상황에서는 일괄 처리가 효과적일 수 있습니다.

  • 애플리케이션이 단시간에 많은 메시지를 생성하지만 지연 시간이 매우 길지 않은 경우.

  • 제어하지 못하는 이벤트에 대응하여 메시지를 전송해야 하는 일반적인 메시지 생성자와 달리 메시지 소비자가 단독 재량으로 대기열에서 메시지를 가져오는 경우.

중요

배치의 개별 메시지에 오류가 발생하더라도 배치 요청이 성공할 수 있습니다. 배치 요청 후에는 항상 개별 메시지의 오류 여부를 확인하고 필요에 따라 작업을 다시 시도해야 합니다.

단일 작업 및 배치 요청에 대한 Java 사용 예제

필수 조건

aws-java-sdk-sqs.jar, aws-java-sdk-ec2.jar, commons-logging.jar 패키지를 Java 빌드 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트의 pom.xml 파일 내에 존재하는 이러한 종속성을 보여줍니다.

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer.java

다음 Java 코드 예제는 단순한 생성자-소비자 패턴을 구현한 것입니다. 기본 스레드는 지정된 시간 동안 1KB 메시지를 처리하는 생성자와 소비자 스레드를 다수 생성합니다. 이 예제에는 단일 작업을 요청하는 생성자와 소비자뿐 아니라 배치 요청을 하는 생성자와 소비자가 포함되어 있습니다.

/* * Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

예제 실행에서 볼륨 측정치 모니터링

Amazon SQS는 전송, 수신 및 삭제된 메시지에 대한 볼륨 지표를 자동으로 생성합니다. 대기열의 모니터링 탭이나 CloudWatch 콘솔을 통해 이러한 지표 및 기타 지표에 액세스할 수 있습니다.

참고

이 측정치의 경우 대기열을 사용할 수 있을 때까지 최대 15분이 걸릴 수 있습니다.