Managing large Amazon SQS messages using Java and Amazon S3 - Amazon Simple Queue Service

Managing large Amazon SQS messages using Java and Amazon S3

You the can use the Amazon SQS Extended Client Library for Java and Amazon Simple Storage Service (Amazon S3) to manage large Amazon Simple Queue Service (Amazon SQS) messages. This is especially useful for consuming large message payloads, from 256 KB and up to 2 GB. The library saves the message payload to an Amazon S3 bucket and sends a message containing a reference of the stored Amazon S3 object to an Amazon Amazon SQS queue.

You can use the Amazon SQS Extended Client Library for Java to do the following:

  • Specify whether messages are always stored in Amazon S3 or only when the size of a message exceeds 256 KB

  • Send a message that references a single message object stored in an S3 bucket

  • Retrieve the message object from an S3 bucket

  • Delete the message object from an S3 bucket

Prerequisites

The following example uses the AWS Java SDK. To install and set up the SDK, see Set up the AWS SDK for Java in the AWS SDK for Java Developer Guide.

Before you run the example code, configure your AWS credentials. For more information, see Set up AWS Credentials and Region for Development in the AWS SDK for Java Developer Guide.

The SDK for Java and Amazon SQS Extended Client Library for Java require the J2SE Development Kit 8.0 or later.

Note

You can use the Amazon SQS Extended Client Library for Java to manage Amazon SQS messages using Amazon S3 only with the AWS SDK for Java. You can't do this with the AWS CLI, the Amazon SQS console, the Amazon SQS HTTP API, or any of the other AWS SDKs.

AWS SDK for Java 1.x Example: Using Amazon S3 to manage large Amazon SQS messages

The following AWS SDK for Java 2.x example creates an Amazon S3 bucket with a random name and adds a lifecycle rule to permanently delete objects after 14 days. It also creates a queue named MyQueue and sends a random message that is stored in an S3 bucket and is more than 256 KB to the queue. Finally, the code retrieves the message, returns information about it, and then deletes the message, the queue, and the bucket.

/* * Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.*; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import java.util.Arrays; import java.util.List; import java.util.UUID; public class SQSExtendedClientExample { // Create an Amazon S3 bucket with a random name. private final static String S3_BUCKET_NAME = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see * Creating Service Clients in the AWS SDK for Java Developer Guide. */ final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); /* * Set the Amazon S3 bucket name, and then set a lifecycle rule on the * bucket to permanently delete objects 14 days after each object's * creation date. */ final BucketLifecycleConfiguration.Rule expirationRule = new BucketLifecycleConfiguration.Rule(); expirationRule.withExpirationInDays(14).withStatus("Enabled"); final BucketLifecycleConfiguration lifecycleConfig = new BucketLifecycleConfiguration().withRules(expirationRule); // Create the bucket and allow message objects to be stored in the bucket. s3.createBucket(S3_BUCKET_NAME); s3.setBucketLifecycleConfiguration(S3_BUCKET_NAME, lifecycleConfig); System.out.println("Bucket created and configured."); /* * Set the Amazon SQS extended client configuration with large payload * support enabled. */ final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, S3_BUCKET_NAME); final AmazonSQS sqsExtended = new AmazonSQSExtendedClient(AmazonSQSClientBuilder .defaultClient(), extendedClientConfig); /* * Create a long string of characters for the message object which will * be stored in the bucket. */ int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); final String myLongString = new String(chars); // Create a message queue for this example. final String QueueName = "MyQueue" + UUID.randomUUID().toString(); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(QueueName); final String myQueueUrl = sqsExtended .createQueue(createQueueRequest).getQueueUrl(); System.out.println("Queue created."); // Send the message. final SendMessageRequest myMessageRequest = new SendMessageRequest(myQueueUrl, myLongString); sqsExtended.sendMessage(myMessageRequest); System.out.println("Sent the message."); // Receive the message. final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqsExtended .receiveMessage(receiveMessageRequest).getMessages(); // Print information about the message. for (Message message : messages) { System.out.println("\nMessage received."); System.out.println(" ID: " + message.getMessageId()); System.out.println(" Receipt handle: " + message.getReceiptHandle()); System.out.println(" Message body (first 5 characters): " + message.getBody().substring(0, 5)); } // Delete the message, the queue, and the bucket. final String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqsExtended.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); System.out.println("Deleted the message."); sqsExtended.deleteQueue(new DeleteQueueRequest(myQueueUrl)); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(AmazonS3 client) { ObjectListing objectListing = client.listObjects(S3_BUCKET_NAME); while (true) { for (S3ObjectSummary objectSummary : objectListing .getObjectSummaries()) { client.deleteObject(S3_BUCKET_NAME, objectSummary.getKey()); } if (objectListing.isTruncated()) { objectListing = client.listNextBatchOfObjects(objectListing); } else { break; } } final VersionListing list = client.listVersions( new ListVersionsRequest().withBucketName(S3_BUCKET_NAME)); for (S3VersionSummary s : list.getVersionSummaries()) { client.deleteVersion(S3_BUCKET_NAME, s.getKey(), s.getVersionId()); } client.deleteBucket(S3_BUCKET_NAME); } }

AWS SDK for Java 2.x Example: Using Amazon S3 to manage large Amazon SQS messages

The following AWS SDK for Java 2.x example creates an Amazon S3 bucket with a random name and adds a lifecycle rule to permanently delete objects after 14 days. It also creates a queue named MyQueue and sends a random message that is stored in an S3 bucket and is more than 256 KB to the queue. Finally, the code retrieves the message, returns information about it, and then deletes the message, the queue, and the bucket.

/* * Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.ExpirationStatus; import software.amazon.awssdk.services.s3.model.LifecycleExpiration; import software.amazon.awssdk.services.s3.model.LifecycleRule; import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter; import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest; import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import java.util.Arrays; import java.util.List; import java.util.UUID; /** * Examples of using Amazon SQS Extended Client Library for Java 2.x * */ public class SqsExtendedClientExamples { // Create an Amazon S3 bucket with a random name. private final static String S3_BUCKET_NAME = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see * Creating Service Clients in the AWS SDK for Java Developer Guide. */ final S3Client s3 = S3Client.create(); /* * Set the Amazon S3 bucket name, and then set a lifecycle rule on the * bucket to permanently delete objects 14 days after each object's * creation date. */ final LifecycleRule lifeCycleRule = LifecycleRule.builder() .expiration(LifecycleExpiration.builder().days(14).build()) .filter(LifecycleRuleFilter.builder().prefix("").build()) .status(ExpirationStatus.ENABLED) .build(); final BucketLifecycleConfiguration lifecycleConfig = BucketLifecycleConfiguration.builder() .rules(lifeCycleRule) .build(); // Create the bucket and configure it s3.createBucket(CreateBucketRequest.builder().bucket(S3_BUCKET_NAME).build()); s3.putBucketLifecycleConfiguration(PutBucketLifecycleConfigurationRequest.builder() .bucket(S3_BUCKET_NAME) .lifecycleConfiguration(lifecycleConfig) .build()); System.out.println("Bucket created and configured."); // Set the Amazon SQS extended client configuration with large payload support enabled final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration().withPayloadSupportEnabled(s3, S3_BUCKET_NAME); final SqsClient sqsExtended = new AmazonSQSExtendedClient(SqsClient.builder().build(), extendedClientConfig); // Create a long string of characters for the message object int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); final String myLongString = new String(chars); // Create a message queue for this example final String queueName = "MyQueue-" + UUID.randomUUID(); final CreateQueueResponse createQueueResponse = sqsExtended.createQueue(CreateQueueRequest.builder().queueName(queueName).build()); final String myQueueUrl = createQueueResponse.queueUrl(); System.out.println("Queue created."); // Send the message final SendMessageRequest sendMessageRequest = SendMessageRequest.builder() .queueUrl(myQueueUrl) .messageBody(myLongString) .build(); sqsExtended.sendMessage(sendMessageRequest); System.out.println("Sent the message."); // Receive the message final ReceiveMessageResponse receiveMessageResponse = sqsExtended.receiveMessage(ReceiveMessageRequest.builder().queueUrl(myQueueUrl).build()); List<Message> messages = receiveMessageResponse.messages(); // Print information about the message for (Message message : messages) { System.out.println("\nMessage received."); System.out.println(" ID: " + message.messageId()); System.out.println(" Receipt handle: " + message.receiptHandle()); System.out.println(" Message body (first 5 characters): " + message.body().substring(0, 5)); } // Delete the message, the queue, and the bucket final String messageReceiptHandle = messages.get(0).receiptHandle(); sqsExtended.deleteMessage(DeleteMessageRequest.builder().queueUrl(myQueueUrl).receiptHandle(messageReceiptHandle).build()); System.out.println("Deleted the message."); sqsExtended.deleteQueue(DeleteQueueRequest.builder().queueUrl(myQueueUrl).build()); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(S3Client client) { ListObjectsV2Response listObjectsResponse = client.listObjectsV2(ListObjectsV2Request.builder().bucket(S3_BUCKET_NAME).build()); listObjectsResponse.contents().forEach(object -> { client.deleteObject(DeleteObjectRequest.builder().bucket(S3_BUCKET_NAME).key(object.key()).build()); }); ListObjectVersionsResponse listVersionsResponse = client.listObjectVersions(ListObjectVersionsRequest.builder().bucket(S3_BUCKET_NAME).build()); listVersionsResponse.versions().forEach(version -> { client.deleteObject(DeleteObjectRequest.builder().bucket(S3_BUCKET_NAME).key(version.key()).versionId(version.versionId()).build()); }); client.deleteBucket(DeleteBucketRequest.builder().bucket(S3_BUCKET_NAME).build()); } }

You can use Apache Maven to configure and build Amazon SQS Extended Client for your Java project, or to build the SDK itself. Specify individual modules from the SDK that you use in your application.

<properties> <aws-java-sdk.version>2.20.153</aws-java-sdk.version> </properties> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> <version>${aws-java-sdk.version}</version> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>s3</artifactId> <version>${aws-java-sdk.version}</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-extended-client-lib</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.12.6</version> </dependency> </dependencies>