RabbitMQ ブローカーの作成と接続 - Amazon MQ

RabbitMQ ブローカーの作成と接続

ブローカーは、Amazon MQ で実行されるメッセージブローカー環境です。これは、Amazon MQ の基本的な構成要素です。ブローカーインスタンスのクラス (m5t3) およびサイズ (largemicro) を組み合わせた説明がブローカーインスタンスタイプ (mq.m5.large など) になります。

ステップ 1: RabbitMQ ブローカーを作成する

最初に実行する最も一般的な Amazon MQ タスクは、ブローカーの作成です。以下の例では、AWS Management Consoleを使用して基本的なブローカーを作成する方法を説明します。

  1. Amazon MQ コンソールにサインインします。

  2. [Select broker engine] (ブローカーエンジンの選択) ページで [RabbitMQ] を選択し、[Next] (次へ) をクリックします。

  3. [Select deployment mode] (デプロイモードの選択) ページで [Deployment mode] (デプロイモード) ([Cluster deployment] (クラスターのデプロイ) など) を選択して、[Next] (次へ) をクリックします。

    • 単一インスタンスブローカーは、Network Load Balancer (NLB) の内側にある 1 つのアベイラビリティーゾーン内の 1 つのブローカーで構成されます。ブローカーは、アプリケーション、および Amazon EBS ストレージボリュームと通信します。詳細については、「単一インスタンスブローカー」を参照してください。

    • 高可用性対応の RabbitMQ クラスターデプロイは、Network Load Balancer の内側にある 3 つの RabbitMQ ブローカーノードの論理グループで、それぞれがユーザー、キュー、および複数のアベイラビリティーゾーン (AZ) 間の分散状態を共有します。詳細については、「高可用性対応のクラスターデプロイ」を参照してください。

  4. [Configure settings] (設定の定義) ページの [Details] (詳細) セクションで、以下を実行します。

    1. [Broker name] (ブローカー名) を入力します。

      重要

      個人を特定できる情報 (PII) などの機密情報や秘匿性の高い情報はタグに追加しないでください。ブローカー名は、CloudWatch Logs を含む他の AWS サービスからアクセスできます。ブローカー名は、プライベートデータや機密データとして使用することを意図していません。

    2. [Broker instance type] (ブローカーインスタンスタイプ) を選択します (mq.m5.large など)。詳細については、「Broker instance types」を参照してください。

    注記

    [Additional settings] (追加設定) セクションには、CloudWatch Logs を有効化するオプションと、ブローカーのネットワークアクセスを設定するオプションがあります。パブリックアクセシビリティがないプライベート RabbitMQ ブローカーを作成する場合は、Virtual Private Cloud (VPC) を選択し、ブローカーにアクセスするためのセキュリティグループを設定する必要があります。

  5. [Configure settings] (設定の定義) ページの [RabbitMQ access] (RabbitMQ アクセス) セクションで、[Username] (ユーザーネーム) と [Password] (パスワード) を入力します。ブローカーのユーザー名とパスワードには、以下の制限が適用されます:

    • ユーザーネームに使用できるのは、英数字、ダッシュ、ピリオド、およびアンダースコア (- . _) のみです。この値にチルダ (~) 文字を含めることはできません。Amazon MQ では、ユーザーネームとしての guest の使用が禁止されています。

    • パスワードは 12 文字以上の長さで、一意の文字を少なくとも 4 つ含める必要があり、カンマ、コロン、または等号 (,:=) は使用できません。

    重要

    個人を特定できる情報 (PII) などの機密情報や秘匿性の高い情報はブローカーのユーザー名に追加しないでください。ブローカーのユーザー名は、CloudWatch Logs を含む他の AWS サービスからアクセスできます。ブローカーのユーザー名は、プライベートデータや機密データとして使用することを意図していません。

  6. [Next] を選択します。

  7. [Review and create] (確認と作成) ページで、選択内容を確認し、必要に応じて編集することができます。

  8. [Create broker] (ブローカーの作成) をクリックします。

    Amazon MQ がブローカーを作成している間は、[Creation in progress] (作成中) ステータスが表示されます。

    ブローカーの作成には約 15 分かかります。

    ブローカーが正常に作成されると、Amazon MQ が [Running] (実行中) ステータスを表示します。

  9. [MyBroker] を選択します。

    [MyBroker] ページの [Connect] (接続) セクションにあるブローカーの RabbitMQ web console URL をメモしておきます。以下はその例です。

    https://b-c8349341-ec91-4a78-ad9c-a57f23f235bb.mq.us-west-2.amazonaws.com

    ブローカーの secure-AMQP エンドポイントもメモしておきます。以下は、リスナーポート 5671 を公開する amqps エンドポイントの例です。

    amqps://b-c8349341-ec91-4a78-ad9c-a57f23f235bb.mq.us-west-2.amazonaws.com:5671

ステップ 2: ブローカーに JVM ベースのアプリケーションを接続する

RabbitMQ ブローカーを作成したら、ブローカーにアプリケーションを接続できます。以下の例では、RabbitMQ Java クライアントライブラリを使用してブローカーへの接続を作成し、キューを作成して、メッセージを送信する方法を説明します。RabbitMQ ブローカーには、サポートされているさまざまな言語の RabbitMQ クライアントライブラリを使用して接続することができます。サポートされている RabbitMQ クライアントライブラリの詳細については、「RabbitMQ client libraries and developer tools」を参照してください。

前提条件

注記

以下の前提条件ステップは、パブリックアクセシビリティなしで作成された RabbitMQ ブローカーのみに適用されます。パブリックアクセシビリティがあるブローカーを作成している場合は、スキップすることができます。

VPC 属性 を有効にする

VPC 内でブローカーにアクセスできることを確実にするには、enableDnsHostnames および enableDnsSupport VPC 属性を有効にする必要があります。詳細については、Amazon VPC ユーザーガイドの「VPC の DNS サポート」を参照してください。

インバウンド接続を有効にする

  1. Amazon MQ コンソールにサインインします。

  2. ブローカーのリストからブローカーの名前 (MyBroker など) を選択します。

  3. [MyBroker] ページの [Connections] (接続) セクションで、ブローカーのウェブコンソール URL とワイヤレベルプロトコルのアドレスとポートをメモします。

  4. [Details] (詳細) セクションの [Security and network] (セキュリティとネットワーク) で、セキュリティグループの名前または をクリックします。

    EC2 ダッシュボードの [セキュリティグループ] ページが表示されます。

  5. セキュリティグループのリストから、セキュリティグループを選択します。

  6. ページ下部で、[インバウンド] を選択し、次に [編集] を選択します。

  7. [Edit inbound rules] (インバウンドルールの編集) ダイアログボックスで、パブリックアクセスを許可する URL またはエンドポイントごとにルールを追加します (以下の例は、これをブローカーのウェブコンソールに対して行う方法を説明しています)。

    1. [Add Rule] (ルールの追加) をクリックします。

    2. [タイプ] で、[カスタム TCP] を選択します。

    3. [Source] (ソース) では、[Custom] (カスタム) が選択された状態のままにしておき、ウェブコンソールにアクセスできるようにするシステムの IP アドレスを入力します (192.0.2.1 など)。

    4. [Save] を選択します。

      これで、ブローカーはインバウンド接続を受け入れることができます。

Java の依存関係を追加する

ビルドの自動化のために Apache Maven を使用している場合は、以下の依存関係を pom.xml ファイルに追加します。Apache Maven のプロジェクトオブジェクトモデルファイルの詳細については、「Introduction to the POM」を参照してください。

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>

ビルドの自動化のために Gradle を使用している場合は、以下の依存関係を宣言します。

dependencies { compile 'com.rabbitmq:amqp-client:5.9.0' }

ConnectionChannel クラスをインポートする

RabbitMQ Java クライアントは、そのトップレベルパッケージとして com.rabbitmq.client を使用し、それぞれが AMQP 0-9-1 接続とチャネルを表す Connection および Channel API クラスがあります。以下の例にあるように、使用する前に ConnectionChannel クラスをインポートします。

import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;

ConnectionFactory を作成してブローカーに接続する

以下の例を使用して、所定のパラメータで ConnectionFactory クラスのインスタンスを作成します。setHost メソッドを使用して、先ほどメモしておいたブローカーエンドポイントを設定します。AMQPS のワイヤレベル接続には、ポート 5671 を使用します。

ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //Replace the URL with your information factory.setHost("b-c8352341-ec91-4a78-ad9c-a43f23d325bb.mq.us-west-2.amazonaws.com"); factory.setPort(5671); // Allows client to establish a connection over TLS factory.useSslProtocol() // Create a connection Connection conn = factory.newConnection(); // Create a channel Channel channel = conn.createChannel();

エクスチェンジにメッセージを発行する

エクスチェンジにメッセージを発行するには、Channel.basicPublish を使用できます。以下の例では、AMQP Builder クラスを使用して、content-type が plain/text のメッセージプロパティオブジェクトを構築します。

byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .contentType("text/plain") .userId("userId") .build(), messageBodyBytes);
注記

BasicProperties は自動生成されたホルダークラス AMQP の内部クラスであることに注意してください。

キューにサブスクライブしてメッセージを受信する

メッセージは、Consumer インターフェイスを使用してキューにサブスクライブすることによって受信できます。サブスクライブすると、メッセージが到着すると同時に自動配信されます。

Consumer を実装する最も簡単な方法は、サブクラス DefaultConsumer の使用です。以下の例にあるように、DefaultConsumer オブジェクトは、サブスクリプションをセットアップするための basicConsume コールの一部として渡すことがきます。

boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
注記

autoAck = false を指定したので、Consumer に配信されたメッセージを承認する必要があります。これは、上記の例にあるように、handleDelivery で実行することが最も便利です。

接続を閉じてブローカーへの接続を切断する

RabbitMQ ブローカーへの接続を切断するには、以下に示すように、チャネルと接続の両方を閉じます。

channel.close(); conn.close();
注記

RabbitMQ Java クライアントライブラリの使用に関する詳細については、RabbitMQ Java Client API Guide を参照してください

ステップ 3: (オプション) AWS Lambda 関数に接続する

AWS Lambda は、Amazon MQ ブローカーに接続し、ブローカーからメッセージを取り込むことができます。ブローカーを Lambda に接続するときは、キューからメッセージを読み取り、関数 synchronously を呼び出すイベントソースマッピングを作成します。作成するイベントソースマッピングは、ブローカーからメッセージをバッチで読み取り、それらを JSON オブジェクト形式の Lambda ペイロードに変換します。

ブローカーを Lambda 関数に接続する

  1. Lambda 関数 execution role に以下の IAM ロール許可を追加します。

    注記

    必要な IAM 許可がない場合、関数は Amazon MQ リソースからレコードを正常に読み取ることができません。

  2. (オプション) パブリックアクセシビリティがないブローカーを作成した場合は、次のいずれかを実行して、Lambda のブローカーへの接続を許可する必要があります。

  3. AWS Management Consoleを使用して、Lambda 関数のイベントソースとしてブローカーを設定します。create-event-source-mapping AWS Command Line Interface コマンドを使用することもできます。

  4. ブローカーから取り込まれたメッセージを処理するための Lambda 関数のコードをいくつか記述します。イベントソースマッピングによって取得される Lambda ペイロードは、ブローカーのエンジンタイプに依存します。以下は、Amazon MQ for RabbitMQ キューの Lambda ペイロードの例です。

    注記

    この例では、test がキューの名前で、/ がデフォルト仮想ホストの名前です。メッセージを受信すると、イベントソースは test::/ の下にメッセージを一覧表示します。

    { "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "test::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 } "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }

Amazon MQ の Lambda への接続、Amazon MQ イベントソースに対して Lambda がサポートするオプション、およびイベントソースマッピングエラーの詳細については、AWS Lambda デベロッパーガイドの「Amazon MQ で Lambda を使用する」を参照してください。

ステップ 4: ブローカーを削除する

Amazon MQ ブローカーを使用しない (かつ近い将来使用することがないと思われる) 場合は、ブローカーを Amazon MQ から削除して AWS 料金を削減することがベストプラクティスです。

以下の例では、AWS Management Consoleを使用してブローカーを削除する方法を説明します。

  1. Amazon MQ コンソールにサインインします。

  2. ブローカーのリストからブローカー (MyBroker など) を選択して、[Delete] (削除) をクリックします。

  3. [Delete MyBroker?] (MyBroker を削除しますか?) ダイアログボックスで、delete と入力してから [Delete] (削除) をクリックします。

    ブローカーの削除には約 5 分かかります。

次のステップ

ブローカーを作成してアプリケーションを接続し、メッセージを送受信したので、次の操作を試してください。

Amazon MQ のベストプラクティスAmazon MQ REST API をよく理解した上で、Amazon MQ​ への移行を計画することもできます。