Menu
Amazon Simple Queue Service
Developer Guide (API Version 2012-11-05)

Using JMS with Amazon SQS

The Amazon SQS Java Messaging Library is a JMS interface for Amazon SQS that lets you take advantage of Amazon SQS in applications that already use JMS. The interface lets you use Amazon SQS as the JMS provider with minimal code changes. Together with the AWS SDK for Java, the Amazon SQS Java Messaging Library lets you create JMS connections and sessions, as well as producers and consumers that send and receive messages to and from Amazon SQS queues.

The library supports sending and receiving messages to a queue (the JMS point-to-point model) according to the JMS 1.1 specification. The library supports sending text, byte, or object messages synchronously to Amazon SQS queues. The library also supports receiving objects synchronously or asynchronously.

For information about features of the Amazon SQS Java Messaging Library that support the JMS 1.1 specification, see Supported JMS 1.1 Implementations and the Amazon SQS FAQs.

Prerequisites

Before you begin, you must have the following prerequisites:

  • SDK for Java

    There are two ways to include the SDK for Java in your project:

    • Download and install the SDK for Java.

    • Use Maven to get the Amazon SQS Java Messaging Library. (The SDK for Java is included as a dependency. The SDK for Java and Amazon SQS Java Messaging Library require J2SE Development Kit 6.0 or later.)

      For information about downloading the SDK for Java, see SDK for Java.

  • Amazon SQS Java Messaging Library

    If you do not use Maven, you must add the package file amazon-sqs-java-messaging-lib.jar to the Java build class path.

    For information about downloading the library, see Amazon SQS Java Messaging Library.

    Note

    The Amazon SQS Java Messaging Library includes support for Maven and the Spring Framework.

    For code samples that use Maven, the Spring Framework, and the Amazon SQS Java Messaging Library, see Code Samples.

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-sqs-java-messaging-lib</artifactId>
      <version>1.0.0</version>
      <type>jar</type>
    </dependency>

  • Amazon SQS Queue

    Create a queue using the AWS Management Console for Amazon SQS, the CreateQueue API, or the wrapped Amazon SQS client included in the Amazon SQS Java Messaging Library.

    For information about creating a queue with Amazon SQS using either the AWS Management Console or the CreateQueue API, see Creating a Queue.

    For information about using the Amazon SQS Java Messaging Library, see Getting Started with the Amazon SQS Java Messaging Library.

Getting Started with the Amazon SQS Java Messaging Library

To get started using JMS with Amazon SQS, use the code examples in this section. The following sections show how to create a JMS connection and a session, and how to send and receive a message.

The wrapped Amazon SQS client object included in the Amazon SQS Java Messaging Library checks if an Amazon SQS queue exists. If the queue does not exist, the client creates it.

Creating a JMS Connection

  1. Create a connection factory and call the createConnection method against the factory.

    Note

    The EnvironmentVariableCredentialsProvider class in the following example assumes that the AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY) environment variables are set.

    For more information on providing the required credentials to the factory, see Interface AWSCredentialsProvider.

    // Create the connection factory using the environment variable credential provider.
    // Connections this factory creates can talk to the queues in us-east-1 region. 
    SQSConnectionFactory connectionFactory =
        SQSConnectionFactory.builder()
            .withRegion(Region.getRegion(Regions.US_EAST_1))
            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
            .build();
     
    // Create the connection.
    SQSConnection connection = connectionFactory.createConnection();

    The SQSConnection class extends javax.jms.Connection. Together with the JMS standard connection methods, SQSConnection offers additional methods, such as getAmazonSQSClient and getWrappedAmazonSQSClient. Both methods let you perform administrative operations not included in the JMS specification, such as creating new queues. However, the getWrappedAmazonSQSClient method also provides a wrapped version of the Amazon SQS client used by the current connection. The wrapper transforms every exception from the client into an JMSException, allowing it to be more easily used by existing code that expects JMSException occurrences.

  2. You can use the client objects returned from getAmazonSQSClient and getWrappedAmazonSQSClient to perform administrative operations not included in the JMS specification (for example, you can create an Amazon SQS queue).

    If you have existing code that is expecting JMS exceptions, then you should use getWrappedAmazonSQSClient:

    • If you use getWrappedAmazonSQSClient, the returned client object transforms all exceptions into JMS exceptions.

    • If you use getAmazonSQSClient, the exceptions will be Amazon SQS exceptions.

Creating an Amazon SQS Queue

The wrapped client object checks if an Amazon SQS queue exists.

If a queue does not exist, the client creates it. If the queue does exist, then the function does not return anything. For more information, see the "Create the queue if needed" section in the TextMessageSender.java example.

// Get the wrapped client
AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
 
// Create an SQS queue named 'TestQueue' – if it does not already exist.
if (!client.queueExists("TestQueue")) {
    client.createQueue("TestQueue");
}

Sending Messages Synchronously

  1. When the connection and the underlying Amazon SQS queue are ready, create a non-transacted JMS session with AUTO_ACKNOWLEDGE mode.

    // Create the non-transacted session with AUTO_ACKNOWLEDGE mode
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  2. To send a text message to the queue, create a JMS queue identity and a message producer.

    // Create a queue identity with name 'TestQueue' in the session
    Queue queue = session.createQueue("TestQueue");
     
    // Create a producer for the 'TestQueue'.
    MessageProducer producer = session.createProducer(queue);
  3. Create a text message and send it to the queue.

    // Create the text message.
    TextMessage message = session.createTextMessage("Hello World!");
     
    // Send the message.
    producer.send(message);
    System.out.println("JMS Message " + message.getJMSMessageID());

Receiving Messages Synchronously

  1. To receive messages, create a consumer on the same queue and invoke the start method.

    You can call the start method on the connection at any time. However, the consumer will not begin to receiving messages until you call it.

    // Create a consumer for the 'TestQueue'.
    MessageConsumer consumer = session.createConsumer(queue);
     
    // Start receiving incoming messages.
    connection.start();
  2. Call the receive method on the consumer with a timeout set to 1 second and print the content of the received message.

    // Receive a message from 'TestQueue' and wait up to 1 second
    Message receivedMessage = consumer.receive(1000);
     
    // Cast the received message as TextMessage and print the text to screen.
    if (receivedMessage != null) {
        System.out.println("Received: " + ((TextMessage) receivedMessage).getText());
    }
  3. Close the connection and the session.

    // Close the connection (and the session).
    connection.close();

The output will look similar to the following:

JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!

Note

You can use the Spring Framework to initialize these objects.

For additional information, see SpringExampleConfig.xml, SpringExample.java, and the other helper classes in ExampleConfiguration.java and ExampleCommon.java in Code Samples.

For complete examples of sending and receiving objects, see TextMessageSender.java and SyncMessageReceiver.java.

Receiving Messages Asynchronously

In the example in Getting Started with the Amazon SQS Java Messaging Library, a message is sent to TestQueue and received synchronously.

The following example shows how to receive the messages asynchronously through a listener.

  1. Implement the MessageListener interface.

    class MyListener implements MessageListener {
     
        @Override
        public void onMessage(Message message) {
            try {
                // Cast the received message as TextMessage and print the text to screen.
                if (message != null) {
                    System.out.println("Received: " + ((TextMessage) message).getText());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    The onMessage method of the MessageListener interface is called when you receive a message. In this listener implementation, the text stored in the message is printed.

  2. Instead of explicitly calling the receive method on the consumer, set the message listener of the consumer to an instance of the MyListener implementation. The main thread sleeps for one second.

    // Create a consumer for the 'TestQueue'.
    MessageConsumer consumer = session.createConsumer(queue);
     
    // Instantiate and set the message listener for the consumer.
    consumer.setMessageListener(new MyListener());
     
    // Start receiving incoming messages.
    connection.start();
     
    // Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
    Thread.sleep(1000);

The rest of the steps are identical to the ones in the Getting Started with the Amazon SQS Java Messaging Library example. For a complete example of an asynchronous receiver, see AsyncMessageReceiver.java in Code Samples.

The output for this example will look similar to the following:

JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!

Using Client Acknowledge Mode

The example in Getting Started with the Amazon SQS Java Messaging Library uses AUTO_ACKNOWLEDGE mode where every received message is acknowledged automatically (and therefore deleted from the underlying Amazon SQS queue).

  1. To explicitly acknowledge the messages after they are processed, you must create the session with CLIENT_ACKNOWLEDGE mode.

    // Create the non-transacted session with CLIENT_ACKNOWLEDGE mode.
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  2. When the message is received, display it and then explicitly acknowledge it.

    // Cast the received message as TextMessage and print the text to screen. Also acknowledge the message.
    if (receivedMessage != null) {
        System.out.println("Received: " + ((TextMessage) receivedMessage).getText());
        receivedMessage.acknowledge();
        System.out.println("Acknowledged: " + message.getJMSMessageID());
    }

    Note

    In this mode, when a message is acknowledged, then all messages received prior to this message are implicitly acknowledged as well. For example, if 10 messages are received, and only the 10th message is acknowledged (in the order the messages are received), then all of the previous 9 messages are also acknowledged.

The rest of the steps are identical to the ones in the Getting Started with the Amazon SQS Java Messaging Library example. For a complete example of a synchronous receiver with client acknowledge mode, see SyncMessageReceiverClientAcknowledge.java in Code Samples.

The output for this example will look similar to the following:

JMS Message ID:4example-aa0e-403f-b6df-5e02example5
Received: Hello World!
Acknowledged: ID:4example-aa0e-403f-b6df-5e02example5

Using Unordered Acknowledge Mode

When using CLIENT_ACKNOWLEDGE mode, all messages received before an explicitly-acknowledged message are acknowledged automatically. For more information, see Using Client Acknowledge Mode.

The Amazon SQS Java Messaging Library provides another acknowledgement mode. When using UNORDERED_ACKNOWLEDGE mode, all received messages must be individually and explicitly acknowledged by the client, regardless of their reception order.

Create a session with UNORDERED_ACKNOWLEDGE mode.

// Create the non-transacted session with UNORDERED_ACKNOWLEDGE mode.
Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

The remaining steps are identical to the ones in the Using Client Acknowledge Mode example. For a complete example of a synchronous receiver with UNORDERED_ACKNOWLEDGE mode, see SyncMessageReceiverUnorderedAcknowledge.java.

In this example, the output will look similar to the following:

JMS Message ID:dexample-73ad-4adb-bc6c-4357example7
Received: Hello World!
Acknowledged: ID:dexample-73ad-4adb-bc6c-4357example7

Important

When you use the JMS library with UNORDERED_ACKNOWLEDGE mode, visibility timeout always takes effect. However, in the case of asynchronous message processing, visibility timeout takes effect only when the message listener onMessage() returns successfully.

Code Samples

The following code samples show how to use JMS with Amazon SQS.

ExampleConfiguration.java

The following Java code example sets the default queue name, the region, and the credentials to be used with the other Java examples.

public class ExampleConfiguration {
    public static final String DEFAULT_QUEUE_NAME = "SQSJMSClientExampleQueue";
    
    public static final Region DEFAULT_REGION = Region.getRegion(Regions.US_EAST_1);
    
    private static String getParameter( String args[], int i ) {
        if( i + 1 >= args.length ) {
            throw new IllegalArgumentException( "Missing parameter for " + args[i] );
        }
        return args[i+1];
    }
    
    /**
     * Parse the command line and return the resulting config. If the config parsing fails
     * print the error and the usage message and then call System.exit
     * 
     * @param app the app to use when printing the usage string
     * @param args the command line arguments
     * @return the parsed config
     */
    public static ExampleConfiguration parseConfig(String app, String args[]) {
        try {
            return new ExampleConfiguration(args);
        } catch (IllegalArgumentException e) {
            System.err.println( "ERROR: " + e.getMessage() );
            System.err.println();
            System.err.println( "Usage: " + app + " [--queue <queue>] [--region <region>] [--credentials <credentials>] ");
            System.err.println( "  or" );
            System.err.println( "       " + app + " <spring.xml>" );
            System.exit(-1);
            return null;
        }
    }
    
    private ExampleConfiguration(String args[]) {
        for( int i = 0; i < args.length; ++i ) {
            String arg = args[i];
            if( arg.equals( "--queue" ) ) {
                setQueueName(getParameter(args, i));
                i++;
            } else if( arg.equals( "--region" ) ) {
                String regionName = getParameter(args, i);
                try {
                    setRegion(Region.getRegion(Regions.fromName(regionName)));
                } catch( IllegalArgumentException e ) {
                    throw new IllegalArgumentException( "Unrecognized region " + regionName );  
                }
                i++;
            } else if( arg.equals( "--credentials" ) ) {
                String credsFile = getParameter(args, i);
                try {
                    setCredentialsProvider( new PropertiesFileCredentialsProvider(credsFile) );
                } catch (AmazonClientException e) {
                    throw new IllegalArgumentException("Error reading credentials from " + credsFile, e );
                }
                i++;
            } else {
                throw new IllegalArgumentException("Unrecognized option " + arg);
            }
        }
    }
    
    private String queueName = DEFAULT_QUEUE_NAME;
    private Region region = DEFAULT_REGION;
    private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    
    public String getQueueName() {
        return queueName;
    }
    
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
    
    public Region getRegion() {
        return region;
    }
    
    public void setRegion(Region region) {
        this.region = region;
    }
 
    public AWSCredentialsProvider getCredentialsProvider() {
        return credentialsProvider;
    }
    
    public void setCredentialsProvider(AWSCredentialsProvider credentialsProvider) {
        // Make sure they're usable first
        credentialsProvider.getCredentials();
        this.credentialsProvider = credentialsProvider;
    }
}

TextMessageSender.java

The following Java code example creates a text message sender.

public class TextMessageSender {
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);
        
        ExampleCommon.setupLogging();
        
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = 
                SQSConnectionFactory.builder()
                    .withRegion(config.getRegion())
                    .withAWSCredentialsProvider(config.getCredentialsProvider())
                    .build();
        
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
        
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
            
        // Create the session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );
        
        sendMessages(session, producer);
 
        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println( "Connection closed" );
    }
 
    private static void sendMessages( Session session, MessageProducer producer ) {
        BufferedReader inputReader = new BufferedReader(
                new InputStreamReader( System.in, Charset.defaultCharset() ) );
        
        try {
            String input;
            while( true ) { 
                System.out.print( "Enter message to send (leave empty to exit): " );
                input = inputReader.readLine();
                if( input == null || input.equals("" ) ) break;
                
                TextMessage message = session.createTextMessage(input);
                producer.send(message);
                System.out.println( "Send message " + message.getJMSMessageID() );
            }
        } catch (EOFException e) {
            // Just return on EOF
        } catch (IOException e) {
            System.err.println( "Failed reading input: " + e.getMessage() );
        } catch (JMSException e) {
            System.err.println( "Failed sending message: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

AsyncMessageReceiver.java

The following Java code example creates an asynchronous message receiver.

public class AsyncMessageReceiver {
     public static void main(String args[]) throws JMSException, InterruptedException {
          ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);
          
          ExampleCommon.setupLogging();          
               
          // Create the session
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );
          
          ReceiverCallback callback = new ReceiverCallback();
          consumer.setMessageListener( callback );
 
          // No messages will be processed until this is called
          connection.start();
          
          callback.waitForOneMinuteOfSilence();
          System.out.println( "Returning after one minute of silence" );
 
          // Close the connection. This will close the session automatically
          connection.close();
          System.out.println( "Connection closed" );
     }
     
     
     private static class ReceiverCallback implements MessageListener {
          // Used to listen for message silence
          private volatile long timeOfLastMessage = System.nanoTime();
          
          public void waitForOneMinuteOfSilence() throws InterruptedException {
               for(;;) {
                    long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;
                    long remainingTillOneMinuteOfSilence = 
                              TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;
                    if( remainingTillOneMinuteOfSilence < 0 ) {
                         break;
                    }
                    TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);
               }
          }
          
 
          @Override
          public void onMessage(Message message) {
               try {
                    ExampleCommon.handleMessage(message);
                    message.acknowledge();
                    System.out.println( "Acknowledged message " + message.getJMSMessageID() );
                    timeOfLastMessage = System.nanoTime();
               } catch (JMSException e) {
                    System.err.println( "Error processing message: " + e.getMessage() );
                    e.printStackTrace();
               }
          }
     }
}

SyncMessageReceiver.java

The following Java code example creates a synchronous message receiver.

public class SyncMessageReceiver {
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);
        
        ExampleCommon.setupLogging();
        
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = 
                SQSConnectionFactory.builder()
                    .withRegion(config.getRegion())
                    .withAWSCredentialsProvider(config.getCredentialsProvider())
                    .build();
        
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
        
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
            
        // Create the session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );
 
        connection.start();
        
        receiveMessages(session, consumer);
 
        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println( "Connection closed" );
    }
 
    private static void receiveMessages( Session session, MessageConsumer consumer ) {
        try {
            while( true ) {
                System.out.println( "Waiting for messages");
                // Wait 1 minute for a message
                Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                if( message == null ) {
                    System.out.println( "Shutting down after 1 minute of silence" );
                    break;
                }
                ExampleCommon.handleMessage(message);
                message.acknowledge();
                System.out.println( "Acknowledged message " + message.getJMSMessageID() );
            }
        } catch (JMSException e) {
            System.err.println( "Error receiving from SQS: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

SyncMessageReceiverClientAcknowledge.java

The following Java code example creates a synchronous receiver with client acknowledge mode.

/**
 * An example class to demonstrate the behavior of CLIENT_ACKNOWLEDGE mode for received messages. This example
 * complements the example given in {@link SyncMessageReceiverUnorderedAcknowledge} for UNORDERED_ACKNOWLEDGE mode.
 *
 * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages
 * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to
 * receive another message is made. It is shown that no message is returned for this attempt since in CLIENT_ACKNOWLEDGE mode,
 * as expected, all the messages prior to the acknowledged messages are also acknowledged.
 *
 * This is NOT the behavior for UNORDERED_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverUnorderedAcknowledge}
 * for an example.
 */
public class SyncMessageReceiverClientAcknowledge {
 
    // Visibility time-out for the queue. It must match to the one set for the queue for this example to work.
    private static final long TIME_OUT_SECONDS = 1;
 
    public static void main(String args[]) throws JMSException, InterruptedException {
        // Create the configuration for the example
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverClientAcknowledge", args);
 
        // Setup logging for the example
        ExampleCommon.setupLogging();
 
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory =
                SQSConnectionFactory.builder()
                        .withRegion(config.getRegion())
                        .withAWSCredentialsProvider(config.getCredentialsProvider())
                        .build();
 
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
 
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
 
        // Create the session  with client acknowledge mode
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
        // Create the producer and consume
        MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName()));
        MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName()));
 
        // Open the connection
        connection.start();
 
        // Send two text messages
        sendMessage(producer, session, "Message 1");
        sendMessage(producer, session, "Message 2");
 
        // Receive a message and don't acknowledge it
        receiveMessage(consumer, false);
 
        // Receive another message and acknowledge it
        receiveMessage(consumer, true);
 
        // Wait for the visibility time out, so that unacknowledged messages reappear in the queue
        System.out.println("Waiting for visibility timeout...");
        Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));
 
        // Attempt to receive another message and acknowledge it. This will result in receiving no messages since
        // we have acknowledged the second message. Although we did not explicitly acknowledge the first message,
        // in the CLIENT_ACKNOWLEDGE mode, all the messages received prior to the explicitly acknowledged message
        // are also acknowledged. Therefore, we have implicitly acknowledged the first message.
        receiveMessage(consumer, true);
 
        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println("Connection closed.");
    }
 
    /**
     * Sends a message through the producer.
     *
     * @param producer Message producer
     * @param session Session
     * @param messageText Text for the message to be sent
     * @throws JMSException
     */
    private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException {
        // Create a text message and send it
        producer.send(session.createTextMessage(messageText));
    }
 
    /**
     * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS).
     * If a message is received, the message is printed. If no message is received, "Queue is empty!" is
     * printed.
     *
     * @param consumer Message consumer
     * @param acknowledge If true and a message is received, the received message is acknowledged.
     * @throws JMSException
     */
    private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException {
        // Receive a message
        Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));
 
        if (message == null) {
            System.out.println("Queue is empty!");
        } else {
            // Since this queue has only text messages, cast the message object and print the text
            System.out.println("Received: " + ((TextMessage) message).getText());
 
            // Acknowledge the message if asked
            if (acknowledge) message.acknowledge();
        }
    }
}

SyncMessageReceiverUnorderedAcknowledge.java

The following Java code example creates a synchronous receiver with unordered acknowledge mode.

/**
 * An example class to demonstrate the behavior of UNORDERED_ACKNOWLEDGE mode for received messages. This example
 * complements the example given in {@link SyncMessageReceiverClientAcknowledge} for CLIENT_ACKNOWLEDGE mode.
 *
 * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages
 * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to
 * receive another message is made. It is shown that the first message received in the prior attempt is returned again
 * for the second attempt. In UNORDERED_ACKNOWLEDGE mode, all the messages must be explicitly acknowledged no matter what
 * the order they are received.
 *
 * This is NOT the behavior for CLIENT_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverClientAcknowledge}
 * for an example.
 */
public class SyncMessageReceiverUnorderedAcknowledge {
 
    // Visibility time-out for the queue. It must match to the one set for the queue for this example to work.
    private static final long TIME_OUT_SECONDS = 1;
 
    public static void main(String args[]) throws JMSException, InterruptedException {
        // Create the configuration for the example
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverUnorderedAcknowledge", args);
 
        // Setup logging for the example
        ExampleCommon.setupLogging();
 
        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory =
                SQSConnectionFactory.builder()
                        .withRegion(config.getRegion())
                        .withAWSCredentialsProvider(config.getCredentialsProvider())
                        .build();
 
        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();
 
        // Create the queue if needed
        ExampleCommon.ensureQueueExists(connection, config.getQueueName());
 
        // Create the session  with unordered acknowledge mode
        Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
 
        // Create the producer and consume
        MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName()));
        MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName()));
 
        // Open the connection
        connection.start();
 
        // Send two text messages
        sendMessage(producer, session, "Message 1");
        sendMessage(producer, session, "Message 2");
 
        // Receive a message and don't acknowledge it
        receiveMessage(consumer, false);
 
        // Receive another message and acknowledge it
        receiveMessage(consumer, true);
 
        // Wait for the visibility time out, so that unacknowledged messages reappear in the queue
        System.out.println("Waiting for visibility timeout...");
        Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));
 
        // Attempt to receive another message and acknowledge it. This will result in receiving the first message since
        // we have acknowledged only the second message. In the UNORDERED_ACKNOWLEDGE mode, all the messages must
        // be explicitly acknowledged.
        receiveMessage(consumer, true);
 
        // Close the connection. This will close the session automatically
        connection.close();
        System.out.println("Connection closed.");
    }
 
    /**
     * Sends a message through the producer.
     *
     * @param producer Message producer
     * @param session Session
     * @param messageText Text for the message to be sent
     * @throws JMSException
     */
    private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException {
        // Create a text message and send it
        producer.send(session.createTextMessage(messageText));
    }
 
    /**
     * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS).
     * If a message is received, the message is printed. If no message is received, "Queue is empty!" is
     * printed.
     *
     * @param consumer Message consumer
     * @param acknowledge If true and a message is received, the received message is acknowledged.
     * @throws JMSException
     */
    private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException {
        // Receive a message
        Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS));
 
        if (message == null) {
            System.out.println("Queue is empty!");
        } else {
            // Since this queue has only text messages, cast the message object and print the text
            System.out.println("Received: " + ((TextMessage) message).getText());
 
            // Acknowledge the message if asked
            if (acknowledge) message.acknowledge();
        }
    }
}

SpringExampleConfig.xml

The following XML code example is a bean configuration file for SpringExample.java.

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    ">
    
    <bean id="CredentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>
    
    <bean id="ConnectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    	<property name="regionName" value="us-east-1"/>
    	<property name="numberOfMessagesToPrefetch" value="5"/>
    	<property name="awsCredentialsProvider" ref="CredentialsProviderBean"/>
    </bean>
    
	<bean id="ConnectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
		factory-bean="ConnectionFactoryBuilder"
		factory-method="build" />
		
	<bean id="Connection" class="javax.jms.Connection"
		factory-bean="ConnectionFactory"
		factory-method="createConnection"
		init-method="start"
		destroy-method="close" />
		 
    <bean id="QueueName" class="java.lang.String">
    	<constructor-arg value="SQSJMSClientExampleQueue"/>
    </bean>
    
</beans>

SpringExample.java

The following Java code example uses the bean configuration file to initialize your objects.

public class SpringExample {
    public static void main(String args[]) throws JMSException {
        if( args.length != 1 || !args[0].endsWith(".xml")) {
            System.err.println( "Usage: " + SpringExample.class.getName() + " <spring config.xml>" );
            System.exit(1);
        }
        
        File springFile = new File( args[0] );
        if( !springFile.exists() || !springFile.canRead() ) {
            System.err.println( "File " + args[0] + " does not exist or is not readable.");
            System.exit(2);
        }
        
        ExampleCommon.setupLogging();
        
        FileSystemXmlApplicationContext context = 
                new FileSystemXmlApplicationContext( "file://" + springFile.getAbsolutePath() );
        
        Connection connection;
        try {
            connection = context.getBean(Connection.class);
        } catch( NoSuchBeanDefinitionException e ) {
            System.err.println( "Could not find the JMS connection to use: " + e.getMessage() );
            System.exit(3);
            return;
        }
        
        String queueName;
        try {
            queueName = context.getBean("QueueName", String.class);
        } catch( NoSuchBeanDefinitionException e ) {
            System.err.println( "Could not find the name of the queue to use: " + e.getMessage() );
            System.exit(3);
            return;
        }
        
        if( connection instanceof SQSConnection ) {
            ExampleCommon.ensureQueueExists( (SQSConnection) connection, queueName );
        }
        
        // Create the session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer( session.createQueue( queueName) );
        
        receiveMessages(session, consumer);
 
        // The context can be setup to close the connection for us
        context.close();
        System.out.println( "Context closed" );
    }
 
    private static void receiveMessages( Session session, MessageConsumer consumer ) {
        try {
            while( true ) {
                System.out.println( "Waiting for messages");
                // Wait 1 minute for a message
                Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                if( message == null ) {
                    System.out.println( "Shutting down after 1 minute of silence" );
                    break;
                }
                ExampleCommon.handleMessage(message);
                message.acknowledge();
                System.out.println( "Acknowledged message" );
            }
        } catch (JMSException e) {
            System.err.println( "Error receiving from SQS: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}

ExampleCommon.java

The following Java code example checks if an Amazon SQS queue exists and then creates one if it does not. It also includes example logging code.

public class ExampleCommon {
    /**
     * A utility function to check the queue exists and create it if needed. For most  
     * use cases this will usually be done by an administrator before the application
     * is run. 
     */
    public static void ensureQueueExists(SQSConnection connection, String queueName) throws JMSException {
        AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
        
        /**
         * For most cases this could be done with just a createQueue call, but GetQueueUrl 
         * (called by queueExists) is a faster operation for the common case where the queue 
         * already exists. Also many users and roles have permission to call GetQueueUrl
         * but do not have permission to call CreateQueue.
         */
        if( !client.queueExists(queueName) ) {
            client.createQueue( queueName );
        }
    }
 
    public static void setupLogging() {
        // Setup logging
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);
    }
 
    public static void handleMessage(Message message) throws JMSException {
        System.out.println( "Got message " + message.getJMSMessageID() );
        System.out.println( "Content: ");
        if( message instanceof TextMessage ) {
            TextMessage txtMessage = ( TextMessage ) message;
            System.out.println( "\t" + txtMessage.getText() );
        } else if( message instanceof BytesMessage ){
            BytesMessage byteMessage = ( BytesMessage ) message;
            // Assume the length fits in an int - SQS only supports sizes up to 256k so that
            // should be true
            byte[] bytes = new byte[(int)byteMessage.getBodyLength()];
            byteMessage.readBytes(bytes);
            System.out.println( "\t" +  Base64.encodeAsString( bytes ) );
        } else if( message instanceof ObjectMessage ) {
            ObjectMessage objMessage = (ObjectMessage) message;
            System.out.println( "\t" + objMessage.getObject() );
        }
    }
}

Supported JMS 1.1 Implementations

The Amazon SQS Java Messaging Library supports the following JMS 1.1 implementations.

For more information about the supported features and capabilities of the Amazon SQS Java Messaging Library, see the Amazon SQS FAQs.

Supported Common Interfaces

  • Connection

  • ConnectionFactory

  • Destination

  • Session

  • MessageConsumer

  • MessageProducer

Supported Message Types

  • ByteMessage

  • ObjectMessage

  • TextMessage

Supported Message Acknowledgment Modes

  • AUTO_ACKNOWLEDGE

  • CLIENT_ACKNOWLEDGE

  • DUPS_OK_ACKNOWLEDGE

  • UNORDERED_ACKNOWLEDGE

Note

The UNORDERED_ACKNOWLEDGE mode is not part of the JMS 1.1 specification. This mode helps Amazon SQS allow a JMS client to explicitly acknowledge a message.

JMS-Defined Headers and Reserved Properties

The Amazon SQS JMS client sets the following JMS-defined headers:

  • JMSDestination

  • JMSMessageID

  • JMSRedelivered

The Amazon SQS JMS client sets the following JMS reserved property:

  • JMSXDeliveryCount