Send and receive batches of messages with Amazon SQS using an AWS SDK - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Send and receive batches of messages with Amazon SQS using an AWS SDK

The following code example shows how to:

  • Create an Amazon SQS queue.

  • Send batches of messages to the queue.

  • Receive batches of messages from the queue.

  • Delete batches of messages from the queue.

Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create functions to wrap Amazon SQS message functions.

import logging import sys import boto3 from botocore.exceptions import ClientError import queue_wrapper logger = logging.getLogger(__name__) sqs = boto3.resource("sqs") def send_messages(queue, messages): """ Send a batch of messages in a single request to an SQS queue. This request may return overall success even when some messages were not sent. The caller must inspect the Successful and Failed lists in the response and resend any failed messages. :param queue: The queue to receive the messages. :param messages: The messages to send to the queue. These are simplified to contain only the message body and attributes. :return: The response from SQS that contains the list of successful and failed messages. """ try: entries = [ { "Id": str(ind), "MessageBody": msg["body"], "MessageAttributes": msg["attributes"], } for ind, msg in enumerate(messages) ] response = queue.send_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info( "Message sent: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Failed to send: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) except ClientError as error: logger.exception("Send messages failed to queue: %s", queue) raise error else: return response def receive_messages(queue, max_number, wait_time): """ Receive a batch of messages in a single request from an SQS queue. :param queue: The queue from which to receive messages. :param max_number: The maximum number of messages to receive. The actual number of messages received might be less. :param wait_time: The maximum time to wait (in seconds) before returning. When this number is greater than zero, long polling is used. This can result in reduced costs and fewer false empty responses. :return: The list of Message objects received. These each contain the body of the message and metadata and custom attributes. """ try: messages = queue.receive_messages( MessageAttributeNames=["All"], MaxNumberOfMessages=max_number, WaitTimeSeconds=wait_time, ) for msg in messages: logger.info("Received message: %s: %s", msg.message_id, msg.body) except ClientError as error: logger.exception("Couldn't receive messages from queue: %s", queue) raise error else: return messages def delete_messages(queue, messages): """ Delete a batch of messages from a queue in a single request. :param queue: The queue from which to delete the messages. :param messages: The list of messages to delete. :return: The response from SQS that contains the list of successful and failed message deletions. """ try: entries = [ {"Id": str(ind), "ReceiptHandle": msg.receipt_handle} for ind, msg in enumerate(messages) ] response = queue.delete_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle ) except ClientError: logger.exception("Couldn't delete messages from queue %s", queue) else: return response

Use the wrapper functions to send and receive messages in batches.

def usage_demo(): """ Shows how to: * Read the lines from this Python file and send the lines in batches of 10 as messages to a queue. * Receive the messages in batches until the queue is empty. * Reassemble the lines of the file and verify they match the original file. """ def pack_message(msg_path, msg_body, msg_line): return { "body": msg_body, "attributes": { "path": {"StringValue": msg_path, "DataType": "String"}, "line": {"StringValue": str(msg_line), "DataType": "String"}, }, } def unpack_message(msg): return ( msg.message_attributes["path"]["StringValue"], msg.body, int(msg.message_attributes["line"]["StringValue"]), ) print("-" * 88) print("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!") print("-" * 88) queue = queue_wrapper.create_queue("sqs-usage-demo-message-wrapper") with open(__file__) as file: lines = file.readlines() line = 0 batch_size = 10 received_lines = [None] * len(lines) print(f"Sending file lines in batches of {batch_size} as messages.") while line < len(lines): messages = [ pack_message(__file__, lines[index], index) for index in range(line, min(line + batch_size, len(lines))) ] line = line + batch_size send_messages(queue, messages) print(".", end="") sys.stdout.flush() print(f"Done. Sent {len(lines) - 1} messages.") print(f"Receiving, handling, and deleting messages in batches of {batch_size}.") more_messages = True while more_messages: received_messages = receive_messages(queue, batch_size, 2) print(".", end="") sys.stdout.flush() for message in received_messages: path, body, line = unpack_message(message) received_lines[line] = body if received_messages: delete_messages(queue, received_messages) else: more_messages = False print("Done.") if all([lines[index] == received_lines[index] for index in range(len(lines))]): print(f"Successfully reassembled all file lines!") else: print(f"Uh oh, some lines were missed!") queue.delete() print("Thanks for watching!") print("-" * 88)