Python と Amazon S3 を使用した大量の Amazon SQS メッセージの管理
Amazon S3 で Python 用 Amazon SQS Amazon SQS 拡張クライアントライブラリ
Python 用 Amazon SQS 拡張クライアントライブラリでは、次のことができます。
-
ペイロードを常に Amazon S3 に保存するか、ペイロードサイズが 256 KB を超えた場合にのみ Amazon S3 に保存するかを指定する
-
Amazon S3 バケットに保存されている単一のメッセージオブジェクトを参照するメッセージを送信する
-
Amazon S3 バケットから対応するペイロードオブジェクトを取得する
-
Amazon S3 バケットから対応するペイロードオブジェクトを削除する
前提条件
Python 用 Amazon SQS 拡張クライアントライブラリを使用するための前提条件は以下のとおりです。
-
必要な認証情報を持つ AWS アカウント。AWS アカウントを作成するには、AWS のホームページ
に移動して、[AWS アカウントを作成] を選択します。手順に従います。認証情報の詳細については、「認証情報 」を参照してください。 -
AWS SDK: このページの例では、AWS Python SDK Boto3 を使用しています。SDK をインストールしてセットアップするには、「AWS SDK for Python デベロッパーガイド」の「AWS SDK for Python
」ドキュメントを参照してください。 -
Python 3.x (または以降) および
pip。 -
Python 用 Amazon SQS 拡張クライアントライブラリ (PyPI
から利用可能)。
注記
Python 用 Amazon SQS 拡張クライアントライブラリを使用して Amazon S3 で Amazon SQS メッセージを管理できるのは、AWS SDK for Python を使用した場合のみです。AWS CLI、Amazon SQS コンソール、Amazon SQS HTTP API、またはその他いずれの SDK を使用しても管理できません。
メッセージストレージの設定
Amazon SQS 拡張クライアントは、以下のメッセージ属性を使用して Amazon S3 のメッセージストレージオプションを設定します。
-
large_payload_support: 大量のメッセージを保存するための Amazon S3 バケットの名前。 -
always_through_s3:Trueの場合、すべてのメッセージは Amazon S3 に保存されます。Falseの場合、256 KB 未満のメッセージは s3 バケットにシリアル化されません。デフォルトはFalseです。 -
use_legacy_attribute:Trueの場合、すべての発行済みメッセージは、現在の予約済みメッセージ属性 (ExtendedPayloadSize) ではなく、レガシーの予約済みメッセージ属性 (SQSLargePayloadSize) を使用します。
Python 用拡張クライアントライブラリを使用した大量の Amazon SQS メッセージの管理
次の例では、Amazon S3 バケットをランダムな名前で作成します。次に、MyQueue という名前の Amazon SQS を作成し、S3 バケットに保存されている 256 KB を超えるメッセージをキューに送信します。最後に、コードはメッセージを取得し、そのメッセージに関する情報を返してから、メッセージ、キュー、バケットを削除します。
import boto3 import sqs_extended_client #Set the Amazon SQS extended client configuration with large payload. sqs_extended_client = boto3.client("sqs", region_name="us-east-1") sqs_extended_client.large_payload_support = "amzn-s3-demo-bucket" sqs_extended_client.use_legacy_attribute = False # Create an SQS message queue for this example. Then, extract the queue URL. queue = sqs_extended_client.create_queue( QueueName = "MyQueue" ) queue_url = sqs_extended_client.get_queue_url( QueueName = "MyQueue" )['QueueUrl'] # Create the S3 bucket and allow message objects to be stored in the bucket. sqs_extended_client.s3_client.create_bucket(Bucket=sqs_extended_client.large_payload_support) # Sending a large message small_message = "s" large_message = small_message * 300000 # Shall cross the limit of 256 KB send_message_response = sqs_extended_client.send_message( QueueUrl=queue_url, MessageBody=large_message ) assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Receiving the large message receive_message_response = sqs_extended_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=['All'] ) assert receive_message_response['Messages'][0]['Body'] == large_message receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle'] # Deleting the large message # Set to True for deleting the payload from S3 sqs_extended_client.delete_payload_from_s3 = True delete_message_response = sqs_extended_client.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) assert delete_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Deleting the queue delete_queue_response = sqs_extended_client.delete_queue( QueueUrl=queue_url ) assert delete_queue_response['ResponseMetadata']['HTTPStatusCode'] == 200