Exemplos funcionais de Como usar o Java Message Service (JMS) com o ActiveMQ - Amazon MQ

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplos funcionais de Como usar o Java Message Service (JMS) com o ActiveMQ

Os exemplos a seguir mostram como você pode trabalhar com o ActiveMQ programaticamente:

  • O código Java de OpenWire exemplo se conecta a um corretor, cria uma fila e envia e recebe uma mensagem. Para detalhamento e explicação, consulte Connecting a Java application to your broker.

  • O código do exemplo Java de MQTT faz uma conexão com um agente e cria um tópico além de enviar e receber e uma mensagem.

  • O código do exemplo Java de STOMP+WSS faz uma conexão com um agente e cria uma fila, além de enviar e receber e uma mensagem.

Pré-requisitos

Habilitar atributos da VPC

Para garantir que seu agente esteja acessível dentro da sua VPC, você deve habilitar os atributos VPC enableDnsHostnames e enableDnsSupport. Para obter mais informações, consulte Compatibilidade com DNS para a sua VPC no Manual do usuário da Amazon VPC.

Habilitar conexões de entrada

  1. Faça login no console do Amazon MQ.

  2. Na lista de corretores, escolha o nome do seu corretor (por exemplo, MyBroker).

  3. Na MyBrokerpágina, na seção Conexões, observe os endereços e portas do URL do console web e dos protocolos de nível de fio do broker.

  4. Na seção Details (Detalhes), em Security and network (Segurança e rede), escolha o nome do seu grupo de segurança ou .

    A página Grupos de segurança do painel do EC2 é exibida.

  5. Na lista de security group, escolha seu security group.

  6. Na parte inferior da página, escolha Inbound (Entrada) e a seguir selecione Edit (Editar).

  7. Na caixa de diálogo Edit inbound rules (Editar regras de entrada), adicione uma regra para cada URL ou endpoint que você deseja que seja acessível publicamente (o exemplo a seguir mostra como fazer isso para um console da Web do agente).

    1. Escolha Add Rule (Adicionar regra).

    2. Em Type (Tipo), selecione Custom TCP (TCP personalizado).

    3. Para o Intervalo de Portas, digite a porta do console da Web (8162).

    4. Para Source (Origem), deixe Custom (Personalizado) selecionado e, depois, digite o endereço IP do sistema ao qual deseja ser capaz de acessar o console da Web (por exemplo, 192.0.2.1).

    5. Selecione Save (Salvar).

      Agora seu agente pode aceitar conexões de entrada.

Adicionar dependências de Java

OpenWire

Adicione os pacotes activemq-client.jar e activemq-pool.jar ao caminho da classe Java. O exemplo a seguir mostra essas dependências em um arquivo pom.xml do projeto Maven.

<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>

Para obter mais informações sobre activemq-client.jar, consulte Initial Configuration (Configuração inicial) na documentação do Apache ActiveMQ.

MQTT

Adicione o pacote org.eclipse.paho.client.mqttv3.jar ao caminho da classe Java. O exemplo a seguir mostra essa dependência em um arquivo pom.xml do projeto Maven.

<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies>

Para obter mais informações sobre org.eclipse.paho.client.mqttv3.jar, consulte Cliente Java Eclipse Paho.

STOMP+WSS

Adicione os seguintes pacotes ao caminho da classe Java:

  • spring-messaging.jar

  • spring-websocket.jar

  • javax.websocket-api.jar

  • jetty-all.jar

  • slf4j-simple.jar

  • jackson-databind.jar

O exemplo a seguir mostra essas dependências em um arquivo pom.xml do projeto Maven.

<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>javax.websocket</groupId> <artifactId>javax.websocket-api</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>jetty-all</artifactId> <type>pom</type> <version>9.3.3.v20150827</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.5.0</version> </dependency> </dependencies>

Para obter mais informações, consulte Suporte de STOMP na documentação do Spring Framework.

AmazonMQExample.java

Importante

No código de exemplo a seguir, os produtores e consumidores são executados em um único thread. Para sistemas de produção (ou para testar o failover de instância do agente), certifique-se de que seus produtores e consumidores sejam executados em hosts ou threads separados.

OpenWire
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; import javax.jms.*; public class AmazonMQExample { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws JMSException { final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(); final PooledConnectionFactory pooledConnectionFactory = createPooledConnectionFactory(connectionFactory); sendMessage(pooledConnectionFactory); receiveMessage(connectionFactory); pooledConnectionFactory.stop(); } private static void sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException { // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory .createConnection(); producerConnection.start(); // Create a session. final Session producerSession = producerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession .createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession .createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a message. final String text = "Hello from Amazon MQ!"; final TextMessage producerMessage = producerSession .createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent."); // Clean up the producer. producer.close(); producerSession.close(); producerConnection.close(); } private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException { // Establish a connection for the consumer. // Note: Consumers should not use PooledConnectionFactory. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); // Create a session. final Session consumerSession = consumerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession .createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession .createConsumer(consumerDestination); // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText()); // Clean up the consumer. consumer.close(); consumerSession.close(); consumerConnection.close(); } private static PooledConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); return pooledConnectionFactory; } private static ActiveMQConnectionFactory createActiveMQConnectionFactory() { // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT); // Pass the sign-in credentials. connectionFactory.setUserName(ACTIVE_MQ_USERNAME); connectionFactory.setPassword(ACTIVE_MQ_PASSWORD); return connectionFactory; } }
MQTT
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.eclipse.paho.client.mqttv3.*; public class AmazonMQExampleMqtt implements MqttCallback { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws Exception { new AmazonMQExampleMqtt().run(); } private void run() throws MqttException, InterruptedException { // Specify the topic name and the message text. final String topic = "myTopic"; final String text = "Hello from Amazon MQ!"; // Create the MQTT client and specify the connection options. final String clientId = "abc123"; final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId); final MqttConnectOptions connOpts = new MqttConnectOptions(); // Pass the sign-in credentials. connOpts.setUserName(ACTIVE_MQ_USERNAME); connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray()); // Create a session and subscribe to a topic filter. client.connect(connOpts); client.setCallback(this); client.subscribe("+"); // Create a message. final MqttMessage message = new MqttMessage(text.getBytes()); // Publish the message to a topic. client.publish(topic, message); System.out.println("Published message."); // Wait for the message to be received. Thread.sleep(3000L); // Clean up the connection. client.disconnect(); } @Override public void connectionLost(Throwable cause) { System.out.println("Lost connection."); } @Override public void messageArrived(String topic, MqttMessage message) throws MqttException { System.out.println("Received message from topic " + topic + ": " + message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivered message."); } }
STOMP+WSS
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.simp.stomp.*; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; import java.lang.reflect.Type; public class AmazonMQExampleStompWss { // Specify the connection parameters. private final static String DESTINATION = "/queue"; private final static String WIRE_LEVEL_ENDPOINT = "wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws Exception { final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss(); final StompSession stompSession = example.connect(); System.out.println("Subscribed to a destination using session."); example.subscribeToDestination(stompSession); System.out.println("Sent message to session."); example.sendMessage(stompSession); Thread.sleep(60000); } private StompSession connect() throws Exception { // Create a client. final WebSocketClient client = new StandardWebSocketClient(); final WebSocketStompClient stompClient = new WebSocketStompClient(client); stompClient.setMessageConverter(new StringMessageConverter()); final WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); // Create headers with authentication parameters. final StompHeaders head = new StompHeaders(); head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME); head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD); final StompSessionHandler sessionHandler = new MySessionHandler(); // Create a connection. return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head, sessionHandler).get(); } private void subscribeToDestination(final StompSession stompSession) { stompSession.subscribe(DESTINATION, new MyFrameHandler()); } private void sendMessage(final StompSession stompSession) { stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes()); } private static class MySessionHandler extends StompSessionHandlerAdapter { public void afterConnected(final StompSession stompSession, final StompHeaders stompHeaders) { System.out.println("Connected to broker."); } } private static class MyFrameHandler implements StompFrameHandler { public Type getPayloadType(final StompHeaders headers) { return String.class; } public void handleFrame(final StompHeaders stompHeaders, final Object message) { System.out.print("Received message from topic: " + message); } } }