Como usar Python Pika com o Amazon MQ para RabbitMQ - Amazon MQ

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como usar Python Pika com o Amazon MQ para RabbitMQ

O tutorial a seguir mostra como você pode configurar um cliente Python Pika com o TLS configurado para estabelecer conexão com o agente Amazon MQ para RabbitMQ. O Pika é uma implementação Python do protocolo AMQP 0-9-1 para RabbitMQ. Este tutorial orienta você durante a instalação do Pika, declarando uma fila, configurando um publicador para enviar mensagens para a troca padrão do agente e configurar um consumidor para receber mensagens da fila.

Pré-requisitos

Para concluir as etapas neste tutorial, você precisa dos seguintes pré-requisitos:

  • Um agente Amazon MQ para RabbitMQ. Para mais informações, consulte Criar um agente Amazon MQ para RabbitMQ.

  • O Python 3 instalado para o seu sistema operacional.

  • O Pika instalado usando o Python pip. Para instalar o Pika, abra uma nova janela de terminal e execute o seguinte.

    $ python3 -m pip install pika

Permissões

Para este tutorial, você precisa de pelo menos um usuário do agente Amazon MQ para RabbitMQ com permissão para gravação em e leitura de um vhost. A tabela a seguir descreve as permissões mínimas necessárias como padrões de expressão regular (regexp).

Tags Configurar regexp Escrever regexp Ler regexp
none .* .*

As permissões de usuário listadas fornecem apenas permissões de leitura e gravação para o usuário, sem conceder acesso ao plugin de gerenciamento para executar operações administrativas no agente. Você pode restringir ainda mais as permissões fornecendo padrões regexp que limitem o acesso do usuário às filas especificadas. Por exemplo, se você alterar o padrão regexp de leitura para ^[hello world].*, o usuário só terá permissão de leitura para as filas que começam com hello world.

Para obter mais informações sobre criar usuários RabbitMQ e gerenciar etiquetas e permissões de usuário, consulte Amazon MQ para usuários do broker RabbitMQ.

Etapa um: criar um cliente Python Pika básico

Para criar uma classe base de cliente Python Pika que define um construtor e fornece o contexto SSL necessário para a configuração TLS durante a interação com um agente Amazon MQ para RabbitMQ, faça o seguinte.

  1. Abra uma nova janela de terminal, crie um novo diretório para seu projeto e acesse o diretório.

    $ mkdir pika-tutorial $ cd pika-tutorial
  2. Crie um novo arquivo chamado basicClient.py contendo o seguinte código Python.

    import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()

Agora você pode definir classes adicionais para seu publicador e consumidor que herdam de BasicPikaClient.

Etapa dois: criar um publicador e enviar uma mensagem

Para criar um publicador que declara uma fila e envia uma única mensagem, faça o seguinte.

  1. Copie o conteúdo da amostra de código a seguir e salve localmente como publisher.py no mesmo diretório que você criou na etapa anterior.

    from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()

    A classe BasicMessageSender herda de BasicPikaClient e implementa métodos adicionais para declarar uma fila, enviar uma mensagem para a fila e fechar conexões. A amostra de código encaminha uma mensagem para a troca padrão, com uma chave de roteamento igual ao nome da fila.

  2. Em if __name__ == "__main__":, substitua os parâmetros transmitidos para a declaração do construtor BasicMessageSender com as seguintes informações.

    • <broker-id> — O ID exclusivo que o Amazon MQ gera para o agente. Você pode analisar o ID do ARN do seu agente. Por exemplo, considerando o seguinte ARN, arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9, o ID do agente seria b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9.

    • <username>: o nome de usuário de um usuário agente com permissões suficientes para gravação de mensagens no agente.

    • <password>: a senha de um usuário agente com permissões suficientes para gravação de mensagens no agente.

    • <region>: a região da AWS na qual você criou o agente do Amazon MQ para RabbitMQ. Por exemplo, us-west-2.

  3. Execute o seguinte comando no mesmo diretório que você criou publisher.py.

    $ python3 publisher.py

    Se o código for executado com êxito, você verá o resultado a seguir na janela do seu terminal.

    Trying to declare queue(hello world queue)...
    Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'

Etapa três: criar um consumidor e receber uma mensagem

Para criar um consumidor que recebe uma única mensagem da fila, faça o seguinte.

  1. Copie o conteúdo da amostra de código a seguir e salve localmente como consumer.py no mesmo diretório.

    from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()

    De modo semelhante ao publicador que você criou na etapa anterior, BasicMessageReciever herda de BasicPikaClient e implementa métodos adicionais para receber uma única mensagem e fechar conexões.

  2. Na declaração if __name__ == "__main__":, substitua os parâmetros transmitidos ao construtor BasicMessageReciever com suas informações.

  3. Execute o seguinte comando no diretório do projeto.

    $ python3 consumer.py

    Se o código for executado com êxito, você verá o corpo da mensagem e os cabeçalhos, incluindo a chave de roteamento, exibidos na janela do seu terminal.

    <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'

Etapa quatro: (opcional) configurar um loop de eventos e consumir mensagens

Para consumir várias mensagens de uma fila, use o método basic_consume do Pika e uma função de retorno de chamada conforme mostrado a seguir

  1. Em consumer.py, adicione a definição de método a seguir à classe BasicMessageReceiver.

    def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
  2. Em consumer.py, sob if __name__ == "__main__":, invoque o método consume_messages definido por você na etapa anterior.

    if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
  3. Execute consumer.py novamente e, se bem-sucedidas, as mensagens na fila serão exibidas na janela do seu terminal.

    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
    ...

Próximas etapas