Using Python Pika with Amazon MQ for RabbitMQ
The following tutorial shows how you can set up a Python Pika
Topics
Prerequisites
To complete the steps in this tutorial, you need the following prerequisites:
An Amazon MQ for RabbitMQ broker. For more information, see Creating an Amazon MQ for RabbitMQ broker.
Python 3
installed for your operating system. -
Pika
installed using Python pip
. To install Pika, open a new terminal window and run the following.$
python3 -m pip install pika
Permissions
For this tutorial, you need at least one Amazon MQ for RabbitMQ broker user with permission to write to, and read from, a vhost. The following table describes the neccessary minimum permissions as regular expression (regexp) patterns.
Tags | Configure regexp | Write regexp | Read regexp |
---|---|---|---|
none |
|
.* |
.* |
The user permissions listed provide only read and write permissions to the user, without granting access to the management plugin to perform administrative
operations on the broker. You can further restrict permissions by providing regexp patterns that limit the user's access to specified queues. For example, if you
change the read regexp pattern to ^[hello world].*
, the user will only have permission to read from queues that start with hello world
.
For more information about creating RabbitMQ users and managing user tags and permissions, see User.
Step one: Create a basic Python Pika client
To create a Python Pika client base class that defines a constructor and provides the SSL context necessary for TLS configuration when interacting with an Amazon MQ for RabbitMQ broker, do the following.
-
Open a new terminal window, create a new directory for your project, and navigate to the directory.
$
mkdir pika-tutorial
$
cd pika-tutorial
-
Create a new file,
basicClient.py
, that contains the following Python code.import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()
You can now define additional classes for your publisher and consumer that inherit from BasicPikaClient
.
Step two: Create a publisher and send a message
To create a publisher that declares a queue, and sends a single message, do the following.
-
Copy the contents of the following code sample, and save locally as
publisher.py
in the same directory you created in the previous step.from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()
The
BasicMessageSender
class inherits fromBasicPikaClient
and implements additional methods for delaring a queue, sending a message to the queue, and closing connections. The code sample routes a message to the default exchange, with a routing key equal to the name of the queue. -
Under
if __name__ == "__main__":
, replace the parameters passed to theBasicMessageSender
constructor statement with the following information.-
<broker-id>
– The unique ID that Amazon MQ generates for the broker. You can parse the ID from your broker ARN. For example, given the following ARN,arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
, the broker ID would beb-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
. -
<username>
– The username for a broker user with sufficient permissions to write messages to the broker. -
<password>
– The password for a broker user with sufficient permissions to write messages to the broker. -
<region>
– The AWS region in which you created your Amazon MQ for RabbitMQ broker. For example,us-west-2
.
-
-
Run the following command in the same directory you created
publisher.py
.$
python3 publisher.py
If the code runs successfully, you will see the following output in your terminal window.
Trying to declare queue(hello world queue)... Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'
Step three: Create a consumer and recieve a message
To create a consumer that recieves a single message from the queue, do the following.
-
Copy the contents of the following code sample, and save locally as
consumer.py
in the same directory.from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()
Similar to the the publisher you created in the previous step,
BasicMessageReciever
inherits fromBasicPikaClient
and implements additional methods for recieving a single message, and closing connections. -
Under the
if __name__ == "__main__":
statement, replace the parameters passed to theBasicMessageReciever
constructor with your information. -
Run the following command in your project directory.
$
python3 consumer.py
If the code runs successfully, you will see the message body, and headers including the routing key, displayed in your terminal window.
<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'
Step four: (Optional) Set up an event loop and consume messages
To consume multiple messages from a queue, use Pika's basic_consume
-
In
consumer.py
, add the following method definition to theBasicMessageReceiver
class.def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
-
In
consumer.py
, underif __name__ == "__main__":
, invoke theconsume_messages
method you defined in the previous step.if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
-
Run
consumer.py
again, and if successful, the queued messages will be displayed in your terminal window.[*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' [x] Received b'Hello World!' ...
What's next?
-
For more information about other supported RabbitMQ client libraries, see RabbitMQ Client Documentation
on the RabbitMQ website.