Amazon SQS examples using SDK for Java 2.x - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Amazon SQS examples using SDK for Java 2.x

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Amazon SQS.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using Amazon SQS.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.services.sqs.paginators.ListQueuesIterable; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class HelloSQS { public static void main(String[] args) { SqsClient sqsClient = SqsClient.builder() .region(Region.US_WEST_2) .build(); listQueues(sqsClient); sqsClient.close(); } public static void listQueues(SqsClient sqsClient) { try { ListQueuesIterable listQueues = sqsClient.listQueuesPaginator(); listQueues.stream() .flatMap(r -> r.queueUrls().stream()) .forEach(content -> System.out.println(" Queue URL: " + content.toLowerCase())); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }
  • For API details, see ListQueues in AWS SDK for Java 2.x API Reference.

Actions

The following code example shows how to use CreateQueue.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.ListQueuesRequest; import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SqsException; import java.util.List; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class SQSExample { public static void main(String[] args) { String queueName = "queue" + System.currentTimeMillis(); SqsClient sqsClient = SqsClient.builder() .region(Region.US_WEST_2) .build(); // Perform various tasks on the Amazon SQS queue. String queueUrl = createQueue(sqsClient, queueName); listQueues(sqsClient); listQueuesFilter(sqsClient, queueUrl); List<Message> messages = receiveMessages(sqsClient, queueUrl); sendBatchMessages(sqsClient, queueUrl); changeMessages(sqsClient, queueUrl, messages); deleteMessages(sqsClient, queueUrl, messages); sqsClient.close(); } public static String createQueue(SqsClient sqsClient, String queueName) { try { System.out.println("\nCreate Queue"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void listQueues(SqsClient sqsClient) { System.out.println("\nList Queues"); String prefix = "que"; try { ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build(); ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest); for (String url : listQueuesResponse.queueUrls()) { System.out.println(url); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void listQueuesFilter(SqsClient sqsClient, String queueUrl) { // List queues with filters String namePrefix = "queue"; ListQueuesRequest filterListRequest = ListQueuesRequest.builder() .queueNamePrefix(namePrefix) .build(); ListQueuesResponse listQueuesFilteredResponse = sqsClient.listQueues(filterListRequest); System.out.println("Queue URLs with prefix: " + namePrefix); for (String url : listQueuesFilteredResponse.queueUrls()) { System.out.println(url); } System.out.println("\nSend message"); try { sqsClient.sendMessage(SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody("Hello world!") .delaySeconds(10) .build()); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void sendBatchMessages(SqsClient sqsClient, String queueUrl) { System.out.println("\nSend multiple messages"); try { SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(SendMessageBatchRequestEntry.builder().id("id1").messageBody("Hello from msg 1").build(), SendMessageBatchRequestEntry.builder().id("id2").messageBody("msg 2").delaySeconds(10) .build()) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl) { System.out.println("\nReceive messages"); try { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveMessageRequest).messages(); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return null; } public static void changeMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) { System.out.println("\nChange Message Visibility"); try { for (Message message : messages) { ChangeMessageVisibilityRequest req = ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .visibilityTimeout(100) .build(); sqsClient.changeMessageVisibility(req); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) { System.out.println("\nDelete Messages"); try { for (Message message : messages) { DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .build(); sqsClient.deleteMessage(deleteMessageRequest); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }
  • For API details, see CreateQueue in AWS SDK for Java 2.x API Reference.

The following code example shows how to use DeleteMessage.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

try { for (Message message : messages) { DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .build(); sqsClient.deleteMessage(deleteMessageRequest); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); }
  • For API details, see DeleteMessage in AWS SDK for Java 2.x API Reference.

The following code example shows how to use DeleteQueue.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.SqsException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class DeleteQueue { public static void main(String[] args) { final String usage = """ Usage: <queueName> Where: queueName - The name of the Amazon SQS queue to delete. """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String queueName = args[0]; SqsClient sqs = SqsClient.builder() .region(Region.US_WEST_2) .build(); deleteSQSQueue(sqs, queueName); sqs.close(); } public static void deleteSQSQueue(SqsClient sqsClient, String queueName) { try { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl(); DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }
  • For API details, see DeleteQueue in AWS SDK for Java 2.x API Reference.

The following code example shows how to use GetQueueUrl.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl();
  • For API details, see GetQueueUrl in AWS SDK for Java 2.x API Reference.

The following code example shows how to use ListQueues.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

String prefix = "que"; try { ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build(); ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest); for (String url : listQueuesResponse.queueUrls()) { System.out.println(url); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); }
  • For API details, see ListQueues in AWS SDK for Java 2.x API Reference.

The following code example shows how to use ReceiveMessage.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

try { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveMessageRequest).messages(); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return null;
  • For API details, see ReceiveMessage in AWS SDK for Java 2.x API Reference.

The following code example shows how to use SendMessage.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Two examples of the SendMessage operation follow:

  • Send a message with a body and a delay

  • Send a message with a body and message attributes

Send a message with a body and a delay.

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SqsException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class SendMessages { public static void main(String[] args) { final String usage = """ Usage: <queueName> <message> Where: queueName - The name of the queue. message - The message to send. """; if (args.length != 2) { System.out.println(usage); System.exit(1); } String queueName = args[0]; String message = args[1]; SqsClient sqsClient = SqsClient.builder() .region(Region.US_WEST_2) .build(); sendMessage(sqsClient, queueName, message); sqsClient.close(); } public static void sendMessage(SqsClient sqsClient, String queueName, String message) { try { CreateQueueRequest request = CreateQueueRequest.builder() .queueName(queueName) .build(); sqsClient.createQueue(request); GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl(); SendMessageRequest sendMsgRequest = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(message) .delaySeconds(5) .build(); sqsClient.sendMessage(sendMsgRequest); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }

Send a message with a body and message attributes.

/** * <p>This method demonstrates how to add message attributes to a message. * Each attribute must specify a name, value, and data type. You use a Java Map to supply the attributes. The map's * key is the attribute name, and you specify the map's entry value using a builder that includes the attribute * value and data type.</p> * * <p>The data type must start with one of "String", "Number" or "Binary". You can optionally * define a custom extension by using a "." and your extension.</p> * * <p>The SQS Developer Guide provides more information on @see <a * href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">message * attributes</a>.</p> * * @param thumbailPath Filesystem path of the image. * @param queueUrl URL of the SQS queue. */ static void sendMessageWithAttributes(Path thumbailPath, String queueUrl) { Map<String, MessageAttributeValue> messageAttributeMap; try { messageAttributeMap = Map.of( "Name", MessageAttributeValue.builder() .stringValue("Jane Doe") .dataType("String").build(), "Age", MessageAttributeValue.builder() .stringValue("42") .dataType("Number.int").build(), "Image", MessageAttributeValue.builder() .binaryValue(SdkBytes.fromByteArray(Files.readAllBytes(thumbailPath))) .dataType("Binary.jpg").build() ); } catch (IOException e) { LOGGER.error("An I/O exception occurred reading thumbnail image: {}", e.getMessage(), e); throw new RuntimeException(e); } SendMessageRequest request = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody("Hello SQS") .messageAttributes(messageAttributeMap) .build(); try { SendMessageResponse sendMessageResponse = SQS_CLIENT.sendMessage(request); LOGGER.info("Message ID: {}", sendMessageResponse.messageId()); } catch (SqsException e) { LOGGER.error("Exception occurred sending message: {}", e.getMessage(), e); throw new RuntimeException(e); } }
  • For API details, see SendMessage in AWS SDK for Java 2.x API Reference.

The following code example shows how to use SendMessageBatch.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(SendMessageBatchRequestEntry.builder().id("id1").messageBody("Hello from msg 1").build(), SendMessageBatchRequestEntry.builder().id("id2").messageBody("msg 2").delaySeconds(10) .build()) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest);

The following code example shows how to use SetQueueAttributes.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Configure an Amazon SQS to use server-side encryption (SSE) using a custom KMS key.

public static void addEncryption(String queueName, String kmsMasterKeyAlias) { SqsClient sqsClient = SqsClient.create(); GetQueueUrlRequest urlRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); GetQueueUrlResponse getQueueUrlResponse; try { getQueueUrlResponse = sqsClient.getQueueUrl(urlRequest); } catch (QueueDoesNotExistException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } String queueUrl = getQueueUrlResponse.queueUrl(); Map<QueueAttributeName, String> attributes = Map.of( QueueAttributeName.KMS_MASTER_KEY_ID, kmsMasterKeyAlias, QueueAttributeName.KMS_DATA_KEY_REUSE_PERIOD_SECONDS, "140" // Set the data key reuse period to 140 seconds. ); // This is how long SQS can reuse the data key before requesting a new one from KMS. SetQueueAttributesRequest attRequest = SetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributes(attributes) .build(); try { sqsClient.setQueueAttributes(attRequest); LOGGER.info("The attributes have been applied to {}", queueName); } catch (InvalidAttributeNameException | InvalidAttributeValueException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } finally { sqsClient.close(); } }

Scenarios

The following code example shows how to create a messaging application by using Amazon SQS.

SDK for Java 2.x

Shows how to use the Amazon SQS API to develop a Spring REST API that sends and retrieves messages.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon Comprehend

  • Amazon SQS

The following code example shows how to create and publish to a FIFO Amazon SNS topic.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

This example

  • creates an Amazon SNS FIFO topic, two Amazon SQS FIFO queues, and one Standard queue.

  • subscribes the queues to the topic and publishes a message to the topic.

The test verifies the receipt of the message to each queue. The complete example also shows the addition of access policies and deletes the resources at the end.

public class PriceUpdateExample { public final static SnsClient snsClient = SnsClient.create(); public final static SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { final String usage = "\n" + "Usage: " + " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" + "Where:\n" + " fifoTopicName - The name of the FIFO topic that you want to create. \n\n" + " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n" + " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" + " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n"; if (args.length != 4) { System.out.println(usage); System.exit(1); } final String fifoTopicName = args[0]; final String wholeSaleQueueName = args[1]; final String retailQueueName = args[2]; final String analyticsQueueName = args[3]; // For convenience, the QueueData class holds metadata about a queue: ARN, URL, // name and type. List<QueueData> queues = List.of( new QueueData(wholeSaleQueueName, QueueType.FIFO), new QueueData(retailQueueName, QueueType.FIFO), new QueueData(analyticsQueueName, QueueType.Standard)); // Create queues. createQueues(queues); // Create a topic. String topicARN = createFIFOTopic(fifoTopicName); // Subscribe each queue to the topic. subscribeQueues(queues, topicARN); // Allow the newly created topic to send messages to the queues. addAccessPolicyToQueuesFINAL(queues, topicARN); // Publish a sample price update message with payload. publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables"); // Clean up resources. deleteSubscriptions(queues); deleteQueues(queues); deleteTopic(topicARN); } public static String createFIFOTopic(String topicName) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = Map.of( "FifoTopic", "true", "ContentBasedDeduplication", "false", "FifoThroughputScope", "MessageGroup"); CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); String topicArn = response.topicArn(); System.out.println("The topic ARN is" + topicArn); return topicArn; } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void subscribeQueues(List<QueueData> queues, String topicARN) { queues.forEach(queue -> { SubscribeRequest subscribeRequest = SubscribeRequest.builder() .topicArn(topicARN) .endpoint(queue.queueARN) .protocol("sqs") .build(); // Subscribe to the endpoint by using the SNS service client. // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO // topic. SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest); System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]"); queue.subscriptionARN = subscribeResponse.subscriptionArn(); }); } public static void publishPriceUpdate(String topicArn, String payload, String groupId) { try { // Create and publish a message that updates the wholesale price. String subject = "Price Update"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; MessageAttributeValue msgAttValue = MessageAttributeValue.builder() .dataType("String") .stringValue(attributeValue) .build(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put(attributeName, msgAttValue); PublishRequest pubRequest = PublishRequest.builder() .topicArn(topicArn) .subject(subject) .message(payload) .messageGroupId(groupId) .messageDeduplicationId(dedupId) .messageAttributes(attributes) .build(); final PublishResponse response = snsClient.publish(pubRequest); System.out.println(response.messageId()); System.out.println(response.sequenceNumber()); System.out.println("Message was published to " + topicArn); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } }

The following code example shows how to detect people and objects in a video with Amazon Rekognition.

SDK for Java 2.x

Shows how to use Amazon Rekognition Java API to create an app to detect faces and objects in videos located in an Amazon Simple Storage Service (Amazon S3) bucket. The app sends the admin an email notification with the results using Amazon Simple Email Service (Amazon SES).

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon Rekognition

  • Amazon S3

  • Amazon SES

  • Amazon SNS

  • Amazon SQS

The following code example shows how to use the Amazon SQS Extended Client Library to work with large Amazon SQS messages.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.ExpirationStatus; import software.amazon.awssdk.services.s3.model.LifecycleExpiration; import software.amazon.awssdk.services.s3.model.LifecycleRule; import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter; import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest; import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import java.util.Arrays; import java.util.List; import java.util.UUID; /** * Example of using Amazon SQS Extended Client Library for Java 2.x. */ public class SqsExtendedClientExample { private static final Logger logger = LoggerFactory.getLogger(SqsExtendedClientExample.class); private String s3BucketName; private String queueUrl; private final String queueName; private final S3Client s3Client; private final SqsClient sqsExtendedClient; private final int messageSize; /** * Constructor with default clients and message size. */ public SqsExtendedClientExample() { this(S3Client.create(), 300000); } /** * Constructor with custom S3 client and message size. * * @param s3Client The S3 client to use * @param messageSize The size of the test message to create */ public SqsExtendedClientExample(S3Client s3Client, int messageSize) { this.s3Client = s3Client; this.messageSize = messageSize; // Generate a unique bucket name. this.s3BucketName = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); // Generate a unique queue name. this.queueName = "MyQueue-" + UUID.randomUUID(); // Configure the SQS extended client. final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, s3BucketName); this.sqsExtendedClient = new AmazonSQSExtendedClient(SqsClient.builder().build(), extendedClientConfig); } public static void main(String[] args) { SqsExtendedClientExample example = new SqsExtendedClientExample(); try { example.setup(); example.sendAndReceiveMessage(); } finally { example.cleanup(); } } /** * Send a large message and receive it back. * * @return The received message */ public Message sendAndReceiveMessage() { try { // Create a large message. char[] chars = new char[messageSize]; Arrays.fill(chars, 'x'); String largeMessage = new String(chars); // Send the message. final SendMessageRequest sendMessageRequest = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(largeMessage) .build(); sqsExtendedClient.sendMessage(sendMessageRequest); logger.info("Sent message of size: {}", largeMessage.length()); // Receive and return the message. final ReceiveMessageResponse receiveMessageResponse = sqsExtendedClient.receiveMessage( ReceiveMessageRequest.builder().queueUrl(queueUrl).build()); List<Message> messages = receiveMessageResponse.messages(); if (messages.isEmpty()) { throw new RuntimeException("No messages received"); } Message message = messages.getFirst(); logger.info("\nMessage received."); logger.info(" ID: {}", message.messageId()); logger.info(" Receipt handle: {}", message.receiptHandle()); logger.info(" Message body size: {}", message.body().length()); logger.info(" Message body (first 5 characters): {}", message.body().substring(0, 5)); return message; } catch (RuntimeException e) { logger.error("Error during message processing: {}", e.getMessage(), e); throw e; } }

The following code example shows how to work with S3 event notifications in an object-oriented way.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

This example show how to process S3 notification event by using Amazon SQS.

/** * This method receives S3 event notifications by using an SqsAsyncClient. * After the client receives the messages it deserializes the JSON payload and logs them. It uses * the S3EventNotification class (part of the S3 event notification API for Java) to deserialize * the JSON payload and access the messages in an object-oriented way. * * @param queueUrl The URL of the AWS SQS queue that receives the S3 event notifications. * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/eventnotifications/s3/model/package-summary.html">S3EventNotification API</a>. * <p> * To use S3 event notification serialization/deserialization to objects, add the following * dependency to your Maven pom.xml file. * <dependency> * <groupId>software.amazon.awssdk</groupId> * <artifactId>s3-event-notifications</artifactId> * <version><LATEST></version> * </dependency> * <p> * The S3 event notification API became available with version 2.25.11 of the Java SDK. * <p> * This example shows the use of the API with AWS SQS, but it can be used to process S3 event notifications * in AWS SNS or AWS Lambda as well. * <p> * Note: The S3EventNotification class does not work with messages routed through AWS EventBridge. */ static void processS3Events(String bucketName, String queueUrl, String queueArn) { try { // Configure the bucket to send Object Created and Object Tagging notifications to an existing SQS queue. s3Client.putBucketNotificationConfiguration(b -> b .notificationConfiguration(ncb -> ncb .queueConfigurations(qcb -> qcb .events(Event.S3_OBJECT_CREATED, Event.S3_OBJECT_TAGGING) .queueArn(queueArn))) .bucket(bucketName) ).join(); triggerS3EventNotifications(bucketName); // Wait for event notifications to propagate. Thread.sleep(Duration.ofSeconds(5).toMillis()); boolean didReceiveMessages = true; while (didReceiveMessages) { // Display the number of messages that are available in the queue. sqsClient.getQueueAttributes(b -> b .queueUrl(queueUrl) .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES) ).thenAccept(attributeResponse -> logger.info("Approximate number of messages in the queue: {}", attributeResponse.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES))) .join(); // Receive the messages. ReceiveMessageResponse response = sqsClient.receiveMessage(b -> b .queueUrl(queueUrl) ).get(); logger.info("Count of received messages: {}", response.messages().size()); didReceiveMessages = !response.messages().isEmpty(); // Create a collection to hold the received message for deletion // after we log the messages. HashSet<DeleteMessageBatchRequestEntry> messagesToDelete = new HashSet<>(); // Process each message. response.messages().forEach(message -> { logger.info("Message id: {}", message.messageId()); // Deserialize JSON message body to a S3EventNotification object // to access messages in an object-oriented way. S3EventNotification event = S3EventNotification.fromJson(message.body()); // Log the S3 event notification record details. if (event.getRecords() != null) { event.getRecords().forEach(record -> { String eventName = record.getEventName(); String key = record.getS3().getObject().getKey(); logger.info(record.toString()); logger.info("Event name is {} and key is {}", eventName, key); }); } // Add logged messages to collection for batch deletion. messagesToDelete.add(DeleteMessageBatchRequestEntry.builder() .id(message.messageId()) .receiptHandle(message.receiptHandle()) .build()); }); // Delete messages. if (!messagesToDelete.isEmpty()) { sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(messagesToDelete) .build() ).join(); } } // End of while block. } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }

The following code example shows how to:

  • Create topic (FIFO or non-FIFO).

  • Subscribe several queues to the topic with an option to apply a filter.

  • Publish messages to the topic.

  • Poll the queues for messages received.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

package com.example.sns; import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.CreateTopicRequest; import software.amazon.awssdk.services.sns.model.CreateTopicResponse; import software.amazon.awssdk.services.sns.model.DeleteTopicRequest; import software.amazon.awssdk.services.sns.model.DeleteTopicResponse; import software.amazon.awssdk.services.sns.model.MessageAttributeValue; import software.amazon.awssdk.services.sns.model.PublishRequest; import software.amazon.awssdk.services.sns.model.PublishResponse; import software.amazon.awssdk.services.sns.model.SetSubscriptionAttributesRequest; import software.amazon.awssdk.services.sns.model.SnsException; import software.amazon.awssdk.services.sns.model.SubscribeRequest; import software.amazon.awssdk.services.sns.model.SubscribeResponse; import software.amazon.awssdk.services.sns.model.UnsubscribeRequest; import software.amazon.awssdk.services.sns.model.UnsubscribeResponse; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.SqsException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Scanner; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * <p> * For more information, see the following documentation topic: * <p> * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * <p> * This Java example performs these tasks: * <p> * 1. Gives the user three options to choose from. * 2. Creates an Amazon Simple Notification Service (Amazon SNS) topic. * 3. Creates an Amazon Simple Queue Service (Amazon SQS) queue. * 4. Gets the SQS queue Amazon Resource Name (ARN) attribute. * 5. Attaches an AWS Identity and Access Management (IAM) policy to the queue. * 6. Subscribes to the SQS queue. * 7. Publishes a message to the topic. * 8. Displays the messages. * 9. Deletes the received message. * 10. Unsubscribes from the topic. * 11. Deletes the SNS topic. */ public class SNSWorkflow { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) { final String usage = "\n" + "Usage:\n" + " <fifoQueueARN>\n\n" + "Where:\n" + " accountId - Your AWS account Id value."; if (args.length != 1) { System.out.println(usage); System.exit(1); } SnsClient snsClient = SnsClient.builder() .region(Region.US_EAST_1) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); SqsClient sqsClient = SqsClient.builder() .region(Region.US_EAST_1) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); Scanner in = new Scanner(System.in); String accountId = args[0]; String useFIFO; String duplication = "n"; String topicName; String deduplicationID = null; String groupId = null; String topicArn; String sqsQueueName; String sqsQueueUrl; String sqsQueueArn; String subscriptionArn; boolean selectFIFO = false; String message; List<Message> messageList; List<String> filterList = new ArrayList<>(); String msgAttValue = ""; System.out.println(DASHES); System.out.println("Welcome to messaging with topics and queues."); System.out.println("In this scenario, you will create an SNS topic and subscribe an SQS queue to the topic.\n" + "You can select from several options for configuring the topic and the subscriptions for the queue.\n" + "You can then post to the topic and see the results in the queue."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("SNS topics can be configured as FIFO (First-In-First-Out).\n" + "FIFO topics deliver messages in order and support deduplication and message filtering.\n" + "Would you like to work with FIFO topics? (y/n)"); useFIFO = in.nextLine(); if (useFIFO.compareTo("y") == 0) { selectFIFO = true; System.out.println("You have selected FIFO"); System.out.println(" Because you have chosen a FIFO topic, deduplication is supported.\n" + " Deduplication IDs are either set in the message or automatically generated from content using a hash function.\n" + " If a message is successfully published to an SNS FIFO topic, any message published and determined to have the same deduplication ID,\n" + " within the five-minute deduplication interval, is accepted but not delivered.\n" + " For more information about deduplication, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html."); System.out.println( "Would you like to use content-based deduplication instead of entering a deduplication ID? (y/n)"); duplication = in.nextLine(); if (duplication.compareTo("y") == 0) { System.out.println("Please enter a group id value"); groupId = in.nextLine(); } else { System.out.println("Please enter deduplication Id value"); deduplicationID = in.nextLine(); System.out.println("Please enter a group id value"); groupId = in.nextLine(); } } System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Create a topic."); System.out.println("Enter a name for your SNS topic."); topicName = in.nextLine(); if (selectFIFO) { System.out.println("Because you have selected a FIFO topic, '.fifo' must be appended to the topic name."); topicName = topicName + ".fifo"; System.out.println("The name of the topic is " + topicName); topicArn = createFIFO(snsClient, topicName, duplication); System.out.println("The ARN of the FIFO topic is " + topicArn); } else { System.out.println("The name of the topic is " + topicName); topicArn = createSNSTopic(snsClient, topicName); System.out.println("The ARN of the non-FIFO topic is " + topicArn); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Create an SQS queue."); System.out.println("Enter a name for your SQS queue."); sqsQueueName = in.nextLine(); if (selectFIFO) { sqsQueueName = sqsQueueName + ".fifo"; } sqsQueueUrl = createQueue(sqsClient, sqsQueueName, selectFIFO); System.out.println("The queue URL is " + sqsQueueUrl); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Get the SQS queue ARN attribute."); sqsQueueArn = getSQSQueueAttrs(sqsClient, sqsQueueUrl); System.out.println("The ARN of the new queue is " + sqsQueueArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Attach an IAM policy to the queue."); // Define the policy to use. Make sure that you change the REGION if you are // running this code // in a different region. String policy = """ { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": "sqs:SendMessage", "Resource": "arn:aws:sqs:us-east-1:%s:%s", "Condition": { "ArnEquals": { "aws:SourceArn": "arn:aws:sns:us-east-1:%s:%s" } } } ] } """.formatted(accountId, sqsQueueName, accountId, topicName); setQueueAttr(sqsClient, sqsQueueUrl, policy); System.out.println(DASHES); System.out.println(DASHES); System.out.println("6. Subscribe to the SQS queue."); if (selectFIFO) { System.out.println( "If you add a filter to this subscription, then only the filtered messages will be received in the queue.\n" + "For information about message filtering, see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html\n" + "For this example, you can filter messages by a \"tone\" attribute."); System.out.println("Would you like to filter messages for " + sqsQueueName + "'s subscription to the topic " + topicName + "? (y/n)"); String filterAns = in.nextLine(); if (filterAns.compareTo("y") == 0) { boolean moreAns = false; System.out.println("You can filter messages by one or more of the following \"tone\" attributes."); System.out.println("1. cheerful"); System.out.println("2. funny"); System.out.println("3. serious"); System.out.println("4. sincere"); while (!moreAns) { System.out.println("Select a number or choose 0 to end."); String ans = in.nextLine(); switch (ans) { case "1": filterList.add("cheerful"); break; case "2": filterList.add("funny"); break; case "3": filterList.add("serious"); break; case "4": filterList.add("sincere"); break; default: moreAns = true; break; } } } } subscriptionArn = subQueue(snsClient, topicArn, sqsQueueArn, filterList); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Publish a message to the topic."); if (selectFIFO) { System.out.println("Would you like to add an attribute to this message? (y/n)"); String msgAns = in.nextLine(); if (msgAns.compareTo("y") == 0) { System.out.println("You can filter messages by one or more of the following \"tone\" attributes."); System.out.println("1. cheerful"); System.out.println("2. funny"); System.out.println("3. serious"); System.out.println("4. sincere"); System.out.println("Select a number or choose 0 to end."); String ans = in.nextLine(); switch (ans) { case "1": msgAttValue = "cheerful"; break; case "2": msgAttValue = "funny"; break; case "3": msgAttValue = "serious"; break; default: msgAttValue = "sincere"; break; } System.out.println("Selected value is " + msgAttValue); } System.out.println("Enter a message."); message = in.nextLine(); pubMessageFIFO(snsClient, message, topicArn, msgAttValue, duplication, groupId, deduplicationID); } else { System.out.println("Enter a message."); message = in.nextLine(); pubMessage(snsClient, message, topicArn); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. Display the message. Press any key to continue."); in.nextLine(); messageList = receiveMessages(sqsClient, sqsQueueUrl, msgAttValue); for (Message mes : messageList) { System.out.println("Message Id: " + mes.messageId()); System.out.println("Full Message: " + mes.body()); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. Delete the received message. Press any key to continue."); in.nextLine(); deleteMessages(sqsClient, sqsQueueUrl, messageList); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Unsubscribe from the topic and delete the queue. Press any key to continue."); in.nextLine(); unSub(snsClient, subscriptionArn); deleteSQSQueue(sqsClient, sqsQueueName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete the topic. Press any key to continue."); in.nextLine(); deleteSNSTopic(snsClient, topicArn); System.out.println(DASHES); System.out.println("The SNS/SQS workflow has completed successfully."); System.out.println(DASHES); } public static void deleteSNSTopic(SnsClient snsClient, String topicArn) { try { DeleteTopicRequest request = DeleteTopicRequest.builder() .topicArn(topicArn) .build(); DeleteTopicResponse result = snsClient.deleteTopic(request); System.out.println("Status was " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteSQSQueue(SqsClient sqsClient, String queueName) { try { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl(); DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); System.out.println(queueName + " was successfully deleted."); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void unSub(SnsClient snsClient, String subscriptionArn) { try { UnsubscribeRequest request = UnsubscribeRequest.builder() .subscriptionArn(subscriptionArn) .build(); UnsubscribeResponse result = snsClient.unsubscribe(request); System.out.println("Status was " + result.sdkHttpResponse().statusCode() + "\nSubscription was removed for " + request.subscriptionArn()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) { try { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); for (Message msg : messages) { DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder() .id(msg.messageId()) .build(); entries.add(entry); } DeleteMessageBatchRequest deleteMessageBatchRequest = DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); sqsClient.deleteMessageBatch(deleteMessageBatchRequest); System.out.println("The batch delete of messages was successful"); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl, String msgAttValue) { try { if (msgAttValue.isEmpty()) { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveMessageRequest).messages(); } else { // We know there are filters on the message. ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .messageAttributeNames(msgAttValue) // Include other message attributes if needed. .maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveRequest).messages(); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return null; } public static void pubMessage(SnsClient snsClient, String message, String topicArn) { try { PublishRequest request = PublishRequest.builder() .message(message) .topicArn(topicArn) .build(); PublishResponse result = snsClient.publish(request); System.out .println(result.messageId() + " Message sent. Status is " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void pubMessageFIFO(SnsClient snsClient, String message, String topicArn, String msgAttValue, String duplication, String groupId, String deduplicationID) { try { PublishRequest request; // Means the user did not choose to use a message attribute. if (msgAttValue.isEmpty()) { if (duplication.compareTo("y") == 0) { request = PublishRequest.builder() .message(message) .messageGroupId(groupId) .topicArn(topicArn) .build(); } else { request = PublishRequest.builder() .message(message) .messageDeduplicationId(deduplicationID) .messageGroupId(groupId) .topicArn(topicArn) .build(); } } else { Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put(msgAttValue, MessageAttributeValue.builder() .dataType("String") .stringValue("true") .build()); if (duplication.compareTo("y") == 0) { request = PublishRequest.builder() .message(message) .messageGroupId(groupId) .topicArn(topicArn) .build(); } else { // Create a publish request with the message and attributes. request = PublishRequest.builder() .topicArn(topicArn) .message(message) .messageDeduplicationId(deduplicationID) .messageGroupId(groupId) .messageAttributes(messageAttributes) .build(); } } // Publish the message to the topic. PublishResponse result = snsClient.publish(request); System.out .println(result.messageId() + " Message sent. Status was " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } // Subscribe to the SQS queue. public static String subQueue(SnsClient snsClient, String topicArn, String queueArn, List<String> filterList) { try { SubscribeRequest request; if (filterList.isEmpty()) { // No filter subscription is added. request = SubscribeRequest.builder() .protocol("sqs") .endpoint(queueArn) .returnSubscriptionArn(true) .topicArn(topicArn) .build(); SubscribeResponse result = snsClient.subscribe(request); System.out.println("The queue " + queueArn + " has been subscribed to the topic " + topicArn + "\n" + "with the subscription ARN " + result.subscriptionArn()); return result.subscriptionArn(); } else { request = SubscribeRequest.builder() .protocol("sqs") .endpoint(queueArn) .returnSubscriptionArn(true) .topicArn(topicArn) .build(); SubscribeResponse result = snsClient.subscribe(request); System.out.println("The queue " + queueArn + " has been subscribed to the topic " + topicArn + "\n" + "with the subscription ARN " + result.subscriptionArn()); String attributeName = "FilterPolicy"; Gson gson = new Gson(); String jsonString = "{\"tone\": []}"; JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class); JsonArray toneArray = jsonObject.getAsJsonArray("tone"); for (String value : filterList) { toneArray.add(new JsonPrimitive(value)); } String updatedJsonString = gson.toJson(jsonObject); System.out.println(updatedJsonString); SetSubscriptionAttributesRequest attRequest = SetSubscriptionAttributesRequest.builder() .subscriptionArn(result.subscriptionArn()) .attributeName(attributeName) .attributeValue(updatedJsonString) .build(); snsClient.setSubscriptionAttributes(attRequest); return result.subscriptionArn(); } } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } // Attach a policy to the queue. public static void setQueueAttr(SqsClient sqsClient, String queueUrl, String policy) { try { Map<software.amazon.awssdk.services.sqs.model.QueueAttributeName, String> attrMap = new HashMap<>(); attrMap.put(QueueAttributeName.POLICY, policy); SetQueueAttributesRequest attributesRequest = SetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributes(attrMap) .build(); sqsClient.setQueueAttributes(attributesRequest); System.out.println("The policy has been successfully attached."); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static String getSQSQueueAttrs(SqsClient sqsClient, String queueUrl) { // Specify the attributes to retrieve. List<QueueAttributeName> atts = new ArrayList<>(); atts.add(QueueAttributeName.QUEUE_ARN); GetQueueAttributesRequest attributesRequest = GetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributeNames(atts) .build(); GetQueueAttributesResponse response = sqsClient.getQueueAttributes(attributesRequest); Map<String, String> queueAtts = response.attributesAsStrings(); for (Map.Entry<String, String> queueAtt : queueAtts.entrySet()) return queueAtt.getValue(); return ""; } public static String createQueue(SqsClient sqsClient, String queueName, Boolean selectFIFO) { try { System.out.println("\nCreate Queue"); if (selectFIFO) { Map<QueueAttributeName, String> attrs = new HashMap<>(); attrs.put(QueueAttributeName.FIFO_QUEUE, "true"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .attributes(attrs) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); } else { CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createSNSTopic(SnsClient snsClient, String topicName) { CreateTopicResponse result; try { CreateTopicRequest request = CreateTopicRequest.builder() .name(topicName) .build(); result = snsClient.createTopic(request); return result.topicArn(); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createFIFO(SnsClient snsClient, String topicName, String duplication) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = new HashMap<>(); if (duplication.compareTo("n") == 0) { topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); } else { topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "true"); } CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); return response.topicArn(); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } }

The following code example shows how to:

  • Create an Amazon SQS queue.

  • Send batches of messages to the queue.

  • Receive batches of messages from the queue.

  • Delete batches of messages from the queue.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

You can handle batch message operations with Amazon SQS using two different approaches with the AWS SDK for Java 2.x:

SendRecvBatch.java uses explicit batch operations. You manually create message batches and call sendMessageBatch() and deleteMessageBatch() directly. You also handle batch responses, including any failed messages. This approach gives you full control over batch sizing and error handling. However, it requires more code to manage the batching logic.

SimpleProducerConsumer.java uses the high-level SqsAsyncBatchManager library for automatic request batching. You make individual sendMessage() and deleteMessage() calls with the same method signatures as the standard client. The SDK automatically buffers these calls and sends them as batch operations. This approach requires minimal code changes while providing batching performance benefits.

Use explicit batching when you need fine-grained control over batch composition and error handling. Use automatic batching when you want to optimize performance with minimal code changes.

SendRecvBatch.java - Uses explicit batch operations with messages.

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.SqsException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ /** * This code demonstrates basic message operations in Amazon Simple Queue Service (Amazon SQS). */ public class SendRecvBatch { private static final Logger LOGGER = LoggerFactory.getLogger(SendRecvBatch.class); private static final SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { usageDemo(); } /** * Send a batch of messages in a single request to an SQS queue. * This request may return overall success even when some messages were not sent. * The caller must inspect the Successful and Failed lists in the response and * resend any failed messages. * * @param queueUrl The URL of the queue to receive the messages. * @param messages The messages to send to the queue. Each message contains a body and attributes. * @return The response from SQS that contains the list of successful and failed messages. */ public static SendMessageBatchResponse sendMessages( String queueUrl, List<MessageEntry> messages) { try { List<SendMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { MessageEntry message = messages.get(i); entries.add(SendMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .messageBody(message.getBody()) .messageAttributes(message.getAttributes()) .build()); } SendMessageBatchRequest sendBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); SendMessageBatchResponse response = sqsClient.sendMessageBatch(sendBatchRequest); if (!response.successful().isEmpty()) { for (SendMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Message sent: {}: {}", resultEntry.messageId(), messages.get(Integer.parseInt(resultEntry.id())).getBody()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Failed to send: {}: {}", errorEntry.id(), messages.get(Integer.parseInt(errorEntry.id())).getBody()); } } return response; } catch (SqsException e) { LOGGER.error("Send messages failed to queue: {}", queueUrl, e); throw e; } } /** * Receive a batch of messages in a single request from an SQS queue. * * @param queueUrl The URL of the queue from which to receive messages. * @param maxNumber The maximum number of messages to receive (capped at 10 by SQS). * The actual number of messages received might be less. * @param waitTime The maximum time to wait (in seconds) before returning. When * this number is greater than zero, long polling is used. This * can result in reduced costs and fewer false empty responses. * @return The list of Message objects received. These each contain the body * of the message and metadata and custom attributes. */ public static List<Message> receiveMessages(String queueUrl, int maxNumber, int waitTime) { try { ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(maxNumber) .waitTimeSeconds(waitTime) .messageAttributeNames("All") .build(); List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages(); for (Message message : messages) { LOGGER.info("Received message: {}: {}", message.messageId(), message.body()); } return messages; } catch (SqsException e) { LOGGER.error("Couldn't receive messages from queue: {}", queueUrl, e); throw e; } } /** * Delete a batch of messages from a queue in a single request. * * @param queueUrl The URL of the queue from which to delete the messages. * @param messages The list of messages to delete. * @return The response from SQS that contains the list of successful and failed * message deletions. */ public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List<Message> messages) { try { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { entries.add(DeleteMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .receiptHandle(messages.get(i).receiptHandle()) .build()); } DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest); if (!response.successful().isEmpty()) { for (DeleteMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Deleted {}", messages.get(Integer.parseInt(resultEntry.id())).receiptHandle()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Could not delete {}", messages.get(Integer.parseInt(errorEntry.id())).receiptHandle()); } } return response; } catch (SqsException e) { LOGGER.error("Couldn't delete messages from queue {}", queueUrl, e); throw e; } } /** * Helper class to represent a message with body and attributes. */ public static class MessageEntry { private final String body; private final Map<String, MessageAttributeValue> attributes; public MessageEntry(String body, Map<String, MessageAttributeValue> attributes) { this.body = body; this.attributes = attributes != null ? attributes : new HashMap<>(); } public String getBody() { return body; } public Map<String, MessageAttributeValue> getAttributes() { return attributes; } } /** * Shows how to: * * Read the lines from a file and send the lines in * batches of 10 as messages to a queue. * * Receive the messages in batches until the queue is empty. * * Reassemble the lines of the file and verify they match the original file. */ public static void usageDemo() { LOGGER.info("-".repeat(88)); LOGGER.info("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!"); LOGGER.info("-".repeat(88)); String queueUrl = null; try { // Create a queue for the demo. String queueName = "sqs-usage-demo-message-wrapper-" + System.currentTimeMillis(); CreateQueueRequest createRequest = CreateQueueRequest.builder() .queueName(queueName) .build(); queueUrl = sqsClient.createQueue(createRequest).queueUrl(); LOGGER.info("Created queue: {}", queueUrl); try (InputStream inputStream = SendRecvBatch.class.getResourceAsStream("/log4j2.xml"); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { List<String> lines = reader.lines().toList(); // Send file lines in batches. int batchSize = 10; LOGGER.info("Sending file lines in batches of {} as messages.", batchSize); for (int i = 0; i < lines.size(); i += batchSize) { List<MessageEntry> messageBatch = new ArrayList<>(); for (int j = i; j < Math.min(i + batchSize, lines.size()); j++) { String line = lines.get(j); if (line == null || line.trim().isEmpty()) { continue; // Skip empty lines. } Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("line", MessageAttributeValue.builder() .dataType("String") .stringValue(String.valueOf(j)) .build()); messageBatch.add(new MessageEntry(lines.get(j), attributes)); } sendMessages(queueUrl, messageBatch); System.out.print("."); System.out.flush(); } LOGGER.info("\nDone. Sent {} messages.", lines.size()); // Receive and process messages. LOGGER.info("Receiving, handling, and deleting messages in batches of {}.", batchSize); String[] receivedLines = new String[lines.size()]; boolean moreMessages = true; while (moreMessages) { List<Message> receivedMessages = receiveMessages(queueUrl, batchSize, 5); for (Message message : receivedMessages) { int lineNumber = Integer.parseInt(message.messageAttributes().get("line").stringValue()); receivedLines[lineNumber] = message.body(); } if (!receivedMessages.isEmpty()) { deleteMessages(queueUrl, receivedMessages); } else { moreMessages = false; } } LOGGER.info("\nDone."); // Verify that all lines were received correctly. boolean allLinesMatch = true; for (int i = 0; i < lines.size(); i++) { String originalLine = lines.get(i); String receivedLine = receivedLines[i] == null ? "" : receivedLines[i]; if (!originalLine.equals(receivedLine)) { allLinesMatch = false; break; } } if (allLinesMatch) { LOGGER.info("Successfully reassembled all file lines!"); } else { LOGGER.info("Uh oh, some lines were missed!"); } } } catch (SqsException e) { LOGGER.error("SQS operation failed", e); } catch (RuntimeException | IOException e) { LOGGER.error("Unexpected runtime error during demo", e); } finally { // Clean up by deleting the queue if it was created. if (queueUrl != null) { try { DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); LOGGER.info("Deleted queue: {}", queueUrl); } catch (SqsException e) { LOGGER.error("Failed to delete queue: {}", queueUrl, e); } } } LOGGER.info("Thanks for watching!"); LOGGER.info("-".repeat(88)); } }

SimpleProducerConsumer.java - Uses automatic batching of messages.

package com.example.sqs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import software.amazon.awssdk.core.exception.SdkException; import java.math.BigInteger; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Demonstrates the AWS SDK for Java 2.x Automatic Request Batching API for Amazon SQS. * * This example showcases the high-level SqsAsyncBatchManager library that provides * efficient batching and buffering for SQS operations. The batch manager offers * methods that directly mirror SqsAsyncClient methods—sendMessage, changeMessageVisibility, * deleteMessage, and receiveMessage—making it a drop-in replacement with minimal code changes. * * Key features of the SqsAsyncBatchManager: * - Automatic batching: The SDK automatically buffers individual requests and sends them * as batches when maxBatchSize (default: 10) or sendRequestFrequency (default: 200ms) * thresholds are reached * - Familiar API: Method signatures match SqsAsyncClient exactly, requiring no learning curve * - Background optimization: The batch manager maintains internal buffers and handles * batching logic transparently * - Asynchronous operations: All methods return CompletableFuture for non-blocking execution * * Performance benefits demonstrated: * - Reduced API calls: Multiple individual requests are consolidated into single batch operations * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process more messages per second * - Efficient resource utilization: Fewer network round trips and better connection reuse * * This example compares: * 1. Single-message operations using SqsAsyncClient directly * 2. Batch operations using SqsAsyncBatchManager with identical method calls * * Usage patterns: * - Set batch size to 1 to use SqsAsyncClient for baseline performance measurement * - Set batch size > 1 to use SqsAsyncBatchManager for optimized batch processing * - Monitor real-time throughput metrics to observe performance improvements * * Prerequisites: * - AWS SDK for Java 2.x version 2.28.0 or later * - An existing SQS queue * - Valid AWS credentials configured * * The program displays real-time metrics showing the dramatic performance difference * between individual operations and automatic batching. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Logger log = LoggerFactory.getLogger(SimpleProducerConsumer.class); /** * Runs the SQS batching demonstration with user-configured parameters. * * Prompts for queue name, thread counts, batch size, message size, and runtime. * Creates producer and consumer threads to demonstrate batching performance. * * @param args command line arguments (not used) * @throws InterruptedException if thread operations are interrupted */ public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); // Create SQS async client and batch manager for all operations. // The SqsAsyncBatchManager is created from the SqsAsyncClient using the // batchManager() factory method, which provides default batching configuration. // This high-level library automatically handles request buffering and batching // while maintaining the same method signatures as SqsAsyncClient. final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); final SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager(); final String queueUrl = sqsAsyncClient.getQueueUrl(GetQueueUrlRequest.builder() .queueName(queueName) .build()).join().queueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsAsyncClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(batchManager, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsAsyncClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(batchManager, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); // Close resources batchManager.close(); sqsAsyncClient.close(); } /** * Creates a random string of approximately the specified size in bytes. * * @param sizeByte the target size in bytes for the generated string * @return a random string encoded in base-32 */ private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * Sends messages individually using SqsAsyncClient for baseline performance measurement. * * This producer demonstrates traditional single-message operations without batching. * Each sendMessage() call results in a separate API request to SQS, providing * a performance baseline for comparison with the batch operations. * * The sendMessage() method signature is identical to SqsAsyncBatchManager.sendMessage(), * showing how the high-level batching library maintains API compatibility while * adding automatic optimization behind the scenes. */ private static class Producer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for single-message operations. * * @param sqsAsyncClient the SQS client for sending messages * @param queueUrl the URL of the target queue * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ Producer(SqsAsyncClient sqsAsyncClient, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends messages until the stop flag is set. * * Uses SqsAsyncClient.sendMessage() directly, resulting in one API call per message. * This approach provides baseline performance metrics for comparison with batching. * Each call blocks until the individual message is sent, demonstrating traditional * one-request-per-operation behavior. */ public void run() { try { while (!stop.get()) { sqsAsyncClient.sendMessage(SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()).join(); producedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * Sends messages using SqsAsyncBatchManager for automatic request batching and optimization. * * This producer demonstrates the AWS SDK for Java 2.x high-level batching library. * The SqsAsyncBatchManager automatically buffers individual sendMessage() calls and * sends them as batches when thresholds are reached: * - maxBatchSize: Maximum 10 messages per batch (default) * - sendRequestFrequency: 200ms timeout before sending partial batches (default) * * Key advantages of the batching approach: * - Identical API: batchManager.sendMessage() has the same signature as sqsAsyncClient.sendMessage() * - Automatic optimization: No code changes needed to benefit from batching * - Transparent buffering: The SDK handles batching logic internally * - Reduced API calls: Multiple messages sent in single batch requests * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process significantly more messages per second */ private static class BatchProducer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for batch operations. * * @param batchManager the batch manager for efficient message sending * @param queueUrl the URL of the target queue * @param batchSize the number of messages to send per batch * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ BatchProducer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends batches of messages using the high-level batching library. * * Notice how batchManager.sendMessage() uses the exact same method signature * and request builder pattern as SqsAsyncClient.sendMessage(). This demonstrates * the drop-in replacement capability of the SqsAsyncBatchManager. * * The SDK automatically: * - Buffers individual sendMessage() calls internally * - Groups them into batch requests when thresholds are met * - Sends SendMessageBatchRequest operations to SQS * - Returns individual CompletableFuture responses for each message * * This transparent batching provides significant performance improvements * without requiring changes to application logic or error handling patterns. */ public void run() { try { while (!stop.get()) { // Send multiple messages using the high-level batch manager. // Each batchManager.sendMessage() call uses identical syntax to // sqsAsyncClient.sendMessage(), demonstrating API compatibility. // The SDK automatically buffers these calls and sends them as // batch operations when maxBatchSize (10) or sendRequestFrequency (200ms) // thresholds are reached, significantly improving throughput. for (int i = 0; i < batchSize; i++) { CompletableFuture<SendMessageResponse> future = batchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()); // Handle the response asynchronously future.whenComplete((response, throwable) -> { if (throwable == null) { producedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchProducer: Failed to send message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } // Small delay to allow batching to occur Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchProducer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages individually using SqsAsyncClient for baseline measurement. * * This consumer demonstrates traditional single-message operations without batching. * Each receiveMessage() and deleteMessage() call results in separate API requests, * providing a performance baseline for comparison with batch operations. * * The method signatures are identical to SqsAsyncBatchManager methods: * - receiveMessage() matches batchManager.receiveMessage() * - deleteMessage() matches batchManager.deleteMessage() * * This API consistency allows easy migration to the high-level batching library. */ private static class Consumer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for single-message operations. * * @param sqsAsyncClient the SQS client for receiving messages * @param queueUrl the URL of the source queue * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ Consumer(SqsAsyncClient sqsAsyncClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using traditional single-request operations. * * Uses SqsAsyncClient methods directly: * - receiveMessage(): One API call per receive operation * - deleteMessage(): One API call per delete operation * * This approach demonstrates the baseline performance without batching optimization. * Compare these method calls with the identical signatures used in BatchConsumer * to see how the high-level batching library maintains API compatibility. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResponse result = sqsAsyncClient.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build()).join(); if (!result.messages().isEmpty()) { final Message m = result.messages().get(0); // Note: deleteMessage() signature identical to batchManager.deleteMessage() sqsAsyncClient.deleteMessage(DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()) .build()).join(); consumedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { log.error(e.getMessage()); } } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages using SqsAsyncBatchManager for automatic optimization. * * This consumer demonstrates the AWS SDK for Java 2.x high-level batching library * for message consumption. The SqsAsyncBatchManager provides two key optimizations: * * 1. Receive optimization: Maintains an internal buffer of messages fetched in the * background, so receiveMessage() calls return immediately from the buffer * 2. Delete batching: Automatically buffers deleteMessage() calls and sends them * as DeleteMessageBatchRequest operations when thresholds are reached * * Key features: * - Identical API: receiveMessage() and deleteMessage() have the same signatures * as SqsAsyncClient methods, making this a true drop-in replacement * - Background fetching: The batch manager continuously fetches messages to keep * the internal buffer populated, reducing receive latency * - Automatic delete batching: Individual deleteMessage() calls are buffered and * sent as batch operations (up to 10 per batch, 200ms frequency) * - Transparent optimization: No application logic changes needed to benefit * * Performance benefits: * - Reduced API calls through automatic batching of delete operations * - Lower latency for receives due to background message buffering * - Higher overall throughput with fewer network round trips */ private static class BatchConsumer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for batch operations. * * @param batchManager the batch manager for efficient message processing * @param queueUrl the URL of the source queue * @param batchSize the maximum number of messages to receive per batch * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ BatchConsumer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using the high-level batching library. * * Demonstrates the key advantage of SqsAsyncBatchManager: identical method signatures * with automatic optimization. Notice how: * * - batchManager.receiveMessage() uses the same syntax as sqsAsyncClient.receiveMessage() * - batchManager.deleteMessage() uses the same syntax as sqsAsyncClient.deleteMessage() * * Behind the scenes, the batch manager: * 1. Maintains an internal message buffer populated by background fetching * 2. Returns messages immediately from the buffer (reduced latency) * 3. Automatically batches deleteMessage() calls into DeleteMessageBatchRequest operations * 4. Sends batch deletes when maxBatchSize (10) or sendRequestFrequency (200ms) is reached * * This provides significant performance improvements with zero code changes * compared to traditional SqsAsyncClient usage patterns. */ public void run() { try { while (!stop.get()) { // Receive messages using the high-level batch manager. // This call uses identical syntax to sqsAsyncClient.receiveMessage() // but benefits from internal message buffering for improved performance. final ReceiveMessageResponse result = batchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(Math.min(batchSize, 10)) .build()).join(); if (!result.messages().isEmpty()) { final List<Message> messages = result.messages(); // Delete messages using the batch manager. // Each deleteMessage() call uses identical syntax to SqsAsyncClient // but the SDK automatically buffers these calls and sends them // as DeleteMessageBatchRequest operations for optimal performance. for (Message message : messages) { CompletableFuture<DeleteMessageResponse> future = batchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .build()); future.whenComplete((response, throwable) -> { if (throwable == null) { consumedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchConsumer: Failed to delete message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } } // Small delay to prevent tight polling Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchConsumer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * Displays real-time throughput statistics every second. * * This thread logs the current count of produced and consumed messages * to help you monitor the performance comparison. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; /** * Creates a monitoring thread that displays throughput statistics. * * @param producedCount shared counter for messages sent * @param consumedCount shared counter for messages processed * @param stop shared flag to signal thread termination */ Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } /** * Logs throughput statistics every second until stopped. * * Displays the current count of produced and consumed messages * to help monitor the performance comparison between batching strategies. */ public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

The following code example shows how to use the Amazon SQS Java Messaging Library to work with the JMS interface.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

The following examples work with standard Amazon SQS queues and include:

  • Sending a text message.

  • Receiving messages synchronously.

  • Receiving messages asynchronously.

  • Receiving messages using CLIENT_ACKNOWLEDGE mode.

  • Receiving messages using the UNORDERED_ACKNOWLEDGE mode.

  • Using Spring to inject dependencies.

  • A utility class that provides common methods used by the other examples.

For more information on using JMS with Amazon SQS, see the Amazon SQS Developer Guide.

Sending a text message.

/** * This method establishes a connection to a standard Amazon SQS queue using the Amazon SQS * Java Messaging Library and sends text messages to it. It uses JMS (Java Message Service) API * with automatic acknowledgment mode to ensure reliable message delivery, and automatically * manages all messaging resources. * * @throws JMSException If there is a problem connecting to or sending messages to the queue */ public static void doSendTextMessage() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try (SQSConnection connection = connectionFactory.createConnection()) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session that uses the JMS auto-acknowledge mode. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); createAndSendMessages(session, producer); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed"); } /** * This method reads text input from the keyboard and sends each line as a separate message * to a standard Amazon SQS queue using the Amazon SQS Java Messaging Library. It continues * to accept input until the user enters an empty line, using JMS (Java Message Service) API to * handle the message delivery. * * @param session The JMS session used to create messages * @param producer The JMS message producer used to send messages to the queue */ private static void createAndSendMessages(Session session, MessageProducer producer) { BufferedReader inputReader = new BufferedReader( new InputStreamReader(System.in, Charset.defaultCharset())); try { String input; while (true) { LOGGER.info("Enter message to send (leave empty to exit): "); input = inputReader.readLine(); if (input == null || input.isEmpty()) break; TextMessage message = session.createTextMessage(input); producer.send(message); LOGGER.info("Send message {}", message.getJMSMessageID()); } } catch (EOFException e) { // Just return on EOF } catch (IOException e) { LOGGER.error("Failed reading input: {}", e.getMessage(), e); } catch (JMSException e) { LOGGER.error("Failed sending message: {}", e.getMessage(), e); } }

Receiving messages synchronously.

/** * This method receives messages from a standard Amazon SQS queue using the Amazon SQS Java * Messaging Library. It creates a connection to the queue using JMS (Java Message Service), * waits for messages to arrive, and processes them one at a time. The method handles all * necessary setup and cleanup of messaging resources. * * @throws JMSException If there is a problem connecting to or receiving messages from the queue */ public static void doReceiveMessageSync() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create a connection. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); connection.start(); receiveMessages(consumer); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed"); } /** * This method continuously checks for new messages from a standard Amazon SQS queue using * the Amazon SQS Java Messaging Library. It waits up to 20 seconds for each message, processes * it using JMS (Java Message Service), and confirms receipt. The method stops checking for * messages after 20 seconds of no activity. * * @param consumer The JMS message consumer that receives messages from the queue */ private static void receiveMessages(MessageConsumer consumer) { try { while (true) { LOGGER.info("Waiting for messages..."); // Wait 1 minute for a message Message message = consumer.receive(Duration.ofSeconds(20).toMillis()); if (message == null) { LOGGER.info("Shutting down after 20 seconds of silence."); break; } SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); LOGGER.info("Acknowledged message {}", message.getJMSMessageID()); } } catch (JMSException e) { LOGGER.error("Error receiving from SQS: {}", e.getMessage(), e); } }

Receiving messages asynchronously.

/** * This method sets up automatic message handling for a standard Amazon SQS queue using the * Amazon SQS Java Messaging Library. It creates a listener that processes messages as soon * as they arrive using JMS (Java Message Service), runs for 5 seconds, then cleans up all * messaging resources. * * @throws JMSException If there is a problem connecting to or receiving messages from the queue */ public static void doReceiveMessageAsync() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create a connection. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); try { // Create a consumer for the queue. MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Provide an implementation of the MessageListener interface, which has a single 'onMessage' method. // We use a lambda expression for the implementation. consumer.setMessageListener(message -> { try { SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); } catch (JMSException e) { LOGGER.error("Error processing message: {}", e.getMessage()); } }); // Start receiving incoming messages. connection.start(); LOGGER.info("Waiting for messages..."); } catch (JMSException e) { throw new RuntimeException(e); } try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } } // The connection closes automatically. This also closes the session. LOGGER.info( "Connection closed" ); }

Receiving messages using CLIENT_ACKNOWLEDGE mode.

/** * This method demonstrates how message acknowledgment affects message processing in a standard * Amazon SQS queue using the Amazon SQS Java Messaging Library. It sends messages to the queue, * then shows how JMS (Java Message Service) client acknowledgment mode handles both explicit * and implicit message confirmations, including how acknowledging one message can automatically * acknowledge previous messages. * * @throws JMSException If there is a problem with the messaging operations */ public static void doReceiveMessagesSyncClientAcknowledge() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, TIME_OUT_SECONDS); // Create a session with client acknowledge mode. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Create a producer and consumer. MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Open the connection. connection.start(); // Send two text messages. sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it. receiveMessage(consumer, false); // Receive another message and acknowledge it. receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue, LOGGER.info("Waiting for visibility timeout..."); try { Thread.sleep(TIME_OUT_MILLIS); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for visibility timeout", e); Thread.currentThread().interrupt(); throw new RuntimeException("Processing interrupted", e); } /* We will attempt to receive another message, but none will be available. This is because in CLIENT_ACKNOWLEDGE mode, when we acknowledged the second message, all previous messages were automatically acknowledged as well. Therefore, although we never directly acknowledged the first message, it was implicitly acknowledged when we confirmed the second one. */ receiveMessage(consumer, true); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed."); } /** * Sends a text message using the specified JMS MessageProducer and Session. * * @param producer The JMS MessageProducer used to send the message * @param session The JMS Session used to create the text message * @param messageText The text content to be sent in the message * @throws JMSException If there is an error creating or sending the message */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it. producer.send(session.createTextMessage(messageText)); } /** * Receives and processes a message from a JMS queue using the specified consumer. * The method waits for a message until the configured timeout period is reached. * If a message is received, it is logged and optionally acknowledged based on the * acknowledge parameter. * * @param consumer The JMS MessageConsumer used to receive messages from the queue * @param acknowledge Boolean flag indicating whether to acknowledge the message. * If true, the message will be acknowledged after processing * @throws JMSException If there is an error receiving, processing, or acknowledging the message */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message. Message message = consumer.receive(TIME_OUT_MILLIS); if (message == null) { LOGGER.info("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text. LOGGER.info("Received: {} Acknowledged: {}", ((TextMessage) message).getText(), acknowledge); // Acknowledge the message if asked. if (acknowledge) message.acknowledge(); } }

Receiving messages using the UNORDERED_ACKNOWLEDGE mode.

/** * Demonstrates message acknowledgment behavior in UNORDERED_ACKNOWLEDGE mode with Amazon SQS JMS. * In this mode, each message must be explicitly acknowledged regardless of receive order. * Unacknowledged messages return to the queue after the visibility timeout expires, * unlike CLIENT_ACKNOWLEDGE mode where acknowledging one message acknowledges all previous messages. * * @throws JMSException If a JMS-related error occurs during message operations */ public static void doReceiveMessagesUnorderedAcknowledge() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try( SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, TIME_OUT_SECONDS); // Create a session with unordered acknowledge mode. Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); // Create the producer and consumer. MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Open a connection. connection.start(); // Send two text messages. sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it. receiveMessage(consumer, false); // Receive another message and acknowledge it. receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue. LOGGER.info("Waiting for visibility timeout..."); try { Thread.sleep(TIME_OUT_MILLIS); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for visibility timeout", e); Thread.currentThread().interrupt(); throw new RuntimeException("Processing interrupted", e); } /* We will attempt to receive another message, and we'll get the first message again. This occurs because in UNORDERED_ACKNOWLEDGE mode, each message requires its own separate acknowledgment. Since we only acknowledged the second message, the first message remains in the queue for redelivery. */ receiveMessage(consumer, true); LOGGER.info("Connection closed."); } // The connection closes automatically. This also closes the session. } /** * Sends a text message to an Amazon SQS queue using JMS. * * @param producer The JMS MessageProducer for the queue * @param session The JMS Session for message creation * @param messageText The message content * @throws JMSException If message creation or sending fails */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it. producer.send(session.createTextMessage(messageText)); } /** * Synchronously receives a message from an Amazon SQS queue using the JMS API * with an acknowledgment parameter. * * @param consumer The JMS MessageConsumer for the queue * @param acknowledge If true, acknowledges the message after receipt * @throws JMSException If message reception or acknowledgment fails */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message. Message message = consumer.receive(TIME_OUT_MILLIS); if (message == null) { LOGGER.info("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text. LOGGER.info("Received: {} Acknowledged: {}", ((TextMessage) message).getText(), acknowledge); // Acknowledge the message if asked. if (acknowledge) message.acknowledge(); } }

Using Spring to inject dependencies.

package com.example.sqs.jms.spring; import com.amazon.sqs.javamessaging.SQSConnection; import com.example.sqs.jms.SqsJmsExampleUtils; import jakarta.jms.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.support.FileSystemXmlApplicationContext; import java.io.File; import java.net.URL; import java.util.concurrent.TimeUnit; /** * Demonstrates how to send and receive messages using the Amazon SQS Java Messaging Library * with Spring Framework integration. This example connects to a standard Amazon SQS message * queue using Spring's dependency injection to configure the connection and messaging components. * The application uses the JMS (Java Message Service) API to handle message operations. */ public class SpringExample { private static final Integer POLLING_SECONDS = 15; private static final String SPRING_XML_CONFIG_FILE = "SpringExampleConfiguration.xml.txt"; private static final Logger LOGGER = LoggerFactory.getLogger(SpringExample.class); /** * Demonstrates sending and receiving messages through a standard Amazon SQS message queue * using Spring Framework configuration. This method loads connection settings from an XML file, * establishes a messaging session using the Amazon SQS Java Messaging Library, and processes * messages using JMS (Java Message Service) operations. If the queue doesn't exist, it will * be created automatically. * * @param args Command line arguments (not used) */ public static void main(String[] args) { URL resource = SpringExample.class.getClassLoader().getResource(SPRING_XML_CONFIG_FILE); File springFile = new File(resource.getFile()); if (!springFile.exists() || !springFile.canRead()) { LOGGER.error("File " + SPRING_XML_CONFIG_FILE + " doesn't exist or isn't readable."); System.exit(1); } try (FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext("file://" + springFile.getAbsolutePath())) { Connection connection; try { connection = context.getBean(Connection.class); } catch (NoSuchBeanDefinitionException e) { LOGGER.error("Can't find the JMS connection to use: " + e.getMessage(), e); System.exit(2); return; } String queueName; try { queueName = context.getBean("queueName", String.class); } catch (NoSuchBeanDefinitionException e) { LOGGER.error("Can't find the name of the queue to use: " + e.getMessage(), e); System.exit(3); return; } try { if (connection instanceof SQSConnection) { SqsJmsExampleUtils.ensureQueueExists((SQSConnection) connection, queueName, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); } // Create the JMS session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); SqsJmsExampleUtils.sendTextMessage(session, queueName); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); receiveMessages(consumer); } catch (JMSException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } } // Spring context autocloses. Managed Spring beans that implement AutoClosable, such as the // 'connection' bean, are also closed. LOGGER.info("Context closed"); } /** * Continuously checks for and processes messages from a standard Amazon SQS message queue * using the Amazon SQS Java Messaging Library underlying the JMS API. This method waits for incoming messages, * processes them when they arrive, and acknowledges their receipt using JMS (Java Message * Service) operations. The method will stop checking for messages after 15 seconds of * inactivity. * * @param consumer The JMS message consumer used to receive messages from the queue */ private static void receiveMessages(MessageConsumer consumer) { try { while (true) { LOGGER.info("Waiting for messages..."); // Wait 15 seconds for a message. Message message = consumer.receive(TimeUnit.SECONDS.toMillis(POLLING_SECONDS)); if (message == null) { LOGGER.info("Shutting down after {} seconds of silence.", POLLING_SECONDS); break; } SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); LOGGER.info("Message acknowledged."); } } catch (JMSException e) { LOGGER.error("Error receiving from SQS.", e); } } }

Spring bean definitions.

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd "> <!-- Define the AWS Region --> <bean id="region" class="software.amazon.awssdk.regions.Region" factory-method="of"> <constructor-arg value="us-east-1"/> </bean> <bean id="credentialsProviderBean" class="software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider" factory-method="create"/> <bean id="clientBuilder" class="software.amazon.awssdk.services.sqs.SqsClient" factory-method="builder"/> <bean id="regionSetClientBuilder" factory-bean="clientBuilder" factory-method="region"> <constructor-arg ref="region"/> </bean> <!-- Configure the Builder with Credentials Provider --> <bean id="sqsClient" factory-bean="regionSetClientBuilder" factory-method="credentialsProvider"> <constructor-arg ref="credentialsProviderBean"/> </bean> <bean id="providerConfiguration" class="com.amazon.sqs.javamessaging.ProviderConfiguration"> <property name="numberOfMessagesToPrefetch" value="5"/> </bean> <bean id="connectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"> <constructor-arg ref="providerConfiguration"/> <constructor-arg ref="clientBuilder"/> </bean> <bean id="connection" factory-bean="connectionFactory" factory-method="createConnection" init-method="start" destroy-method="close"/> <bean id="queueName" class="java.lang.String"> <constructor-arg value="SQSJMSClientExampleQueue"/> </bean> </beans>

A utility class that provides common methods used by the other examples.

package com.example.sqs.jms; import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; import com.amazon.sqs.javamessaging.ProviderConfiguration; import com.amazon.sqs.javamessaging.SQSConnection; import com.amazon.sqs.javamessaging.SQSConnectionFactory; import jakarta.jms.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import java.time.Duration; import java.util.Base64; import java.util.Map; /** * This utility class provides helper methods for working with Amazon Simple Queue Service (Amazon SQS) * through the Java Message Service (JMS) interface. It contains common operations for managing message * queues and handling message delivery. */ public class SqsJmsExampleUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SqsJmsExampleUtils.class); public static final Long QUEUE_VISIBILITY_TIMEOUT = 5L; /** * This method verifies that a message queue exists and creates it if necessary. The method checks for * an existing queue first to optimize performance. * * @param connection The active connection to the messaging service * @param queueName The name of the queue to verify or create * @param visibilityTimeout The duration in seconds that messages will be hidden after being received * @throws JMSException If there is an error accessing or creating the queue */ public static void ensureQueueExists(SQSConnection connection, String queueName, Long visibilityTimeout) throws JMSException { AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); /* In most cases, you can do this with just a 'createQueue' call, but 'getQueueUrl' (called by 'queueExists') is a faster operation for the common case where the queue already exists. Also, many users and roles have permission to call 'getQueueUrl' but don't have permission to call 'createQueue'. */ if( !client.queueExists(queueName) ) { CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .attributes(Map.of(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(visibilityTimeout))) .build(); client.createQueue( createQueueRequest ); } } /** * This method sends a simple text message to a specified message queue. It handles all necessary * setup for the message delivery process. * * @param session The active messaging session used to create and send the message * @param queueName The name of the queue where the message will be sent */ public static void sendTextMessage(Session session, String queueName) { // Rest of implementation... try { MessageProducer producer = session.createProducer( session.createQueue( queueName) ); Message message = session.createTextMessage("Hello world!"); producer.send(message); } catch (JMSException e) { LOGGER.error( "Error receiving from SQS", e ); } } /** * This method processes incoming messages and logs their content based on the message type. * It supports text messages, binary data, and Java objects. * * @param message The message to be processed and logged * @throws JMSException If there is an error reading the message content */ public static void handleMessage(Message message) throws JMSException { // Rest of implementation... LOGGER.info( "Got message {}", message.getJMSMessageID() ); LOGGER.info( "Content: "); if(message instanceof TextMessage txtMessage) { LOGGER.info( "\t{}", txtMessage.getText() ); } else if(message instanceof BytesMessage byteMessage){ // Assume the length fits in an int - SQS only supports sizes up to 256k so that // should be true byte[] bytes = new byte[(int)byteMessage.getBodyLength()]; byteMessage.readBytes(bytes); LOGGER.info( "\t{}", Base64.getEncoder().encodeToString( bytes ) ); } else if( message instanceof ObjectMessage) { ObjectMessage objMessage = (ObjectMessage) message; LOGGER.info( "\t{}", objMessage.getObject() ); } } /** * This method sets up automatic message processing for a specified queue. It creates a listener * that will receive and handle incoming messages without blocking the main program. * * @param session The active messaging session * @param queueName The name of the queue to monitor * @param connection The active connection to the messaging service */ public static void receiveMessagesAsync(Session session, String queueName, Connection connection) { // Rest of implementation... try { // Create a consumer for the queue. MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); // Provide an implementation of the MessageListener interface, which has a single 'onMessage' method. // We use a lambda expression for the implementation. consumer.setMessageListener(message -> { try { SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); } catch (JMSException e) { LOGGER.error("Error processing message: {}", e.getMessage()); } }); // Start receiving incoming messages. connection.start(); } catch (JMSException e) { throw new RuntimeException(e); } try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * This method performs cleanup operations after message processing is complete. It receives * any messages in the specified queue, removes the message queue and closes all * active connections to prevent resource leaks. * * @param queueName The name of the queue to be removed * @param visibilityTimeout The duration in seconds that messages are hidden after being received * @throws JMSException If there is an error during the cleanup process */ public static void cleanUpExample(String queueName, Long visibilityTimeout) throws JMSException { LOGGER.info("Performing cleanup."); SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); try (SQSConnection connection = connectionFactory.createConnection() ) { ensureQueueExists(connection, queueName, visibilityTimeout); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); receiveMessagesAsync(session, queueName, connection); SqsClient sqsClient = connection.getWrappedAmazonSQSClient().getAmazonSQSClient(); try { String queueUrl = sqsClient.getQueueUrl(b -> b.queueName(queueName)).queueUrl(); sqsClient.deleteQueue(b -> b.queueUrl(queueUrl)); LOGGER.info("Queue deleted: {}", queueUrl); } catch (SdkException e) { LOGGER.error("Error during SQS operations: ", e); } } LOGGER.info("Clean up: Connection closed"); } /** * This method creates a background task that sends multiple messages to a specified queue * after waiting for a set time period. The task operates independently to ensure efficient * message processing without interrupting other operations. * * @param queueName The name of the queue where messages will be sent * @param secondsToWait The number of seconds to wait before sending messages * @param numMessages The number of messages to send * @param visibilityTimeout The duration in seconds that messages remain hidden after being received * @return A task that can be executed to send the messages */ public static Runnable sendAMessageAsync(String queueName, Long secondsToWait, Integer numMessages, Long visibilityTimeout) { return () -> { try { Thread.sleep(Duration.ofSeconds(secondsToWait).toMillis()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } try { SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); try (SQSConnection connection = connectionFactory.createConnection()) { ensureQueueExists(connection, queueName, visibilityTimeout); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); for (int i = 1; i <= numMessages; i++) { MessageProducer producer = session.createProducer(session.createQueue(queueName)); producer.send(session.createTextMessage("Hello World " + i + "!")); } } } catch (JMSException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } }; } }

The following code example shows how to perform tagging operation with Amazon SQS.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

The following example creates tags for a queue, lists tags, and removes a tag.

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ListQueueTagsResponse; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; import software.amazon.awssdk.services.sqs.model.SqsException; import java.util.Map; import java.util.UUID; /** * Before running this Java V2 code example, set up your development environment, including your credentials. For more * information, see the <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html">AWS * SDK for Java Developer Guide</a>. */ public class TagExamples { static final SqsClient sqsClient = SqsClient.create(); static final String queueName = "TagExamples-queue-" + UUID.randomUUID().toString().replace("-", "").substring(0, 20); private static final Logger LOGGER = LoggerFactory.getLogger(TagExamples.class); public static void main(String[] args) { final String queueUrl; try { queueUrl = sqsClient.createQueue(b -> b.queueName(queueName)).queueUrl(); LOGGER.info("Queue created. The URL is: {}", queueUrl); } catch (RuntimeException e) { LOGGER.error("Program ending because queue was not created."); throw new RuntimeException(e); } try { addTags(queueUrl); listTags(queueUrl); removeTags(queueUrl); } catch (RuntimeException e) { LOGGER.error("Program ending because of an error in a method."); } finally { try { sqsClient.deleteQueue(b -> b.queueUrl(queueUrl)); LOGGER.info("Queue successfully deleted. Program ending."); sqsClient.close(); } catch (RuntimeException e) { LOGGER.error("Program ending."); } finally { sqsClient.close(); } } } /** This method demonstrates how to use a Java Map to a tag a aueue. * @param queueUrl The URL of the queue to tag. */ public static void addTags(String queueUrl) { // Build a map of the tags. final Map<String, String> tagsToAdd = Map.of( "Team", "Development", "Priority", "Beta", "Accounting ID", "456def"); try { // Add tags to the queue using a Consumer<TagQueueRequest.Builder> parameter. sqsClient.tagQueue(b -> b .queueUrl(queueUrl) .tags(tagsToAdd) ); } catch (QueueDoesNotExistException e) { LOGGER.error("Queue does not exist: {}", e.getMessage(), e); throw new RuntimeException(e); } } /** This method demonstrates how to view the tags for a queue. * @param queueUrl The URL of the queue whose tags you want to list. */ public static void listTags(String queueUrl) { ListQueueTagsResponse response; try { // Call the listQueueTags method with a Consumer<ListQueueTagsRequest.Builder> parameter that creates a ListQueueTagsRequest. response = sqsClient.listQueueTags(b -> b .queueUrl(queueUrl)); } catch (SqsException e) { LOGGER.error("Exception thrown: {}", e.getMessage(), e); throw new RuntimeException(e); } // Log the tags. response.tags() .forEach((k, v) -> LOGGER.info("Key: {} -> Value: {}", k, v)); } /** * This method demonstrates how to remove tags from a queue. * @param queueUrl The URL of the queue whose tags you want to remove. */ public static void removeTags(String queueUrl) { try { // Call the untagQueue method with a Consumer<UntagQueueRequest.Builder> parameter. sqsClient.untagQueue(b -> b .queueUrl(queueUrl) .tagKeys("Accounting ID") // Remove a single tag. ); } catch (SqsException e) { LOGGER.error("Exception thrown: {}", e.getMessage(), e); throw new RuntimeException(e); } } }

Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by receiving messages from an SQS queue. The function retrieves the messages from the event parameter and logs the content of each message.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming an SQS event with Lambda using Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; public class Function implements RequestHandler<SQSEvent, Void> { @Override public Void handleRequest(SQSEvent sqsEvent, Context context) { for (SQSMessage msg : sqsEvent.getRecords()) { processMessage(msg, context); } context.getLogger().log("done"); return null; } private void processMessage(SQSMessage msg, Context context) { try { context.getLogger().log("Processed message " + msg.getBody()); // TODO: Do interesting work based on the new message } catch (Exception e) { context.getLogger().log("An error occurred"); throw e; } } }

The following code example shows how to implement partial batch response for Lambda functions that receive events from an SQS queue. The function reports the batch item failures in the response, signaling to Lambda to retry those messages later.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Reporting SQS batch item failures with Lambda using Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId())); } } return new SQSBatchResponse(batchItemFailures); } }