ActiveMQ와 함께 자바 메시지 서비스 (JMS) 를 사용하는 작업 예제 - Amazon MQ

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

ActiveMQ와 함께 자바 메시지 서비스 (JMS) 를 사용하는 작업 예제

다음 예제에서는 ActiveMQ를 프로그래밍 방식으로 사용하는 방법을 보여 줍니다.

  • OpenWire 예제 Java 코드는 브로커에 연결하고, 대기열을 만들고, 메시지를 보내고 받습니다. 자세한 설명과 분석은 Connecting a Java application to your broker 단원을 참조하세요.

  • MQTT예제 Java 코드는 브로커에 연결하여 주제를 생성하고 메시지를 게시하고 수신합니다.

  • STOMP+ WSS 예제 Java 코드는 브로커에 연결하여 큐를 만들고 메시지를 게시하고 수신합니다.

사전 조건

속성 활성화 VPC

사용자 내에서 브로커에 액세스할 수 있도록 VPC 하려면 enableDnsHostnamesenableDnsSupport VPC 속성을 활성화해야 합니다. 자세한 내용은 Amazon VPC 사용 설명서의 귀하의 DNS VPC Support를 참조하십시오.

인바운드 연결 활성화

  1. Amazon MQ 콘솔에 로그인합니다.

  2. 브로커 목록에서 브로커 이름 (예: MyBroker) 을 선택합니다.

  3. MyBroker페이지의 연결 섹션에서 브로커 웹 콘솔 URL 및 유선 수준 프로토콜의 주소와 포트를 기록해 둡니다.

  4. 세부 정보 섹션의 보안 및 네트워크에서 보안 그룹의 이름 또는 Blue square icon with a white cloud and up arrow. (AI generated) 을 선택합니다.

    EC2대시보드의 보안 그룹 페이지가 표시됩니다.

  5. 보안 그룹 목록에서 보안 그룹을 선택합니다.

  6. 페이지 하단에서 인바운드를 선택한 후 편집을 선택합니다.

  7. 인바운드 규칙 편집 대화 상자에서 공개적으로 액세스할 수 있도록 하려는 모든 URL 또는 엔드포인트에 대한 규칙을 추가합니다 (다음 예는 브로커 웹 콘솔에서 이 작업을 수행하는 방법을 보여줍니다).

    1. 규칙 추가(Add Rule)를 선택합니다.

    2. 유형에서 사용자 TCP 지정을 선택합니다.

    3. Port Range(포트 범위)에 웹 콘솔 포트(8162)를 입력합니다.

    4. Source(소스)에서 Custom(사용자 지정)을 선택한 상태에서 웹 콘솔에 액세스하는 데 사용할 시스템의 IP 주소(예: 192.0.2.1)를 입력합니다.

    5. 저장(Save)을 선택합니다.

      이제 브로커가 인바운드 연결을 허용할 수 있습니다.

Java 종속성 추가

OpenWire

activemq-client.jaractivemq-pool.jar 패키지를 Java 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트의 pom.xml 파일 내에 존재하는 이러한 종속성을 보여줍니다.

<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>

activemq-client.jar에 대한 자세한 정보는 Apache ActiveMQ 설명서의 Initial Configuration을 참조하세요.

MQTT

org.eclipse.paho.client.mqttv3.jar 패키지를 Java 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트 pom.xml 파일의 이 종속성을 보여줍니다.

<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies>

org.eclipse.paho.client.mqttv3.jar에 대한 자세한 내용은 Eclipse Paho Java Client를 참조하세요.

STOMP+WSS

다음 패키지를 Java 클래스 경로에 추가합니다.

  • spring-messaging.jar

  • spring-websocket.jar

  • javax.websocket-api.jar

  • jetty-all.jar

  • slf4j-simple.jar

  • jackson-databind.jar

다음 예제는 Maven 프로젝트의 pom.xml 파일 내에 존재하는 이러한 종속성을 보여줍니다.

<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>javax.websocket</groupId> <artifactId>javax.websocket-api</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>jetty-all</artifactId> <type>pom</type> <version>9.3.3.v20150827</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.5.0</version> </dependency> </dependencies>

자세한 내용은 Spring 프레임워크 문서의 STOMPSupport 문서를 참조하십시오.

mazonMQExample.java

중요

다음 예제 코드에서 생성자와 소비자는 단일 스레드에서 실행됩니다. 프로덕션 시스템(또는 브로커 인스턴스 장애 조치 테스트)의 경우 생산자와 소비자가 별도의 호스트 또는 스레드에서 실행되는지 확인합니다.

OpenWire
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; import javax.jms.*; public class AmazonMQExample { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws JMSException { final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(); final PooledConnectionFactory pooledConnectionFactory = createPooledConnectionFactory(connectionFactory); sendMessage(pooledConnectionFactory); receiveMessage(connectionFactory); pooledConnectionFactory.stop(); } private static void sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException { // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory .createConnection(); producerConnection.start(); // Create a session. final Session producerSession = producerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession .createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession .createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a message. final String text = "Hello from Amazon MQ!"; final TextMessage producerMessage = producerSession .createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent."); // Clean up the producer. producer.close(); producerSession.close(); producerConnection.close(); } private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException { // Establish a connection for the consumer. // Note: Consumers should not use PooledConnectionFactory. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); // Create a session. final Session consumerSession = consumerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession .createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession .createConsumer(consumerDestination); // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText()); // Clean up the consumer. consumer.close(); consumerSession.close(); consumerConnection.close(); } private static PooledConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); return pooledConnectionFactory; } private static ActiveMQConnectionFactory createActiveMQConnectionFactory() { // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT); // Pass the sign-in credentials. connectionFactory.setUserName(ACTIVE_MQ_USERNAME); connectionFactory.setPassword(ACTIVE_MQ_PASSWORD); return connectionFactory; } }
MQTT
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.eclipse.paho.client.mqttv3.*; public class AmazonMQExampleMqtt implements MqttCallback { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws Exception { new AmazonMQExampleMqtt().run(); } private void run() throws MqttException, InterruptedException { // Specify the topic name and the message text. final String topic = "myTopic"; final String text = "Hello from Amazon MQ!"; // Create the MQTT client and specify the connection options. final String clientId = "abc123"; final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId); final MqttConnectOptions connOpts = new MqttConnectOptions(); // Pass the sign-in credentials. connOpts.setUserName(ACTIVE_MQ_USERNAME); connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray()); // Create a session and subscribe to a topic filter. client.connect(connOpts); client.setCallback(this); client.subscribe("+"); // Create a message. final MqttMessage message = new MqttMessage(text.getBytes()); // Publish the message to a topic. client.publish(topic, message); System.out.println("Published message."); // Wait for the message to be received. Thread.sleep(3000L); // Clean up the connection. client.disconnect(); } @Override public void connectionLost(Throwable cause) { System.out.println("Lost connection."); } @Override public void messageArrived(String topic, MqttMessage message) throws MqttException { System.out.println("Received message from topic " + topic + ": " + message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivered message."); } }
STOMP+WSS
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.simp.stomp.*; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; import java.lang.reflect.Type; public class AmazonMQExampleStompWss { // Specify the connection parameters. private final static String DESTINATION = "/queue"; private final static String WIRE_LEVEL_ENDPOINT = "wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456"; public static void main(String[] args) throws Exception { final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss(); final StompSession stompSession = example.connect(); System.out.println("Subscribed to a destination using session."); example.subscribeToDestination(stompSession); System.out.println("Sent message to session."); example.sendMessage(stompSession); Thread.sleep(60000); } private StompSession connect() throws Exception { // Create a client. final WebSocketClient client = new StandardWebSocketClient(); final WebSocketStompClient stompClient = new WebSocketStompClient(client); stompClient.setMessageConverter(new StringMessageConverter()); final WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); // Create headers with authentication parameters. final StompHeaders head = new StompHeaders(); head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME); head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD); final StompSessionHandler sessionHandler = new MySessionHandler(); // Create a connection. return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head, sessionHandler).get(); } private void subscribeToDestination(final StompSession stompSession) { stompSession.subscribe(DESTINATION, new MyFrameHandler()); } private void sendMessage(final StompSession stompSession) { stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes()); } private static class MySessionHandler extends StompSessionHandlerAdapter { public void afterConnected(final StompSession stompSession, final StompHeaders stompHeaders) { System.out.println("Connected to broker."); } } private static class MyFrameHandler implements StompFrameHandler { public Type getPayloadType(final StompHeaders headers) { return String.class; } public void handleFrame(final StompHeaders stompHeaders, final Object message) { System.out.print("Received message from topic: " + message); } } }