Menggunakan Python Pika dengan Amazon MQ for RabbitMQ - Amazon MQ

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Python Pika dengan Amazon MQ for RabbitMQ

Tutorial berikut menunjukkan bagaimana Anda dapat mengatur klien Python Pika dengan TLS dikonfigurasi untuk terhubung ke Amazon MQ untuk broker RabbitMQ. Pika adalah implementasi Python dari AMQP 0-9-1 protokol untuk RabbitMQ. Tutorial ini memandu Anda melalui menginstal Pika, mendeklarasikan antrian, menyiapkan penerbit untuk mengirim pesan ke pertukaran default broker, dan menyiapkan konsumen untuk menerima pesan dari antrian.

Prasyarat

Untuk menyelesaikan langkah-langkah dalam tutorial ini, Anda memerlukan prasyarat berikut:

  • Broker Amazon MQ for RabbitMQ. Untuk informasi selengkapnya, lihat Membuat broker Amazon MQ untuk RabbitMQ.

  • Python 3 diinstal untuk sistem operasi Anda.

  • Pika diinstal menggunakan Pythonpip. Untuk menginstal Pika, buka jendela terminal baru dan jalankan yang berikut ini.

    $ python3 -m pip install pika

Izin

Untuk tutorial ini, Anda memerlukan setidaknya satu pengguna broker Amazon MQ for RabbitMQ dengan izin untuk menulis, dan membaca dari, vhost. Tabel berikut menjelaskan izin minimum yang diperlukan sebagai pola ekspresi reguler (regexp).

Tanda Konfigurasikan regexp Tulis regexp Baca regexp
none .* .*

Izin pengguna yang terdaftar hanya menyediakan izin baca dan tulis kepada pengguna, tanpa memberikan akses ke plugin manajemen untuk melakukan operasi administratif pada broker. Anda selanjutnya dapat membatasi izin dengan menyediakan pola regexp yang membatasi akses pengguna ke antrian yang ditentukan. Misalnya, jika Anda mengubah pola baca regexp menjadi^[hello world].*, pengguna hanya akan memiliki izin untuk membaca dari antrian yang dimulai denganhello world.

Untuk informasi selengkapnya tentang cara membuat pengguna RabbitMQ serta mengelola tanda dan izin pengguna, lihat Amazon MQ untuk pengguna broker RabbitMQ.

Langkah pertama: Buat klien Python Pika dasar

Untuk membuat kelas basis klien Python Pika yang mendefinisikan konstruktor dan menyediakan konteks SSL yang diperlukan untuk konfigurasi TLS saat berinteraksi dengan broker Amazon MQ untuk RabbitMQ, lakukan hal berikut.

  1. Buka jendela terminal baru, membuat direktori baru untuk proyek Anda, dan arahkan ke direktori.

    $ mkdir pika-tutorial $ cd pika-tutorial
  2. Buat file baru,basicClient.py, yang berisi kode Python berikut.

    import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()

Anda sekarang dapat menentukan kelas tambahan untuk penerbit dan konsumen Anda yang mewarisi dariBasicPikaClient.

Langkah kedua: Membuat penerbit dan mengirimkan pesan

Untuk membuat penerbit yang menyatakan antrian, dan mengirim satu pesan, lakukan hal berikut.

  1. Salin konten dari contoh kode berikut, dan simpan secara lokal sepertipublisher.py dalam direktori yang sama dengan yang Anda buat di langkah sebelumnya.

    from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()

    BasicMessageSenderKelas mewarisi dariBasicPikaClient dan mengimplementasikan metode tambahan untuk delaring antrian, mengirim pesan ke antrian, dan menutup koneksi. Contoh kode merutekan pesan ke pertukaran default, dengan kunci routing sama dengan nama antrian.

  2. Di bawahif __name__ == "__main__":, ganti parameter yang dilewatkan ke pernyataanBasicMessageSender konstruktor dengan informasi berikut.

    • <broker-id> – ID unik yang dihasilkan Amazon MQ untuk broker. Anda dapat mengurai ID dari ARN broker. Misalnya, dengan ARN berikut, arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9, ID broker akan menjadi b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9.

    • <username>- Nama pengguna untuk pengguna broker dengan izin yang cukup untuk menulis pesan ke broker.

    • <password>- Kata sandi untuk pengguna broker dengan izin yang cukup untuk menulis pesan ke broker.

    • <region>-AWS Wilayah tempat Anda membuat Amazon MQ untuk broker RabbitMQ. Sebagai contoh, us-west-2.

  3. Jalankan perintah berikut di direktori yang sama yang Anda buatpublisher.py.

    $ python3 publisher.py

    Jika kode berjalan dengan sukses, Anda akan melihat output sebagai berikut di jendela terminal Anda.

    Trying to declare queue(hello world queue)...
    Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'

Langkah ketiga: Buat konsumen dan terima pesan

Untuk membuat konsumen yang menerima satu pesan dari antrian, lakukan hal berikut.

  1. Salin konten dari contoh kode berikut, dan simpan secara lokal seperticonsumer.py dalam direktori yang sama.

    from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()

    Mirip dengan penerbit yang Anda buat pada langkah sebelumnya,BasicMessageReciever mewarisi dariBasicPikaClient dan mengimplementasikan metode tambahan untuk menerima satu pesan, dan menutup koneksi.

  2. Di bawahif __name__ == "__main__": pernyataan tersebut, ganti parameter yang diteruskan keBasicMessageReciever konstruktor dengan informasi Anda.

  3. Jalankan perintah berikut di direktori proyek Anda.

    $ python3 consumer.py

    Jika kode berjalan berhasil, Anda akan melihat isi pesan, dan header termasuk kunci routing, ditampilkan di jendela terminal Anda.

    <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'

Langkah keempat: (Opsional) Mengatur loop peristiwa dan mengkonsumsi pesan

Untuk mengkonsumsi beberapa pesan dari antrian, gunakan basic_consumemetode Pika dan fungsi callback seperti yang ditunjukkan dalam berikut ini

  1. Diconsumer.py, tambahkan definisi metode berikut keBasicMessageReceiver kelas.

    def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
  2. Diconsumer.py, di bawahif __name__ == "__main__":, panggilconsume_messages metode yang Anda tetapkan pada langkah sebelumnya.

    if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
  3. Jalankanconsumer.py lagi, dan jika berhasil, pesan antrian akan ditampilkan di jendela terminal Anda.

    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
    ...

Apa selanjutnya?