Amazon MQ for RabbitMQ でPython Pika を使う - Amazon MQ

Amazon MQ for RabbitMQ でPython Pika を使う

次のチュートリアルでは、Amazon MQ for RabbitMQ ブローカーに接続するように構成された TLS を使用して Python Pika クライアントをセットアップする方法を示しています。Pika は RabbitMQ のための AMQP 0-9-1 プロトコルの Python 実装です。このチュートリアルでは、Pika のインストール、キューの宣言、ブローカーのデフォルトエクスチェンジにメッセージを送信するパブリッシャーの設定、およびキューからメッセージを受信するようにコンシューマを設定する手順を説明します。

Prerequisites

このチュートリアルの最初のステップを完了するには、以下のものが必要です。

  • Amazon MQ for RabbitMQ ブローカー。詳細については、「Amazon MQ for RabbitMQ ブローカーを作成する」を参照してください。

  • オペレーティングシステム用に Python 3 がインストールされています。

  • Python pip を使用して、Pika がインストールされました。Pika をインストールするには、新しいターミナルウィンドウを開き、以下を実行します。

    $ python3 -m pip install pika

Permissions

このチュートリアルでは、vhost への書き込みおよび読み取りの許可を持つ Amazon MQ for RabbitMQ ブローカーユーザーが少なくとも 1 人必要です。以下の表は、正規表現 (regexp) パターンとして必要な最低限の許可を説明しています。

タグ 設定 regexp 書き込み regexp 読み込み regexp
none .* .*

リストされているユーザー許可は、ブローカで管理オペレーションを実行するための管理プラグインへのアクセスを付与することなく、ユーザーに読み取りおよび書き込み許可のみを提供します。特定のキューへのユーザーのアクセスを制限する正規表現パターンを提供することで、許可をさらに制限できます。例えば、読み取り regexp パターンを ^[hello world].* に変更する場合、ユーザーには hello world で始まるキューからの読み取り許可のみが付与されます。

RabbitMQ ユーザーの作成、およびユーザータグと許可の管理の詳細については、「ユーザー」を参照してください。

ステップ 1: 基本的な Python Pika クライアントを作成する

Amazon MQ for RabbitMQ ブローカーと対話するときに、コンストラクタを定義し、TLS 設定に必要な SSL コンテキストを提供する Python Pika クライアント基本クラスを作成するには、次の手順を実行します。

  1. 新しいターミナルウィンドウを開き、プロジェクトの新しいディレクトリを作成し、そのディレクトリに移動します。

    $ mkdir pika-tutorial $ cd pika-tutorial
  2. 以下の Python コードを含む basicClient.py というファイルを作成します。

    import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()

パブリッシャーとコンシューマに対して、BasicPikaClient から継承する追加のクラスを定義できるようになりました。

ステップ 2: パブリッシャーを作成してメッセージを送信する

キューを宣言し、1 つのメッセージを送信するパブリッシャを作成するには、次の手順を実行します。

  1. 次のコードサンプルの内容をコピーし、前のステップで作成した同じディレクトリで、publisher.py と名前を付けてローカルに保存します。

    from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()

    BasicMessageSender クラスは BasicPikaClient から継承され、キューの宣言、キューへのメッセージの送信、および接続を閉じるための追加のメソッドを実装します。コードサンプルでは、キューの名前と等しいルーティングキーを使用して、メッセージをデフォルトの交換にルーティングします。

  2. [if __name__ == "__main__":] で、渡されたパラメータを次の情報を含む BasicMessageSender コンストラクターステートメントで置換します。

    • <broker-id> – Amazon MQ がブローカー用に生成する一意の ID です。ID は、ブローカー ARN から解析できます。例えば、arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9 という ARN の場合、ブローカー ID は b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9 になります。

    • <username> - ブローカにメッセージを書き込むのに十分な許可を持つブローカユーザーのユーザー名。

    • <password> - ブローカにメッセージを書き込むのに十分な許可を持つブローカユーザーのパスワード。

    • <region> - Amazon MQ for RabbitMQ ブローカーを作成した AWS リージョン。例えば、 。us-west-2

  3. publisher.py を作成した同じディレクトリで次のコマンドを実行します。

    $ python3 publisher.py

    コードが正常に実行された場合、ターミナルウィンドウに次の出力が表示されます。

    Trying to declare queue(hello world queue)...
    Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'

ステップ 3: コンシューマを作成してメッセージを受信する

キューから 1 つのメッセージを受信するコンシューマを作成するには、次の手順を実行します。

  1. 次のコードサンプルの内容をコピーし、同じディレクトリで、consumer.py と名前を付けてローカルに保存します。

    from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()

    前のステップで作成したパブリッシャーと同様に、BasicMessageRecieverBasicPikaClient から継承し、単一のメッセージを受信し、接続を閉じるための追加のメソッドを実装します。

  2. if __name__ == "__main__": ステートメントで、渡されたパラメータを次の情報を含む BasicMessageReciever コンストラクターに置換します。

  3. プロジェクトディレクトリで次のコマンドを実行します。

    $ python3 consumer.py

    コードが正常に実行されると、メッセージ本文とルーティングキーを含むヘッダーがターミナルウィンドウに表示されます。

    <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'

ステップ 4: (オプション) イベントループを設定し、メッセージを消費する

キューから複数のメッセージを消費するには、Pika の basic_consume メソッドと、次に示すコールバック関数を使用します

  1. consumer.py で、BasicMessageReceiver クラスに以下のメソッド定義を追加します。

    def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
  2. consumer.pyif __name__ == "__main__": の下で、前のステップで定義した consume_msesages メソッドを呼び出します。

    if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
  3. consumer.py をもう一度実行し、成功すると、キューに入れられたメッセージがターミナルウィンドウに表示されます。

    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
    ...

次のステップ