Menu
Amazon Simple Queue Service
Developer Guide (API Version 2012-11-05)

Using JMS with Amazon SQS

The Amazon SQS Java Messaging Library is a JMS interface to Amazon SQS that enables you to leverage Amazon SQS in your applications that already use JMS. The interface enables you to use Amazon SQS as the JMS provider with minimal code changes. Along with the AWS SDK for Java, the Amazon SQS Java Messaging Library enables you to create JMS connections, sessions, producers, and consumers to send and receive messages to and from Amazon SQS queues.

The Amazon SQS Java Messaging Library supports sending and receiving messages to a queue (the JMS point-to-point model), as specified in the JMS 1.1 specification. More specifically, the Amazon SQS Java Messaging Library supports synchronously sending text, byte, or object messages to Amazon SQS queues, as well as receiving them synchronously or asynchronously. For more information about the supported features of the JMS 1.1 specification in the Amazon SQS Java Messaging Library, see Reference/Appendix and the Amazon SQS FAQs.

You must create an Amazon SQS queue to use with JMS. To create an Amazon SQS queue, you can use the AWS Management Console for Amazon SQS, the CreateQueue API, or the wrapped Amazon SQS client included in the Amazon SQS Java Messaging Library. For information about creating a queue with Amazon SQS using either the AWS Management Console or the CreateQueue API, go to Creating a Queue. For information about using the Amazon SQS Java Messaging Library, go to Getting Started with the Amazon SQS Java Messaging Library.

Amazon SQS and JMS Prerequisites

To use Amazon SQS with JMS, you must have the following:

  • SDK for Java – There are two different ways for including the SDK for Java in your project. You can either download and install the SDK for Java, or if you use Maven to obtain the Amazon SQS Java Messaging Library, then the SDK for Java is included as a dependency. The SDK for Java and Amazon SQS Java Messaging Library require J2SE Development Kit 6.0 or later. For information about downloading the SDK for Java, go to SDK for Java. For more information about using Maven see the note below.

  • Amazon SQS Java Messaging Library – If you do not use Maven, then you must add the package file, amazon-sqs-java-messaging-lib.jar, to the Java build class path. For information about downloading, go to Amazon SQS Java Messaging Library.

Note

The Amazon SQS Java Messaging Library includes support for Maven and the Spring Framework. For code samples that use Maven, the Spring Framework, and the Amazon SQS Java Messaging Library, see Code Samples.

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>amazon-sqs-java-messaging-lib</artifactId>
  <version>1.0.0</version>
  <type>jar</type>
</dependency>

Getting Started with the Amazon SQS Java Messaging Library

After you have met the prerequisites, you can then use the following code examples to get started using Amazon SQS with JMS.

This example creates a JMS connection and session, and then sends and receives a message. The wrapped Amazon SQS client, which is included in the Amazon SQS Java Messaging Library, also checks if the Amazon SQS queue exists. It also uses the wrapped Amazon SQS client to check if the Amazon SQS queue exists. If it does not exist then the queue is created.

Creating a JMS Connection

In order to create a JMS connection, you must first create a connection factory and call the createConnection method against the factory:

// Create the connection factory using the environment variable credential provider.
// Connections this factory creates can talk to the queues in us-east-1 region. 
SQSConnectionFactory connectionFactory =
    SQSConnectionFactory.builder()
        .withRegion(Region.getRegion(Regions.US_EAST_1))
        .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
        .build();
 
// Create the connection.
SQSConnection connection = connectionFactory.createConnection();

Note

The EnvironmentVariableCredentialsProvider class in this example assumes that the AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY) environment variables are set. For more information, including other options for providing the required credentials to the factory, go to AWSCredentialsProvider.

The SQSConnection class extends javax.jms.Connection. Along with the JMS standard connection methods, SQSConnection offers additional methods, such as getAmazonSQSClient and getWrappedAmazonSQSClient. The returned client objects from getAmazonSQSClient and getWrappedAmazonSQSClient can be used to perform administrative operations that are not included in the JMS specification – for example, creating an Amazon SQS queue.

If you use getWrappedAmazonSQSClient, then the returned client object will transform all exceptions into JMS exceptions. On the other hand, if you use getAmazonSQSClient, then the exceptions will be Amazon SQS exceptions. Therefore, if you have existing code that is expecting JMS exceptions, then you should use getWrappedAmazonSQSClient.

Creating an Amazon SQS Queue

This example checks if an Amazon SQS queue exists, and if not then it creates one. Next, the wrapped client object is used to check if the Amazon SQS queue exists, and if not a new one is created:

// Get the wrapped client
AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
 
// Create an SQS queue named 'TestQueue' – if it does not already exist.
if (!client.queueExists("TestQueue")) {
    client.createQueue("TestQueue");
}

Sending Messages Synchronously

With the connection and underlying Amazon SQS queue ready, a non-transacted JMS session with AUTO_ACKNOWLEDGE mode is created:

// Create the non-transacted session with AUTO_ACKNOWLEDGE mode
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Next, in order to send a text message to the queue, a JMS queue identity and message producer are created:

// Create a queue identity with name 'TestQueue' in the session
Queue queue = session.createQueue("TestQueue");
 
// Create a producer for the 'TestQueue'.
MessageProducer producer = session.createProducer(queue);

A text message is created and sent to the queue:

// Create the text message.
TextMessage message = session.createTextMessage("Hello World!");
 
// Send the message.
producer.send(message);
System.out.println("JMS Message " + message.getJMSMessageID());

Receiving Messages Synchronously

To receive messages, a consumer is created on the same queue and the start method is invoked. The start method on the connection can be called anytime; however, the consumer will not start receiving messages until after it has been called.

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
 
// Start receiving incoming messages.
connection.start();

The receive method is called on the consumer with a time-out set to 1 second and the content of the received message is printed:

// Receive a message from 'TestQueue' and wait up to 1 second
Message receivedMessage = consumer.receive(1000);
 
// Cast the received message as TextMessage and print the text to screen.
if (receivedMessage != null) {
    System.out.println("Received: " + ((TextMessage) receivedMessage).getText());
}

Finally, the connection is closed, which also closes the session:

// Close the connection (and the session).
connection.close();

The output will look similar to the following:

JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!

Note

You can also use Spring to initialize these objects. For additional information, see SpringExampleConfig.xml, SpringExample.java, and other helper classes in ExampleConfiguration.java and ExampleCommon.java, located in Code Samples.

For complete examples of send and receive, see TextMessageSender.java and SyncMessageReceiver.java.

Receiving Messages Asynchronously

In the example provided in Getting Started with the Amazon SQS Java Messaging Library, a message is sent to TestQueue and received synchronously. This example shows how to receive the messages asynchronously through a listener.

First, the MessageListener interface is implemented:

class MyListener implements MessageListener {
 
    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

The onMessage method of the MessageListener interface is called when a message is received. In this listener implementation, the text stored in the message is printed.

Next, instead of explicitly calling the receive method on the consumer, the message listener of the consumer to an instance of the MyListener implementation is set. And, the main thread sleeps for a second:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
 
// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());
 
// Start receiving incoming messages.
connection.start();
 
// Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
Thread.sleep(1000);

The rest of the steps are identical to the ones provided in the example in Getting Started with the Amazon SQS Java Messaging Library. For a complete example of an asynchronous receiver, see AsyncMessageReceiver.java in Code Samples.

The output for this example will look similar to the following:

JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!

Client Acknowledge Mode

In the example provided in Getting Started with the Amazon SQS Java Messaging Library, AUTO_ACKNOWLEDGE mode is used, where every received message is automatically acknowledged – and therefore deleted from the underlying Amazon SQS queue. If you want to explicitly acknowledge the messages after they are done being processed, then you must create the session with CLIENT_ACKNOWLEDGE mode:

// Create the non-transacted session with CLIENT_ACKNOWLEDGE mode.
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Next, when the message is received, it is both displayed and explicitly acknowledged:

// Cast the received message as TextMessage and print the text to screen. Also acknowledge the message.
if (receivedMessage != null) {
    System.out.println("Received: " + ((TextMessage) receivedMessage).getText());
    receivedMessage.acknowledge();
    System.out.println("Acknowledged: " + message.getJMSMessageID());
}

Note

In this mode, when a message is acknowledged, then all the messages received prior are implicitly acknowledged as well. For example, if 10 messages are received, and only the 10th message (in the order they are received) is acknowledged, then all the previous 9 messages are also acknowledged.

The rest of the steps are identical to the ones provided in the example in Getting Started with the Amazon SQS Java Messaging Library. For a complete example of a synchronous receiver with client acknowledge mode, see SyncMessageReceiverClientAcknowledge.java in Code Samples.

The output for this example will look similar to the following:

JMS Message ID:4example-aa0e-403f-b6df-5e02example5
Received: Hello World!
Acknowledged: ID:4example-aa0e-403f-b6df-5e02example5

Unordered Acknowledge Mode

In the previous example shown in Client Acknowledge Mode, (when using CLIENT_ACKNOWLEDGE mode) all the messages received prior to an explicitly acknowledged message are automatically acknowledged. In the JMS 1.1 specification this is referred to as the ACK-RANGE. The Amazon SQS Java Messaging Library provides another acknowledgement mode, UNORDERED_ACKNOWLEDGE, where all received messages must be explicitly acknowledged by the client – regardless of the order they are received. In other words, when using UNORDERED_ACKNOWLEDGE mode and a message is acknowledged, then only that specific message is acknowledged.

In this example, a session is created with UNORDERED_ACKNOWLEDGE mode:

// Create the non-transacted session with UNORDERED_ACKNOWLEDGE mode.
Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

The rest of the steps are identical to the ones provided in the example in Client Acknowledge Mode. For a complete example of a synchronous receiver with unordered acknowledge mode, see SyncMessageReceiverUnorderedAcknowledge.java in Code Samples.

The output for this example will look similar to the following:

JMS Message ID:dexample-73ad-4adb-bc6c-4357example7
Received: Hello World!
Acknowledged: ID:dexample-73ad-4adb-bc6c-4357example7

Code Samples

The following code samples show how to use JMS with Amazon SQS.

ExampleConfiguration.java

The following Java code is a configuration example that can be used for all subsequent Java examples. For example, this code sets the default queue name, region, and credentials to be used with the other java examples.

public class ExampleConfiguration {
    public static final String DEFAULT_QUEUE_NAME = "SQSJMSClientExampleQueue";
    
    public static final Region DEFAULT_REGION = Region.getRegion(Regions.US_EAST_1);
    
    private static String getParameter( String args[], int i ) {
        if( i + 1 >= args.length ) {
            throw new IllegalArgumentException( "Missing parameter for " + args[i] );
        }
        return args[i+1];
    }
    
    /**
     * Parse the command line and return the resulting config. If the config parsing fails
     * print the error and the usage message and then call System.exit
     * 
     * @param app the app to use when printing the usage string
     * @param args the command line arguments
     * @return the parsed config
     */
    public static ExampleConfiguration parseConfig(String app, String args[]) {
        try {
            return new ExampleConfiguration(args);
        } catch (IllegalArgumentException e) {
            System.err.println( "ERROR: " + e.getMessage() );
            System.err.println();
            System.err.println( "Usage: " + app + " [--queue <queue>] [--region <region>] [--credentials <credentials>] ");
            System.err.println( "  or" );
            System.err.println( "       " + app + " <spring.xml>" );
            System.exit(-1);
            return null;
        }
    }
    
    private ExampleConfiguration(String args[]) {
        for( int i = 0; i < args.length; ++i ) {
            String arg = args[i];
            if( arg.equals( "--queue" ) ) {
                setQueueName(getParameter(args, i));
                i++;
            } else if( arg.equals( "--region" ) ) {
                String regionName = getParameter(args, i);
                try {
                    setRegion(Region.getRegion(Regions.fromName(regionName)));
                } catch( IllegalArgumentException e ) {
                    throw new IllegalArgumentException( "Unrecognized region " + regionName );  
                }
                i++;
            } else if( arg.equals( "--credentials" ) ) {
                String credsFile = getParameter(args, i);
                try {
                    setCredentialsProvider( new PropertiesFileCredentialsProvider(credsFile) );
                } catch (AmazonClientException e) {
                    throw new IllegalArgumentException("Error reading credentials from " + credsFile, e );
                }
                i++;
            } else {
                throw new IllegalArgumentException("Unrecognized option " + arg);
            }
        }
    }
    
    private String queueName = DEFAULT_QUEUE_NAME;
    private Region region = DEFAULT_REGION;
    private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    
    public String getQueueName() {
        return queueName;
    }
    
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
    
    public Region getRegion() {
        return region;
    }
    
    public void setRegion(Region region) {
        this.region = region;
    }

    public AWSCredentialsProvider getCredentialsProvider() {
        return credentialsProvider;
    }
    
    public void setCredentialsProvider(AWSCredentialsProvider credentialsProvider) {
        // Make sure they're usable first
        credentialsProvider.getCredentials();
        this.credentialsProvider = credentialsProvider;
    }
}

TextMessageSender.java

The following Java code shows how to create a text message sender.

public class TextMessageSender {
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);
        
        ExampleCommon.setupLogging();
        
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = 
                SQSConnectionFactory.builder()
                    .withRegion(config.getRegion())
                    .withAWSCredentialsProvider(config.getCredentialsProvider())
                    .build();
        
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
        
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
            
        // Create the session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );
        
        sendMessages(session, producer);

        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println( "Connection closed" );
    }

    private static void sendMessages( Session session, MessageProducer producer ) {
        BufferedReader inputReader = new BufferedReader(
                new InputStreamReader( System.in, Charset.defaultCharset() ) );
        
        try {
            String input;
            while( true ) { 
                System.out.print( "Enter message to send (leave empty to exit): " );
                input = inputReader.readLine();
                if( input == null || input.equals("" ) ) break;
                
                TextMessage message = session.createTextMessage(input);
                producer.send(message);
                System.out.println( "Send message " + message.getJMSMessageID() );
            }
        } catch (EOFException e) {
            // Just return on EOF
        } catch (IOException e) {
            System.err.println( "Failed reading input: " + e.getMessage() );
        } catch (JMSException e) {
            System.err.println( "Failed sending message: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

AsyncMessageReceiver.java

The following Java code shows how to create an asynchronous message receiver.

public class AsyncMessageReceiver {
     public static void main(String args[]) throws JMSException, InterruptedException {
          ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);
          
          ExampleCommon.setupLogging();          
               
          // Create the session
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );
          
          ReceiverCallback callback = new ReceiverCallback();
          consumer.setMessageListener( callback );

          // No messages will be processed until this is called
          connection.start();
          
          callback.waitForOneMinuteOfSilence();
          System.out.println( "Returning after one minute of silence" );

          // Close the connection. This will close the session automatically
          connection.close();
          System.out.println( "Connection closed" );
     }
     
     
     private static class ReceiverCallback implements MessageListener {
          // Used to listen for message silence
          private volatile long timeOfLastMessage = System.nanoTime();
          
          public void waitForOneMinuteOfSilence() throws InterruptedException {
               for(;;) {
                    long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;
                    long remainingTillOneMinuteOfSilence = 
                              TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;
                    if( remainingTillOneMinuteOfSilence < 0 ) {
                         break;
                    }
                    TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);
               }
          }
          

          @Override
          public void onMessage(Message message) {
               try {
                    ExampleCommon.handleMessage(message);
                    message.acknowledge();
                    System.out.println( "Acknowledged message " + message.getJMSMessageID() );
                    timeOfLastMessage = System.nanoTime();
               } catch (JMSException e) {
                    System.err.println( "Error processing message: " + e.getMessage() );
                    e.printStackTrace();
               }
          }
     }
}

SyncMessageReceiver.java

The following Java code shows how to create a synchronous message receiver.

public class SyncMessageReceiver {
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);
        
        ExampleCommon.setupLogging();
        
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = 
                SQSConnectionFactory.builder()
                    .withRegion(config.getRegion())
                    .withAWSCredentialsProvider(config.getCredentialsProvider())
                    .build();
        
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
        
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
            
        // Create the session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

        connection.start();
        
        receiveMessages(session, consumer);

        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println( "Connection closed" );
    }

    private static void receiveMessages( Session session, MessageConsumer consumer ) {
        try {
            while( true ) {
                System.out.println( "Waiting for messages");
                // Wait 1 minute for a message
                Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                if( message == null ) {
                    System.out.println( "Shutting down after 1 minute of silence" );
                    break;
                }
                ExampleCommon.handleMessage(message);
                message.acknowledge();
                System.out.println( "Acknowledged message " + message.getJMSMessageID() );
            }
        } catch (JMSException e) {
            System.err.println( "Error receiving from SQS: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

SyncMessageReceiverClientAcknowledge.java

The following Java code shows an example of a synchronous receiver with client acknowledge mode.

/**
 * An example class to demonstrate the behavior of CLIENT_ACKNOWLEDGE mode for received messages. This example
 * complements the example given in {@link SyncMessageReceiverUnorderedAcknowledge} for UNORDERED_ACKNOWLEDGE mode.
 *
 * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages
 * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to
 * receive another message is made. It is shown that no message is returned for this attempt since in CLIENT_ACKNOWLEDGE mode,
 * as expected, all the messages prior to the acknowledged messages are also acknowledged.
 *
 * This is NOT the behavior for UNORDERED_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverUnorderedAcknowledge}
 * for an example.
 */
public class SyncMessageReceiverClientAcknowledge {

    // Visibility time-out for the queue. It must match to the one set for the queue for this example to work.
    private static final long TIME_OUT_SECONDS = 1;

    public static void main(String args[]) throws JMSException, InterruptedException {
        // Create the configuration for the example
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverClientAcknowledge", args);

        // Setup logging for the example
        ExampleCommon.setupLogging();

        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory =
                SQSConnectionFactory.builder()
                        .withRegion(config.getRegion())
                        .withAWSCredentialsProvider(config.getCredentialsProvider())
                        .build();

        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();

        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());

        // Create the session  with client acknowledge mode
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Create the producer and consume
        MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName()));
        MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName()));

        // Open the connection
        connection.start();

        // Send two text messages
        sendMessage(producer, session, "Message 1");
        sendMessage(producer, session, "Message 2");

        // Receive a message and don't acknowledge it
        receiveMessage(consumer, false);

        // Receive another message and acknowledge it
        receiveMessage(consumer, true);

        // Wait for the visibility time out, so that unacknowledged messages reappear in the queue
        System.out.println("Waiting for visibility timeout...");
        Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));

        // Attempt to receive another message and acknowledge it. This will result in receiving no messages since
        // we have acknowledged the second message. Although we did not explicitly acknowledge the first message,
        // in the CLIENT_ACKNOWLEDGE mode, all the messages received prior to the explicitly acknowledged message
        // are also acknowledged. Therefore, we have implicitly acknowledged the first message.
        receiveMessage(consumer, true);

        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println("Connection closed.");
    }

    /**
     * Sends a message through the producer.
     *
     * @param producer Message producer
     * @param session Session
     * @param messageText Text for the message to be sent
     * @throws JMSException
     */
    private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException {
        // Create a text message and send it
        producer.send(session.createTextMessage(messageText));
    }

    /**
     * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS).
     * If a message is received, the message is printed. If no message is received, "Queue is empty!" is
     * printed.
     *
     * @param consumer Message consumer
     * @param acknowledge If true and a message is received, the received message is acknowledged.
     * @throws JMSException
     */
    private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException {
        // Receive a message
        Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));

        if (message == null) {
            System.out.println("Queue is empty!");
        } else {
            // Since this queue has only text messages, cast the message object and print the text
            System.out.println("Received: " + ((TextMessage) message).getText());

            // Acknowledge the message if asked
            if (acknowledge) message.acknowledge();
        }
    }
}

SyncMessageReceiverUnorderedAcknowledge.java

The following Java code shows an example of a synchronous receiver with unordered acknowledge mode.

/**
 * An example class to demonstrate the behavior of UNORDERED_ACKNOWLEDGE mode for received messages. This example
 * complements the example given in {@link SyncMessageReceiverClientAcknowledge} for CLIENT_ACKNOWLEDGE mode.
 *
 * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages
 * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to
 * receive another message is made. It is shown that the first message received in the prior attempt is returned again
 * for the second attempt. In UNORDERED_ACKNOWLEDGE mode, all the messages must be explicitly acknowledged no matter what
 * the order they are received.
 *
 * This is NOT the behavior for CLIENT_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverClientAcknowledge}
 * for an example.
 */
public class SyncMessageReceiverUnorderedAcknowledge {

    // Visibility time-out for the queue. It must match to the one set for the queue for this example to work.
    private static final long TIME_OUT_SECONDS = 1;

    public static void main(String args[]) throws JMSException, InterruptedException {
        // Create the configuration for the example
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverUnorderedAcknowledge", args);

        // Setup logging for the example
        ExampleCommon.setupLogging();

        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory =
                SQSConnectionFactory.builder()
                        .withRegion(config.getRegion())
                        .withAWSCredentialsProvider(config.getCredentialsProvider())
                        .build();

        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();

        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());

        // Create the session  with unordered acknowledge mode
        Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

        // Create the producer and consume
        MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName()));
        MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName()));

        // Open the connection
        connection.start();

        // Send two text messages
        sendMessage(producer, session, "Message 1");
        sendMessage(producer, session, "Message 2");

        // Receive a message and don't acknowledge it
        receiveMessage(consumer, false);

        // Receive another message and acknowledge it
        receiveMessage(consumer, true);

        // Wait for the visibility time out, so that unacknowledged messages reappear in the queue
        System.out.println("Waiting for visibility timeout...");
        Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));

        // Attempt to receive another message and acknowledge it. This will result in receiving the first message since
        // we have acknowledged only the second message. In the UNORDERED_ACKNOWLEDGE mode, all the messages must
        // be explicitly acknowledged.
        receiveMessage(consumer, true);

        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println("Connection closed.");
    }

    /**
     * Sends a message through the producer.
     *
     * @param producer Message producer
     * @param session Session
     * @param messageText Text for the message to be sent
     * @throws JMSException
     */
    private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException {
        // Create a text message and send it
        producer.send(session.createTextMessage(messageText));
    }

    /**
     * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS).
     * If a message is received, the message is printed. If no message is received, "Queue is empty!" is
     * printed.
     *
     * @param consumer Message consumer
     * @param acknowledge If true and a message is received, the received message is acknowledged.
     * @throws JMSException
     */
    private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException {
        // Receive a message
        Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));

        if (message == null) {
            System.out.println("Queue is empty!");
        } else {
            // Since this queue has only text messages, cast the message object and print the text
            System.out.println("Received: " + ((TextMessage) message).getText());

            // Acknowledge the message if asked
            if (acknowledge) message.acknowledge();
        }
    }
}

SpringExampleConfig.xml

The following shows an example Spring configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    ">
    
    <bean id="CredentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>
    
    <bean id="ConnectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    	<property name="regionName" value="us-east-1"/>
    	<property name="numberOfMessagesToPrefetch" value="5"/>
    	
    	<property name="awsCredentialsProvider" ref="CredentialsProviderBean"/>
    </bean>
    
	<bean id="ConnectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
		factory-bean="ConnectionFactoryBuilder"
		factory-method="build" />
		
	<bean id="Connection" class="javax.jms.Connection"
		factory-bean="ConnectionFactory"
		factory-method="createConnection"
		init-method="start"
		destroy-method="close" />
		 
    <bean id="QueueName" class="java.lang.String">
    	<constructor-arg value="SQSJMSClientExampleQueue"/>
    </bean>
    
</beans>

SpringExample.java

The following Java code shows an example that uses SpringExampleConfig.xml.

public class SpringExample {
    public static void main(String args[]) throws JMSException {
        if( args.length != 1 || !args[0].endsWith(".xml")) {
            System.err.println( "Usage: " + SpringExample.class.getName() + " <spring config.xml>" );
            System.exit(1);
        }
        
        File springFile = new File( args[0] );
        if( !springFile.exists() || !springFile.canRead() ) {
            System.err.println( "File " + args[0] + " does not exist or is not readable.");
            System.exit(2);
        }
        
        ExampleCommon.setupLogging();
        
        FileSystemXmlApplicationContext context = 
                new FileSystemXmlApplicationContext( "file://" + springFile.getAbsolutePath() );
        
        Connection connection;
        try {
            connection = context.getBean(Connection.class);
        } catch( NoSuchBeanDefinitionException e ) {
            System.err.println( "Could not find the JMS connection to use: " + e.getMessage() );
            System.exit(3);
            return;
        }
        
        String queueName;
        try {
            queueName = context.getBean("QueueName", String.class);
        } catch( NoSuchBeanDefinitionException e ) {
            System.err.println( "Could not find the name of the queue to use: " + e.getMessage() );
            System.exit(3);
            return;
        }
        
        if( connection instanceof SQSConnection ) {
            ExampleCommon.ensureQueueExists( (SQSConnection) connection, queueName );
        }
        
        // Create the session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer( session.createQueue( queueName) );
        
        receiveMessages(session, consumer);

        // The context can be setup to close the connection for us
        context.close();
        System.out.println( "Context closed" );
    }

    private static void receiveMessages( Session session, MessageConsumer consumer ) {
        try {
            while( true ) {
                System.out.println( "Waiting for messages");
                // Wait 1 minute for a message
                Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                if( message == null ) {
                    System.out.println( "Shutting down after 1 minute of silence" );
                    break;
                }
                ExampleCommon.handleMessage(message);
                message.acknowledge();
                System.out.println( "Acknowledged message" );
            }
        } catch (JMSException e) {
            System.err.println( "Error receiving from SQS: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

ExampleCommon.java

The following Java code first checks to see if an Amazon SQS queue exists and then creates one if it does not already exist. In addition, example logging code is provided.

public class ExampleCommon {
    /**
     * A utility function to check the queue exists and create it if needed. For most  
     * use cases this will usually be done by an administrator before the application
     * is run. 
     */
    public static void ensureQueueExists(SQSConnection connection, String queueName) throws JMSException {
        AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
        
        /**
         * For most cases this could be done with just a createQueue call, but GetQueueUrl 
         * (called by queueExists) is a faster operation for the common case where the queue 
         * already exists. Also many users and roles have permission to call GetQueueUrl
         * but do not have permission to call CreateQueue.
         */
        if( !client.queueExists(queueName) ) {
            client.createQueue( queueName );
        }
    }

    public static void setupLogging() {
        // Setup logging
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);
    }

    public static void handleMessage(Message message) throws JMSException {
        System.out.println( "Got message " + message.getJMSMessageID() );
        System.out.println( "Content: ");
        if( message instanceof TextMessage ) {
            TextMessage txtMessage = ( TextMessage ) message;
            System.out.println( "\t" + txtMessage.getText() );
        } else if( message instanceof BytesMessage ){
            BytesMessage byteMessage = ( BytesMessage ) message;
            // Assume the length fits in an int - SQS only supports sizes up to 256k so that
            // should be true
            byte[] bytes = new byte[(int)byteMessage.getBodyLength()];
            byteMessage.readBytes(bytes);
            System.out.println( "\t" +  Base64.encodeAsString( bytes ) );
        } else if( message instanceof ObjectMessage ) {
            ObjectMessage objMessage = (ObjectMessage) message;
            System.out.println( "\t" + objMessage.getObject() );
        }
    }
}

Reference/Appendix

The following lists some of the supported JMS 1.1 specification implementations in the Amazon SQS Java Messaging Library. For more information about the supported features and capabilities of the Amazon SQS Java Messaging Library, see the Amazon SQS FAQ.

Supported JMS 1.1 Common Interfaces:

  • ConnectionFactory

  • Connection

  • Destination

  • Session

  • MessageProducer

  • MessageConsumer

Supported Message Types

  • TextMessage

  • ByteMessage

  • ObjectMessage

Supported Message Acknowledgment Modes

  • DUPS_OK_ACKNOWLEDGE

  • AUTO_ACKNOWLEDGE

  • CLIENT_ACKNOWLEDGE

  • UNORDERED_ACKNOWLEDGE

Note

The UNORDERED_ACKNOWLEDGE acknowledgment mode is not part of the JMS 1.1 specification. It is added by Amazon SQS to allow for a JMS client to explicitly acknowledge the message.

JMS Defined Headers and Reserved Properties

The Amazon SQS JMS client sets the following JMS defined headers:

  • JMSDestination

  • JMSMessageID

  • JMSRedelivered

The Amazon SQS JMS client sets the following JMS reserved property:

  • JMSXDeliveryCount