Amazon SNS examples using SDK for Python (Boto3) - AWS SDK Code Examples



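The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Amazon SNS.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or by combining the service with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.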

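Actions

The following code example shows how to use CreateTopic.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.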

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def create_topic(self, name):
        """
        Creates a notification topic.

        :param name: The name of the topic to create.
        :return: The newly created topic.
        """
        try:
            topic = self.sns_resource.create_topic(Name=name)
            logger.info("Created topic %s with ARN %s.", name, topic.arn)
        except ClientError:
            logger.exception("Couldn't create topic %s.", name)
            raise
        else:
            return topic
import logging
from typing import Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def create_topic(
        self,
        topic_name: str,
        is_fifo: bool = False,
        content_based_deduplication: bool = False
    ) -> str:
        """
        Create an SNS topic.

        :param topic_name: The name of the topic to create.
        :param is_fifo: Whether to create a FIFO topic.
        :param content_based_deduplication: Whether to use content-based deduplication for FIFO topics.
        :return: The ARN of the created topic.
        :raises ClientError: If the topic creation fails.
        """
        try:
            # Add .fifo suffix for FIFO topics
            if is_fifo and not topic_name.endswith('.fifo'):
                topic_name += '.fifo'

            attributes = {}
            if is_fifo:
                attributes['FifoTopic'] = 'true'
                if content_based_deduplication:
                    attributes['ContentBasedDeduplication'] = 'true'

            response = self.sns_client.create_topic(
                Name=topic_name,
                Attributes=attributes
            )

            topic_arn = response['TopicArn']
            logger.info(f"Created topic: {topic_name} with ARN: {topic_arn}")
            return topic_arn

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error creating topic {topic_name}: {error_code} - {e}")
            raise
  • For API details, see CreateTopic in AWS SDK for Python (Boto3) API Reference.
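The FIFO example later on this page notes in a docstring that topic names may contain only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, must be 1 to 256 characters long, and must end with .fifo for FIFO topics. A minimal sketch of a client-side check based on that rule could look like the following. The helper name is_valid_topic_name is hypothetical and not part of Boto3, and the sketch assumes that the .fifo suffix counts toward the 256-character limit.

```python
import re

# Allowed characters, per the naming rule quoted in the FIFO example's docstring.
_NAME_CHARS = re.compile(r"[A-Za-z0-9_-]+")
_FIFO_SUFFIX = ".fifo"
_MAX_LENGTH = 256  # Assumption: the .fifo suffix counts toward this limit.


def is_valid_topic_name(name: str, is_fifo: bool = False) -> bool:
    """Hypothetical helper: return True if `name` looks like a valid topic name."""
    if not 1 <= len(name) <= _MAX_LENGTH:
        return False
    if is_fifo:
        # FIFO topic names must end with the .fifo suffix.
        if not name.endswith(_FIFO_SUFFIX):
            return False
        name = name[: -len(_FIFO_SUFFIX)]
    # What remains must be non-empty and use only the allowed characters.
    return _NAME_CHARS.fullmatch(name) is not None
```

Checking the name before calling create_topic only gives earlier feedback. The service remains the authority on which names it accepts.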

The following code example shows how to use DeleteTopic.

SDK for Python (Boto3)

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def delete_topic(topic):
        """
        Deletes a topic. All subscriptions to the topic are also deleted.
        """
        try:
            topic.delete()
            logger.info("Deleted topic %s.", topic.arn)
        except ClientError:
            logger.exception("Couldn't delete topic %s.", topic.arn)
            raise
import logging
from typing import Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def delete_topic(self, topic_arn: str) -> bool:
        """
        Delete an SNS topic.

        :param topic_arn: The ARN of the topic to delete.
        :return: True if successful.
        :raises ClientError: If the topic deletion fails.
        """
        try:
            self.sns_client.delete_topic(TopicArn=topic_arn)
            logger.info(f"Deleted topic: {topic_arn}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            if error_code == 'NotFound':
                logger.warning(f"Topic not found: {topic_arn}")
                return True  # Already deleted
            else:
                logger.error(f"Error deleting topic: {error_code} - {e}")
                raise
  • For API details, see DeleteTopic in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListSubscriptions.

SDK for Python (Boto3)

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def list_subscriptions(self, topic=None):
        """
        Lists subscriptions for the current account, optionally limited to a
        specific topic.

        :param topic: When specified, only subscriptions to this topic are returned.
        :return: An iterator that yields the subscriptions.
        """
        try:
            if topic is None:
                subs_iter = self.sns_resource.subscriptions.all()
            else:
                subs_iter = topic.subscriptions.all()
            logger.info("Got subscriptions.")
        except ClientError:
            logger.exception("Couldn't get subscriptions.")
            raise
        else:
            return subs_iter
  • For API details, see ListSubscriptions in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListTopics.

SDK for Python (Boto3)

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def list_topics(self):
        """
        Lists topics for the current account.

        :return: An iterator that yields the topics.
        """
        try:
            topics_iter = self.sns_resource.topics.all()
            logger.info("Got topics.")
        except ClientError:
            logger.exception("Couldn't get topics.")
            raise
        else:
            return topics_iter
  • For API details, see ListTopics in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use Publish.

SDK for Python (Boto3)

Publish a message with attributes so that a subscription can filter based on attributes.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def publish_message(topic, message, attributes):
        """
        Publishes a message, with attributes, to a topic. Subscriptions can be filtered
        based on message attributes so that a subscription receives messages only
        when specified attributes are present.

        :param topic: The topic to publish to.
        :param message: The message to publish.
        :param attributes: The key-value attributes to attach to the message. Values
                           must be either `str` or `bytes`.
        :return: The ID of the message.
        """
        try:
            # Convert plain values to the typed structure that Amazon SNS expects.
            att_dict = {}
            for key, value in attributes.items():
                if isinstance(value, str):
                    att_dict[key] = {"DataType": "String", "StringValue": value}
                elif isinstance(value, bytes):
                    att_dict[key] = {"DataType": "Binary", "BinaryValue": value}
            response = topic.publish(Message=message, MessageAttributes=att_dict)
            message_id = response["MessageId"]
            logger.info(
                "Published message with attributes %s to topic %s.",
                attributes,
                topic.arn,
            )
        except ClientError:
            logger.exception("Couldn't publish message to topic %s.", topic.arn)
            raise
        else:
            return message_id

Publish a message that takes different forms based on the protocol of the subscriber.

import json
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def publish_multi_message(
        topic, subject, default_message, sms_message, email_message
    ):
        """
        Publishes a multi-format message to a topic. A multi-format message takes
        different forms based on the protocol of the subscriber. For example,
        an SMS subscriber might receive a short version of the message
        while an email subscriber could receive a longer version.

        :param topic: The topic to publish to.
        :param subject: The subject of the message.
        :param default_message: The default version of the message. This version is
                                sent to subscribers that have protocols that are not
                                otherwise specified in the structured message.
        :param sms_message: The version of the message sent to SMS subscribers.
        :param email_message: The version of the message sent to email subscribers.
        :return: The ID of the message.
        """
        try:
            message = {
                "default": default_message,
                "sms": sms_message,
                "email": email_message,
            }
            response = topic.publish(
                Message=json.dumps(message), Subject=subject, MessageStructure="json"
            )
            message_id = response["MessageId"]
            logger.info("Published multi-format message to topic %s.", topic.arn)
        except ClientError:
            logger.exception("Couldn't publish message to topic %s.", topic.arn)
            raise
        else:
            return message_id
import logging
from typing import Any, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def publish_message(
        self,
        topic_arn: str,
        message: str,
        tone_attribute: Optional[str] = None,
        deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None
    ) -> str:
        """
        Publish a message to an SNS topic.

        :param topic_arn: The ARN of the SNS topic.
        :param message: The message content to publish.
        :param tone_attribute: Optional tone attribute for message filtering.
        :param deduplication_id: Optional deduplication ID for FIFO topics.
        :param message_group_id: Optional message group ID for FIFO topics.
        :return: The message ID of the published message.
        :raises ClientError: If the message publication fails.
        """
        try:
            publish_args = {
                'TopicArn': topic_arn,
                'Message': message
            }

            # Add message attributes if tone is specified
            if tone_attribute:
                publish_args['MessageAttributes'] = {
                    'tone': {
                        'DataType': 'String',
                        'StringValue': tone_attribute
                    }
                }

            # Add FIFO-specific parameters
            if message_group_id:
                publish_args['MessageGroupId'] = message_group_id

            if deduplication_id:
                publish_args['MessageDeduplicationId'] = deduplication_id

            response = self.sns_client.publish(**publish_args)

            message_id = response['MessageId']
            logger.info(f"Published message to topic {topic_arn} with ID: {message_id}")
            return message_id

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error publishing message to topic: {error_code} - {e}")
            raise
  • For API details, see Publish in AWS SDK for Python (Boto3) API Reference.
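The multi-format example passes a JSON document as the message when MessageStructure is "json". That document needs a "default" entry, and every value in it has to be a string. As a rough illustration, the payload could be assembled and checked before publishing as sketched below. The helper name build_multi_format_message is hypothetical and only produces the string; it does not call Amazon SNS.

```python
import json


def build_multi_format_message(default_message, **per_protocol):
    """Hypothetical helper: serialize a message for MessageStructure="json".

    :param default_message: Text for subscribers whose protocol is not listed.
    :param per_protocol: Protocol-specific text, for example sms="..." or email="...".
    :return: A JSON string suitable for the Message parameter of publish.
    """
    if not isinstance(default_message, str):
        raise TypeError("default_message must be a string")
    body = {"default": default_message}
    for protocol, text in per_protocol.items():
        if not isinstance(text, str):
            raise TypeError(f"The message for protocol '{protocol}' must be a string")
        body[protocol] = text
    return json.dumps(body)


payload = build_multi_format_message(
    "A sale starts today.",
    sms="Sale today!",
    email="Our seasonal sale starts today and runs through the weekend.",
)
```

The resulting payload would then be passed as Message, together with MessageStructure="json", in the same way as in publish_multi_message above.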

The following code example shows how to use SetSubscriptionAttributes.

SDK for Python (Boto3)

import json
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def add_subscription_filter(subscription, attributes):
        """
        Adds a filter policy to a subscription. A filter policy is a key and a
        list of values that are allowed. When a message is published, it must have an
        attribute that passes the filter or it will not be sent to the subscription.

        :param subscription: The subscription the filter policy is attached to.
        :param attributes: A dictionary of key-value pairs that define the filter.
        """
        try:
            # Each attribute value becomes a one-element list of allowed values.
            att_policy = {key: [value] for key, value in attributes.items()}
            subscription.set_attributes(
                AttributeName="FilterPolicy", AttributeValue=json.dumps(att_policy)
            )
            logger.info("Added filter to subscription %s.", subscription.arn)
        except ClientError:
            logger.exception(
                "Couldn't add filter to subscription %s.", subscription.arn
            )
            raise
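To make the filter behavior described in the docstring more concrete, the following sketch models it locally: it builds the same kind of policy (each key mapped to a list of allowed values) and checks whether a set of message attributes would pass. This is a simplified model that covers only exact string matching. Amazon SNS evaluates filter policies on the service side and supports further operators that are not modeled here, and the function names are hypothetical.

```python
import json


def build_filter_policy(attributes):
    """Build a filter policy JSON string: each key maps to a list of allowed values."""
    return json.dumps({key: [value] for key, value in attributes.items()})


def matches_filter_policy(policy_json, message_attributes):
    """Simplified local model of exact-match filtering (not the service's implementation).

    :param policy_json: A filter policy as a JSON string.
    :param message_attributes: Attributes in the MessageAttributes format used by
                               publish, for example
                               {"tone": {"DataType": "String", "StringValue": "funny"}}.
    :return: True if every policy key is present with an allowed string value.
    """
    policy = json.loads(policy_json)
    for key, allowed_values in policy.items():
        attribute = message_attributes.get(key)
        if attribute is None or attribute.get("StringValue") not in allowed_values:
            return False
    return True


policy = build_filter_policy({"tone": "funny"})
funny = {"tone": {"DataType": "String", "StringValue": "funny"}}
serious = {"tone": {"DataType": "String", "StringValue": "serious"}}
```

In this model, a message with the funny attribute passes the policy, while the serious message and a message without any attributes do not.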

The following code example shows how to use Subscribe.

SDK for Python (Boto3)

Subscribe an email address to a topic.

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def subscribe(topic, protocol, endpoint):
        """
        Subscribes an endpoint to the topic. Some endpoint types, such as email,
        must be confirmed before their subscriptions are active. When a subscription
        is not confirmed, its Amazon Resource Name (ARN) is set to
        'PendingConfirmation'.

        :param topic: The topic to subscribe to.
        :param protocol: The protocol of the endpoint, such as 'sms' or 'email'.
        :param endpoint: The endpoint that receives messages, such as a phone number
                         (in E.164 format) for SMS messages, or an email address for
                         email messages.
        :return: The newly added subscription.
        """
        try:
            subscription = topic.subscribe(
                Protocol=protocol, Endpoint=endpoint, ReturnSubscriptionArn=True
            )
            logger.info("Subscribed %s %s to topic %s.", protocol, endpoint, topic.arn)
        except ClientError:
            logger.exception(
                "Couldn't subscribe %s %s to topic %s.", protocol, endpoint, topic.arn
            )
            raise
        else:
            return subscription

Subscribe a queue to a topic with a filter.

import logging
from typing import Any, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def subscribe_queue_to_topic(
        self,
        topic_arn: str,
        queue_arn: str,
        filter_policy: Optional[str] = None
    ) -> str:
        """
        Subscribe an SQS queue to an SNS topic.

        :param topic_arn: The ARN of the SNS topic.
        :param queue_arn: The ARN of the SQS queue.
        :param filter_policy: Optional JSON filter policy for message filtering.
        :return: The ARN of the subscription.
        :raises ClientError: If the subscription fails.
        """
        try:
            attributes = {}
            if filter_policy:
                attributes['FilterPolicy'] = filter_policy

            response = self.sns_client.subscribe(
                TopicArn=topic_arn,
                Protocol='sqs',
                Endpoint=queue_arn,
                Attributes=attributes
            )

            subscription_arn = response['SubscriptionArn']
            logger.info(f"Subscribed queue {queue_arn} to topic {topic_arn}")
            return subscription_arn

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error subscribing queue to topic: {error_code} - {e}")
            raise
  • For API details, see Subscribe in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use Unsubscribe.

SDK for Python (Boto3)

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    @staticmethod
    def delete_subscription(subscription):
        """
        Unsubscribes and deletes a subscription.
        """
        try:
            subscription.delete()
            logger.info("Deleted subscription %s.", subscription.arn)
        except ClientError:
            logger.exception("Couldn't delete subscription %s.", subscription.arn)
            raise
import logging
from typing import Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def unsubscribe(self, subscription_arn: str) -> bool:
        """
        Unsubscribe from an SNS topic.

        :param subscription_arn: The ARN of the subscription to remove.
        :return: True if successful.
        :raises ClientError: If the unsubscribe operation fails.
        """
        try:
            self.sns_client.unsubscribe(SubscriptionArn=subscription_arn)
            logger.info(f"Unsubscribed: {subscription_arn}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            if error_code == 'NotFound':
                logger.warning(f"Subscription not found: {subscription_arn}")
                return True  # Already unsubscribed
            else:
                logger.error(f"Error unsubscribing: {error_code} - {e}")
                raise
  • For API details, see Unsubscribe in AWS SDK for Python (Boto3) API Reference.

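Scenarios

The following code example shows how to explore Amazon Textract output through an interactive application.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with Amazon Textract to detect text, form, and table elements in a document image. The input image and Amazon Textract output are shown in a Tkinter application that lets you explore the detected elements.

  • Submit a document image to Amazon Textract and explore the output of detected elements.

  • Submit images directly to Amazon Textract or through an Amazon Simple Storage Service (Amazon S3) bucket.

  • Use asynchronous APIs to start a job that publishes a notification to an Amazon Simple Notification Service (Amazon SNS) topic when the job completes.

  • Poll an Amazon Simple Queue Service (Amazon SQS) queue for a job completion message and display the results.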
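The last two steps rely on the job completion notification travelling from the topic to the queue. When raw message delivery is not enabled, the body of the queue message is a JSON envelope written by Amazon SNS, and the published text sits in its Message field (the topics and queues scenario later on this page reads it the same way). The sketch below shows how such a body might be unpacked. The function name is hypothetical, and the JobId and Status field names are assumptions about the notification payload, so check them against the full example on GitHub.

```python
import json


def parse_job_notification(sqs_body):
    """Hypothetical helper: extract (job_id, status) from a queue message body.

    Assumes the body is an Amazon SNS envelope (raw message delivery disabled)
    whose "Message" field is itself a JSON string with "JobId" and "Status" keys.
    """
    envelope = json.loads(sqs_body)
    notification = json.loads(envelope["Message"])
    return notification["JobId"], notification["Status"]


# Build a sample body locally instead of reading from a real queue.
sample_body = json.dumps(
    {
        "Type": "Notification",
        "Message": json.dumps({"JobId": "example-job-id", "Status": "SUCCEEDED"}),
    }
)
job_id, status = parse_job_notification(sample_body)
```

A polling loop would call a helper like this on each received message and stop once it sees the job ID it is waiting for.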

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon Cognito Identity

  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

The following code example shows how to create and publish to a FIFO Amazon SNS topic.

SDK for Python (Boto3)

Create an Amazon SNS FIFO topic, subscribe Amazon SQS FIFO and standard queues to the topic, and publish a message to the topic.

import json
import logging
import uuid

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# SnsWrapper is the resource-based wrapper class from the action examples
# (it provides delete_subscription and delete_topic).


def usage_demo():
    """Shows how to subscribe queues to a FIFO topic."""
    print("-" * 88)
    print("Welcome to the `Subscribe queues to a FIFO topic` demo!")
    print("-" * 88)

    sns = boto3.resource("sns")
    sqs = boto3.resource("sqs")
    fifo_topic_wrapper = FifoTopicWrapper(sns)
    sns_wrapper = SnsWrapper(sns)

    prefix = "sqs-subscribe-demo-"
    queues = set()
    subscriptions = set()

    wholesale_queue = sqs.create_queue(
        QueueName=prefix + "wholesale.fifo",
        Attributes={
            "MaximumMessageSize": str(4096),
            "ReceiveMessageWaitTimeSeconds": str(10),
            "VisibilityTimeout": str(300),
            "FifoQueue": str(True),
            "ContentBasedDeduplication": str(True),
        },
    )
    queues.add(wholesale_queue)
    print(f"Created FIFO queue with URL: {wholesale_queue.url}.")

    retail_queue = sqs.create_queue(
        QueueName=prefix + "retail.fifo",
        Attributes={
            "MaximumMessageSize": str(4096),
            "ReceiveMessageWaitTimeSeconds": str(10),
            "VisibilityTimeout": str(300),
            "FifoQueue": str(True),
            "ContentBasedDeduplication": str(True),
        },
    )
    queues.add(retail_queue)
    print(f"Created FIFO queue with URL: {retail_queue.url}.")

    analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={})
    queues.add(analytics_queue)
    print(f"Created standard queue with URL: {analytics_queue.url}.")

    topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo")
    print(f"Created FIFO topic: {topic.attributes['TopicArn']}.")

    for q in queues:
        fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"])

    print(f"Added access policies for topic: {topic.attributes['TopicArn']}.")

    for q in queues:
        sub = fifo_topic_wrapper.subscribe_queue_to_topic(
            topic, q.attributes["QueueArn"]
        )
        subscriptions.add(sub)

    print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.")

    input("Press Enter to publish a message to the topic.")

    message_id = fifo_topic_wrapper.publish_price_update(
        topic, '{"product": 214, "price": 79.99}', "Consumables"
    )

    print(f"Published price update with message ID: {message_id}.")

    # Clean up the subscriptions, queues, and topic.
    input("Press Enter to clean up resources.")
    for s in subscriptions:
        sns_wrapper.delete_subscription(s)

    sns_wrapper.delete_topic(topic)

    for q in queues:
        fifo_topic_wrapper.delete_queue(q)

    print("Deleted subscriptions, queues, and topic.")

    print("Thanks for watching!")
    print("-" * 88)


class FifoTopicWrapper:
    """Encapsulates Amazon SNS FIFO topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def create_fifo_topic(self, topic_name):
        """
        Create a FIFO topic.
        Topic names must be made up of only uppercase and lowercase ASCII letters,
        numbers, underscores, and hyphens, and must be between 1 and 256 characters long.
        For a FIFO topic, the name must end with the .fifo suffix.

        :param topic_name: The name for the topic.
        :return: The new topic.
        """
        try:
            topic = self.sns_resource.create_topic(
                Name=topic_name,
                Attributes={
                    "FifoTopic": str(True),
                    "ContentBasedDeduplication": str(False),
                    "FifoThroughputScope": "MessageGroup",
                },
            )
            logger.info("Created FIFO topic with name=%s.", topic_name)
            return topic
        except ClientError as error:
            logger.exception("Couldn't create topic with name=%s!", topic_name)
            raise error

    @staticmethod
    def add_access_policy(queue, topic_arn):
        """
        Add the necessary access policy to a queue, so
        it can receive messages from a topic.

        :param queue: The queue resource.
        :param topic_arn: The ARN of the topic.
        :return: None.
        """
        try:
            queue.set_attributes(
                Attributes={
                    "Policy": json.dumps(
                        {
                            "Version": "2012-10-17",
                            "Statement": [
                                {
                                    "Sid": "test-sid",
                                    "Effect": "Allow",
                                    "Principal": {"AWS": "*"},
                                    "Action": "SQS:SendMessage",
                                    "Resource": queue.attributes["QueueArn"],
                                    "Condition": {
                                        "ArnLike": {"aws:SourceArn": topic_arn}
                                    },
                                }
                            ],
                        }
                    )
                }
            )
            logger.info("Added trust policy to the queue.")
        except ClientError as error:
            logger.exception("Couldn't add trust policy to the queue!")
            raise error

    @staticmethod
    def subscribe_queue_to_topic(topic, queue_arn):
        """
        Subscribe a queue to a topic.

        :param topic: The topic resource.
        :param queue_arn: The ARN of the queue.
        :return: The subscription resource.
        """
        try:
            subscription = topic.subscribe(
                Protocol="sqs",
                Endpoint=queue_arn,
            )
            logger.info("The queue is subscribed to the topic.")
            return subscription
        except ClientError as error:
            logger.exception("Couldn't subscribe queue to topic!")
            raise error

    @staticmethod
    def publish_price_update(topic, payload, group_id):
        """
        Compose and publish a message that updates the wholesale price.

        :param topic: The topic to publish to.
        :param payload: The message to publish.
        :param group_id: The group ID for the message.
        :return: The ID of the message.
        """
        try:
            att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}}
            # The topic does not use content-based deduplication, so supply an ID.
            dedup_id = uuid.uuid4()
            response = topic.publish(
                Subject="Price Update",
                Message=payload,
                MessageAttributes=att_dict,
                MessageGroupId=group_id,
                MessageDeduplicationId=str(dedup_id),
            )
            message_id = response["MessageId"]
            logger.info("Published message to topic %s.", topic.arn)
        except ClientError as error:
            logger.exception("Couldn't publish message to topic %s.", topic.arn)
            raise error
        return message_id

    @staticmethod
    def delete_queue(queue):
        """
        Removes an SQS queue. When run against an AWS account, it can take up to
        60 seconds before the queue is actually deleted.

        :param queue: The queue to delete.
        :return: None
        """
        try:
            queue.delete()
            logger.info("Deleted queue with URL=%s.", queue.url)
        except ClientError as error:
            logger.exception("Couldn't delete queue with URL=%s!", queue.url)
            raise error


if __name__ == "__main__":
    usage_demo()
  • For API details, see AWS SDK for Python (Boto3) API Reference.
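The topics and queues scenario later on this page explains that a deduplication ID is either set on the message or generated from its content with a hash function, and that a message repeating an ID within the five-minute deduplication interval is accepted but not delivered. The sketch below is a toy, in-memory model of that rule and is not how the service is implemented. The class name is hypothetical, SHA-256 is assumed as the content hash, and the timestamps are passed in by the caller so that the behavior is easy to follow.

```python
import hashlib


class DeduplicationWindow:
    """Toy model of FIFO deduplication within a fixed interval."""

    def __init__(self, interval_seconds=300):
        self.interval_seconds = interval_seconds
        self._delivered_at = {}  # Maps deduplication ID to last delivery time.

    def accept(self, body, now, deduplication_id=None):
        """Return True if the message would be delivered, False if it is a duplicate.

        :param body: The message body.
        :param now: The current time in seconds, supplied by the caller.
        :param deduplication_id: An explicit ID, or None to hash the body instead
                                 (content-based deduplication).
        """
        dedup_id = deduplication_id or hashlib.sha256(body.encode("utf-8")).hexdigest()
        delivered_at = self._delivered_at.get(dedup_id)
        if delivered_at is not None and now - delivered_at < self.interval_seconds:
            return False
        self._delivered_at[dedup_id] = now
        return True


window = DeduplicationWindow()
first = window.accept('{"product": 214, "price": 79.99}', now=0.0)
repeat = window.accept('{"product": 214, "price": 79.99}', now=10.0)
later = window.accept('{"product": 214, "price": 79.99}', now=301.0)
```

Here first and later are True while repeat is False, because the second copy arrives inside the interval. The demo above avoids this situation by generating a new uuid4 deduplication ID for every publish call.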

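The following code example shows how to detect people and objects in a video with Amazon Rekognition.

SDK for Python (Boto3)

Use Amazon Rekognition to detect faces, objects, and people in videos by starting asynchronous detection jobs. This example also configures Amazon Rekognition to notify an Amazon Simple Notification Service (Amazon SNS) topic when jobs complete and subscribes an Amazon Simple Queue Service (Amazon SQS) queue to the topic. When the queue receives a message about a job, the job is retrieved and the results are output.

This example is best viewed on GitHub. For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example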
  • Amazon Rekognition

  • Amazon S3

  • Amazon SES

  • Amazon SNS

  • Amazon SQS

The following code example shows how to publish an SMS text message directly to a phone number using Amazon SNS.

SDK for Python (Boto3)

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Encapsulates Amazon SNS topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def publish_text_message(self, phone_number, message):
        """
        Publishes a text message directly to a phone number without need for a
        subscription.

        :param phone_number: The phone number that receives the message. This must be
                             in E.164 format. For example, a United States phone
                             number might be +12065550101.
        :param message: The message to send.
        :return: The ID of the message.
        """
        try:
            # The resource has no direct phone publish, so use its low-level client.
            response = self.sns_resource.meta.client.publish(
                PhoneNumber=phone_number, Message=message
            )
            message_id = response["MessageId"]
            logger.info("Published message to %s.", phone_number)
        except ClientError:
            logger.exception("Couldn't publish message to %s.", phone_number)
            raise
        else:
            return message_id
  • For API details, see Publish in AWS SDK for Python (Boto3) API Reference.
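The docstring above requires the phone number to be in E.164 format, such as +12065550101. A quick shape check before publishing might look like the sketch below. The helper name is hypothetical, and the pattern only checks the general form (a plus sign followed by up to 15 digits, the first of which is not zero). It does not confirm that the number exists or can receive SMS.

```python
import re

# A plus sign, a non-zero leading digit, then 1 to 14 further digits.
_E164_PATTERN = re.compile(r"\+[1-9]\d{1,14}")


def looks_like_e164(phone_number: str) -> bool:
    """Hypothetical helper: return True if the string has the shape of an E.164 number."""
    return _E164_PATTERN.fullmatch(phone_number) is not None
```

For example, looks_like_e164("+12065550101") is True, while a number written with spaces or without the leading plus sign is rejected.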

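The following code example shows how to:

  • Create a topic (FIFO or non-FIFO).

  • Subscribe several queues to the topic with an option to apply a filter.

  • Publish messages to the topic.

  • Poll the queues for messages received.

SDK for Python (Boto3)

Run an interactive scenario at a command prompt.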

class TopicsAndQueuesScenario: """Manages the Topics and Queues feature scenario.""" DASHES = "-" * 80 def __init__(self, sns_wrapper: SnsWrapper, sqs_wrapper: SqsWrapper) -> None: """ Initialize the Topics and Queues scenario. :param sns_wrapper: SnsWrapper instance for SNS operations. :param sqs_wrapper: SqsWrapper instance for SQS operations. """ self.sns_wrapper = sns_wrapper self.sqs_wrapper = sqs_wrapper # Scenario state self.use_fifo_topic = False self.use_content_based_deduplication = False self.topic_name = None self.topic_arn = None self.queue_count = 2 self.queue_urls = [] self.subscription_arns = [] self.tones = ["cheerful", "funny", "serious", "sincere"] def run_scenario(self) -> None: """Run the Topics and Queues feature scenario.""" print(self.DASHES) print("Welcome to messaging with topics and queues.") print(self.DASHES) print(f""" In this scenario, you will create an SNS topic and subscribe {self.queue_count} SQS queues to the topic. You can select from several options for configuring the topic and the subscriptions for the queues. You can then post to the topic and see the results in the queues. 
""") try: # Setup Phase print(self.DASHES) self._setup_topic() print(self.DASHES) self._setup_queues() print(self.DASHES) # Demonstration Phase self._publish_messages() print(self.DASHES) # Examination Phase self._poll_queues_for_messages() print(self.DASHES) # Cleanup Phase self._cleanup_resources() print(self.DASHES) except Exception as e: logger.error(f"Scenario failed: {e}") print(f"There was a problem with the scenario: {e}") print("\nInitiating cleanup...") try: self._cleanup_resources() except Exception as cleanup_error: logger.error(f"Error during cleanup: {cleanup_error}") print("Messaging with topics and queues scenario is complete.") print(self.DASHES) def _setup_topic(self) -> None: """Set up the SNS topic to be used with the queues.""" print("SNS topics can be configured as FIFO (First-In-First-Out).") print("FIFO topics deliver messages in order and support deduplication and message filtering.") print() self.use_fifo_topic = q.ask("Would you like to work with FIFO topics? (y/n): ", q.is_yesno) if self.use_fifo_topic: print(self.DASHES) self.topic_name = q.ask("Enter a name for your SNS topic: ", q.non_empty) print("Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.") print() print(self.DASHES) print(""" Because you have chosen a FIFO topic, deduplication is supported. Deduplication IDs are either set in the message or automatically generated from content using a hash function. If a message is successfully published to an SNS FIFO topic, any message published and determined to have the same deduplication ID, within the five-minute deduplication interval, is accepted but not delivered. For more information about deduplication, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html. """) self.use_content_based_deduplication = q.ask( "Use content-based deduplication instead of entering a deduplication ID? 
(y/n): ", q.is_yesno ) else: self.topic_name = q.ask("Enter a name for your SNS topic: ", q.non_empty) print(self.DASHES) # Create the topic self.topic_arn = self.sns_wrapper.create_topic( self.topic_name, self.use_fifo_topic, self.use_content_based_deduplication ) print(f"Your new topic with the name {self.topic_name}") print(f" and Amazon Resource Name (ARN) {self.topic_arn}") print(f" has been created.") print() def _setup_queues(self) -> None: """Set up the SQS queues and subscribe them to the topic.""" print(f"Now you will create {self.queue_count} Amazon Simple Queue Service (Amazon SQS) queues to subscribe to the topic.") for i in range(self.queue_count): queue_name = q.ask(f"Enter a name for SQS queue #{i+1}: ", q.non_empty) if self.use_fifo_topic and i == 0: print("Because you have selected a FIFO topic, '.fifo' must be appended to the queue name.") # Create the queue queue_url = self.sqs_wrapper.create_queue(queue_name, self.use_fifo_topic) self.queue_urls.append(queue_url) print(f"Your new queue with the name {queue_name}") print(f" and queue URL {queue_url}") print(f" has been created.") print() if i == 0: print("The queue URL is used to retrieve the queue ARN,") print("which is used to create a subscription.") print(self.DASHES) # Get queue ARN queue_arn = self.sqs_wrapper.get_queue_arn(queue_url) if i == 0: print("An AWS Identity and Access Management (IAM) policy must be attached to an SQS queue,") print("enabling it to receive messages from an SNS topic.") # Set queue policy to allow SNS to send messages self.sqs_wrapper.set_queue_policy_for_topic(queue_arn, self.topic_arn, queue_url) # Set up message filtering if using FIFO subscription_arn = self._setup_subscription_with_filter(i, queue_arn, queue_name) self.subscription_arns.append(subscription_arn) def _setup_subscription_with_filter(self, queue_index: int, queue_arn: str, queue_name: str) -> str: """Set up subscription with optional message filtering.""" filter_policy = None if 
self.use_fifo_topic:
            print(self.DASHES)
            if queue_index == 0:
                print("Subscriptions to a FIFO topic can have filters.")
                print("If you add a filter to this subscription, then only the filtered messages")
                print("will be received in the queue.")
                print()
                print("For information about message filtering,")
                print("see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html")
                print()
                print("For this example, you can filter messages by a TONE attribute.")

            use_filter = q.ask(
                f"Filter messages for {queue_name}'s subscription to the topic? (y/n): ",
                q.is_yesno,
            )
            if use_filter:
                filter_policy = self._create_filter_policy()

        subscription_arn = self.sns_wrapper.subscribe_queue_to_topic(
            self.topic_arn, queue_arn, filter_policy
        )

        print(f"The queue {queue_name} has been subscribed to the topic {self.topic_name}")
        print(f" with the subscription ARN {subscription_arn}")

        return subscription_arn

    def _create_filter_policy(self) -> Optional[str]:
        """Create a message filter policy based on user selections.

        Returns a JSON string, or None if the user selects no tones.
        """
        print(self.DASHES)
        print("You can filter messages by one or more of the following TONE attributes.")

        filter_selections = []
        selection_number = 0

        while True:
            print("Enter a number to add a TONE filter, or enter 0 to stop adding filters.")
            for i, tone in enumerate(self.tones, 1):
                print(f" {i}. {tone}")

            selection = q.ask("Your choice: ", q.is_int, q.in_range(0, len(self.tones)))

            if selection == 0:
                break
            elif selection > 0 and self.tones[selection - 1] not in filter_selections:
                filter_selections.append(self.tones[selection - 1])
                print(f"Added '{self.tones[selection - 1]}' to filter list.")

        if filter_selections:
            filters = {"tone": filter_selections}
            return json.dumps(filters)
        return None

    def _publish_messages(self) -> None:
        """Publish messages to the topic with various options."""
        print("Now we can publish messages.")

        keep_sending = True
        while keep_sending:
            print()
            message = q.ask("Enter a message to publish: ", q.non_empty)

            message_group_id = None
            deduplication_id = None
            tone_attribute = None

            if self.use_fifo_topic:
                print("Because you are using a FIFO topic, you must set a message group ID.")
                print("All messages within the same group will be received in the order they were published.")
                print()
                message_group_id = q.ask("Enter a message group ID for this message: ", q.non_empty)

                if not self.use_content_based_deduplication:
                    print("Because you are not using content-based deduplication,")
                    print("you must enter a deduplication ID.")
                    deduplication_id = q.ask("Enter a deduplication ID for this message: ", q.non_empty)

            # Ask about tone attribute
            add_attribute = q.ask("Add an attribute to this message? (y/n): ", q.is_yesno)
            if add_attribute:
                print("Enter a number for an attribute:")
                for i, tone in enumerate(self.tones, 1):
                    print(f" {i}. {tone}")

                selection = q.ask("Your choice: ", q.is_int, q.in_range(1, len(self.tones)))
                if 1 <= selection <= len(self.tones):
                    tone_attribute = self.tones[selection - 1]

            # Publish the message
            message_id = self.sns_wrapper.publish_message(
                self.topic_arn, message, tone_attribute, deduplication_id, message_group_id
            )
            print(f"Message published with ID: {message_id}")

            keep_sending = q.ask("Send another message? (y/n): ", q.is_yesno)

    def _poll_queues_for_messages(self) -> None:
        """Poll all queues for messages and display results."""
        for i, queue_url in enumerate(self.queue_urls):
            print(f"Polling queue #{i+1} at {queue_url} for messages...")
            q.ask("Press Enter to continue...")

            messages = self._poll_queue_for_messages(queue_url)

            if messages:
                print(f"{len(messages)} message(s) were received by queue #{i+1}")
                for j, message in enumerate(messages, 1):
                    print(f" Message {j}:")
                    # Parse the SNS message body to get the actual message
                    try:
                        sns_message = json.loads(message['Body'])
                        actual_message = sns_message.get('Message', message['Body'])
                        print(f" {actual_message}")
                    except (json.JSONDecodeError, KeyError):
                        print(f" {message['Body']}")

                # Delete the messages
                self.sqs_wrapper.delete_messages(queue_url, messages)
                print(f"Messages deleted from queue #{i+1}")
            else:
                print(f"No messages received by queue #{i+1}")

            print(self.DASHES)

    def _poll_queue_for_messages(self, queue_url: str) -> List[Dict[str, Any]]:
        """Poll a single queue for messages."""
        all_messages = []
        max_polls = 3  # Limit polling to avoid infinite loops

        for poll_count in range(max_polls):
            messages = self.sqs_wrapper.receive_messages(queue_url, 10)

            if messages:
                all_messages.extend(messages)
                print(f" Received {len(messages)} messages in poll {poll_count + 1}")
                # Small delay between polls
                time.sleep(1)
            else:
                print(f" No messages in poll {poll_count + 1}")
                break

        return all_messages

    def _cleanup_resources(self) -> None:
        """Clean up all resources created during the scenario."""
        print("Cleaning up resources...")

        # Delete queues
        for i, queue_url in enumerate(self.queue_urls):
            if queue_url:
                delete_queue = q.ask(f"Delete queue #{i+1} with URL {queue_url}? (y/n): ", q.is_yesno)
                if delete_queue:
                    try:
                        self.sqs_wrapper.delete_queue(queue_url)
                        print(f"Deleted queue #{i+1}")
                    except Exception as e:
                        print(f"Error deleting queue #{i+1}: {e}")

        # Unsubscribe from topic
        for i, subscription_arn in enumerate(self.subscription_arns):
            if subscription_arn:
                try:
                    self.sns_wrapper.unsubscribe(subscription_arn)
                    print(f"Unsubscribed subscription #{i+1}")
                except Exception as e:
                    print(f"Error unsubscribing #{i+1}: {e}")

        # Delete topic
        if self.topic_arn:
            delete_topic = q.ask(f"Delete topic {self.topic_name}? (y/n): ", q.is_yesno)
            if delete_topic:
                try:
                    self.sns_wrapper.delete_topic(self.topic_arn)
                    print(f"Deleted topic {self.topic_name}")
                except Exception as e:
                    print(f"Error deleting topic: {e}")

        print("Resource cleanup complete.")
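To make the effect of the filter policy concrete, the following is a minimal local sketch of exact string matching against a policy shaped like the one `_create_filter_policy` returns. It is an illustration only: SNS evaluates filter policies on the service side and supports more operators than this models. `message_matches_policy` is a hypothetical helper that is not part of the SDK or of the example, and the tone values are placeholders.

```python
import json
from typing import Dict, Optional


def message_matches_policy(
    filter_policy: Optional[str], message_attributes: Dict[str, Dict[str, str]]
) -> bool:
    """Return True if the attributes satisfy a simple string-list filter policy.

    Hypothetical helper: models only exact string matching on attribute values.
    """
    if not filter_policy:
        # No policy on the subscription: every message is delivered.
        return True
    policy = json.loads(filter_policy)
    for key, allowed_values in policy.items():
        attribute = message_attributes.get(key)
        # Every key in the policy must be present and match one allowed value.
        if attribute is None or attribute.get("StringValue") not in allowed_values:
            return False
    return True


# A policy in the shape the scenario builds: {"tone": [...]}.
policy = json.dumps({"tone": ["cheerful", "serious"]})  # placeholder tones
cheerful = {"tone": {"DataType": "String", "StringValue": "cheerful"}}
funny = {"tone": {"DataType": "String", "StringValue": "funny"}}

print(message_matches_policy(policy, cheerful))
print(message_matches_policy(policy, funny))
print(message_matches_policy(policy, {}))
print(message_matches_policy(None, funny))
```

The third call shows why a message published without a tone attribute does not reach a filtered queue in this model: the policy key is absent from the message.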

Create classes that wrap Amazon SNS and Amazon SQS operations for use in the scenario.

import json
import logging
from typing import Any, Dict, List, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SnsWrapper:
    """Wrapper class for managing Amazon SNS operations."""

    def __init__(self, sns_client: Any) -> None:
        """
        Initialize the SnsWrapper.

        :param sns_client: A Boto3 Amazon SNS client.
        """
        self.sns_client = sns_client

    @classmethod
    def from_client(cls) -> 'SnsWrapper':
        """
        Create an SnsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sns_client = boto3.client('sns')
        return cls(sns_client)

    def create_topic(
        self,
        topic_name: str,
        is_fifo: bool = False,
        content_based_deduplication: bool = False
    ) -> str:
        """
        Create an SNS topic.

        :param topic_name: The name of the topic to create.
        :param is_fifo: Whether to create a FIFO topic.
        :param content_based_deduplication: Whether to use content-based deduplication for FIFO topics.
        :return: The ARN of the created topic.
        :raises ClientError: If the topic creation fails.
        """
        try:
            # Add .fifo suffix for FIFO topics
            if is_fifo and not topic_name.endswith('.fifo'):
                topic_name += '.fifo'

            attributes = {}
            if is_fifo:
                attributes['FifoTopic'] = 'true'
                if content_based_deduplication:
                    attributes['ContentBasedDeduplication'] = 'true'

            response = self.sns_client.create_topic(
                Name=topic_name,
                Attributes=attributes
            )

            topic_arn = response['TopicArn']
            logger.info(f"Created topic: {topic_name} with ARN: {topic_arn}")
            return topic_arn

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error creating topic {topic_name}: {error_code} - {e}")
            raise

    def subscribe_queue_to_topic(
        self,
        topic_arn: str,
        queue_arn: str,
        filter_policy: Optional[str] = None
    ) -> str:
        """
        Subscribe an SQS queue to an SNS topic.

        :param topic_arn: The ARN of the SNS topic.
        :param queue_arn: The ARN of the SQS queue.
        :param filter_policy: Optional JSON filter policy for message filtering.
        :return: The ARN of the subscription.
        :raises ClientError: If the subscription fails.
        """
        try:
            attributes = {}
            if filter_policy:
                attributes['FilterPolicy'] = filter_policy

            response = self.sns_client.subscribe(
                TopicArn=topic_arn,
                Protocol='sqs',
                Endpoint=queue_arn,
                Attributes=attributes
            )

            subscription_arn = response['SubscriptionArn']
            logger.info(f"Subscribed queue {queue_arn} to topic {topic_arn}")
            return subscription_arn

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error subscribing queue to topic: {error_code} - {e}")
            raise

    def publish_message(
        self,
        topic_arn: str,
        message: str,
        tone_attribute: Optional[str] = None,
        deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None
    ) -> str:
        """
        Publish a message to an SNS topic.

        :param topic_arn: The ARN of the SNS topic.
        :param message: The message content to publish.
        :param tone_attribute: Optional tone attribute for message filtering.
        :param deduplication_id: Optional deduplication ID for FIFO topics.
        :param message_group_id: Optional message group ID for FIFO topics.
        :return: The message ID of the published message.
        :raises ClientError: If the message publication fails.
        """
        try:
            publish_args = {
                'TopicArn': topic_arn,
                'Message': message
            }

            # Add message attributes if tone is specified
            if tone_attribute:
                publish_args['MessageAttributes'] = {
                    'tone': {
                        'DataType': 'String',
                        'StringValue': tone_attribute
                    }
                }

            # Add FIFO-specific parameters
            if message_group_id:
                publish_args['MessageGroupId'] = message_group_id

            if deduplication_id:
                publish_args['MessageDeduplicationId'] = deduplication_id

            response = self.sns_client.publish(**publish_args)

            message_id = response['MessageId']
            logger.info(f"Published message to topic {topic_arn} with ID: {message_id}")
            return message_id

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error publishing message to topic: {error_code} - {e}")
            raise

    def unsubscribe(self, subscription_arn: str) -> bool:
        """
        Unsubscribe from an SNS topic.

        :param subscription_arn: The ARN of the subscription to remove.
        :return: True if successful.
        :raises ClientError: If the unsubscribe operation fails.
        """
        try:
            self.sns_client.unsubscribe(SubscriptionArn=subscription_arn)

            logger.info(f"Unsubscribed: {subscription_arn}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')

            if error_code == 'NotFound':
                logger.warning(f"Subscription not found: {subscription_arn}")
                return True  # Already unsubscribed
            else:
                logger.error(f"Error unsubscribing: {error_code} - {e}")
                raise

    def delete_topic(self, topic_arn: str) -> bool:
        """
        Delete an SNS topic.

        :param topic_arn: The ARN of the topic to delete.
        :return: True if successful.
        :raises ClientError: If the topic deletion fails.
        """
        try:
            self.sns_client.delete_topic(TopicArn=topic_arn)

            logger.info(f"Deleted topic: {topic_arn}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')

            if error_code == 'NotFound':
                logger.warning(f"Topic not found: {topic_arn}")
                return True  # Already deleted
            else:
                logger.error(f"Error deleting topic: {error_code} - {e}")
                raise

    def list_topics(self) -> list:
        """
        List all SNS topics in the account using pagination.

        :return: List of topic ARNs.
        :raises ClientError: If listing topics fails.
        """
        try:
            topics = []
            paginator = self.sns_client.get_paginator('list_topics')

            for page in paginator.paginate():
                topics.extend([topic['TopicArn'] for topic in page.get('Topics', [])])

            logger.info(f"Found {len(topics)} topics")
            return topics

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            if error_code == 'AuthorizationError':
                logger.error("Authorization error listing topics - check IAM permissions")
            else:
                logger.error(f"Error listing topics: {error_code} - {e}")
            raise


class SqsWrapper:
    """Wrapper class for managing Amazon SQS operations."""

    def __init__(self, sqs_client: Any) -> None:
        """
        Initialize the SqsWrapper.

        :param sqs_client: A Boto3 Amazon SQS client.
        """
        self.sqs_client = sqs_client

    @classmethod
    def from_client(cls) -> 'SqsWrapper':
        """
        Create an SqsWrapper instance using a default boto3 client.

        :return: An instance of this class.
        """
        sqs_client = boto3.client('sqs')
        return cls(sqs_client)

    def create_queue(self, queue_name: str, is_fifo: bool = False) -> str:
        """
        Create an SQS queue.

        :param queue_name: The name of the queue to create.
        :param is_fifo: Whether to create a FIFO queue.
        :return: The URL of the created queue.
        :raises ClientError: If the queue creation fails.
        """
        try:
            # Add .fifo suffix for FIFO queues
            if is_fifo and not queue_name.endswith('.fifo'):
                queue_name += '.fifo'

            attributes = {}
            if is_fifo:
                attributes['FifoQueue'] = 'true'

            response = self.sqs_client.create_queue(
                QueueName=queue_name,
                Attributes=attributes
            )

            queue_url = response['QueueUrl']
            logger.info(f"Created queue: {queue_name} with URL: {queue_url}")
            return queue_url

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error creating queue {queue_name}: {error_code} - {e}")
            raise

    def get_queue_arn(self, queue_url: str) -> str:
        """
        Get the ARN of an SQS queue.

        :param queue_url: The URL of the queue.
        :return: The ARN of the queue.
        :raises ClientError: If getting queue attributes fails.
        """
        try:
            response = self.sqs_client.get_queue_attributes(
                QueueUrl=queue_url,
                AttributeNames=['QueueArn']
            )

            queue_arn = response['Attributes']['QueueArn']
            logger.info(f"Queue ARN for {queue_url}: {queue_arn}")
            return queue_arn

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error getting queue ARN: {error_code} - {e}")
            raise

    def set_queue_policy_for_topic(self, queue_arn: str, topic_arn: str, queue_url: str) -> bool:
        """
        Set the queue policy to allow SNS to send messages to the queue.

        :param queue_arn: The ARN of the SQS queue.
        :param topic_arn: The ARN of the SNS topic.
        :param queue_url: The URL of the SQS queue.
        :return: True if successful.
        :raises ClientError: If setting the queue policy fails.
        """
        try:
            # Create policy that allows SNS to send messages to the queue
            policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "sns.amazonaws.com"
                        },
                        "Action": "sqs:SendMessage",
                        "Resource": queue_arn,
                        "Condition": {
                            "ArnEquals": {
                                "aws:SourceArn": topic_arn
                            }
                        }
                    }
                ]
            }

            self.sqs_client.set_queue_attributes(
                QueueUrl=queue_url,
                Attributes={
                    'Policy': json.dumps(policy)
                }
            )

            logger.info(f"Set queue policy for {queue_url} to allow messages from {topic_arn}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error setting queue policy: {error_code} - {e}")
            raise

    def receive_messages(self, queue_url: str, max_messages: int = 10) -> List[Dict[str, Any]]:
        """
        Receive messages from an SQS queue.

        :param queue_url: The URL of the queue to receive messages from.
        :param max_messages: Maximum number of messages to receive (1-10).
        :return: List of received messages.
        :raises ClientError: If receiving messages fails.
        """
        try:
            # Ensure max_messages is within valid range
            max_messages = max(1, min(10, max_messages))

            response = self.sqs_client.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=max_messages,
                WaitTimeSeconds=2,  # Long polling with a short 2-second wait
                MessageAttributeNames=['All']
            )

            messages = response.get('Messages', [])
            logger.info(f"Received {len(messages)} messages from {queue_url}")
            return messages

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error receiving messages: {error_code} - {e}")
            raise

    def delete_messages(self, queue_url: str, messages: List[Dict[str, Any]]) -> bool:
        """
        Delete messages from an SQS queue in batches.

        :param queue_url: The URL of the queue.
        :param messages: List of messages to delete.
        :return: True if successful.
        :raises ClientError: If deleting messages fails.
        """
        try:
            if not messages:
                return True

            # Build delete entries for batch delete
            delete_entries = []
            for i, message in enumerate(messages):
                delete_entries.append({
                    'Id': str(i),
                    'ReceiptHandle': message['ReceiptHandle']
                })

            # Delete messages in batches of 10 (SQS limit)
            batch_size = 10
            for i in range(0, len(delete_entries), batch_size):
                batch = delete_entries[i:i + batch_size]

                response = self.sqs_client.delete_message_batch(
                    QueueUrl=queue_url,
                    Entries=batch
                )

                # Check for failures
                if 'Failed' in response and response['Failed']:
                    for failed in response['Failed']:
                        logger.warning(f"Failed to delete message: {failed}")

            logger.info(f"Deleted {len(messages)} messages from {queue_url}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error deleting messages: {error_code} - {e}")
            raise

    def delete_queue(self, queue_url: str) -> bool:
        """
        Delete an SQS queue.

        :param queue_url: The URL of the queue to delete.
        :return: True if successful.
        :raises ClientError: If the queue deletion fails.
        """
        try:
            self.sqs_client.delete_queue(QueueUrl=queue_url)

            logger.info(f"Deleted queue: {queue_url}")
            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')

            if error_code == 'AWS.SimpleQueueService.NonExistentQueue':
                logger.warning(f"Queue not found: {queue_url}")
                return True  # Already deleted
            else:
                logger.error(f"Error deleting queue: {error_code} - {e}")
                raise

    def list_queues(self, queue_name_prefix: Optional[str] = None) -> List[str]:
        """
        List all SQS queues in the account using pagination.

        :param queue_name_prefix: Optional prefix to filter queue names.
        :return: List of queue URLs.
        :raises ClientError: If listing queues fails.
        """
        try:
            queue_urls = []
            paginator = self.sqs_client.get_paginator('list_queues')

            page_params = {}
            if queue_name_prefix:
                page_params['QueueNamePrefix'] = queue_name_prefix

            for page in paginator.paginate(**page_params):
                queue_urls.extend(page.get('QueueUrls', []))

            logger.info(f"Found {len(queue_urls)} queues")
            return queue_urls

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            if error_code == 'AccessDenied':
                logger.error("Access denied listing queues - check IAM permissions")
            else:
                logger.error(f"Error listing queues: {error_code} - {e}")
            raise

    def send_message(self, queue_url: str, message_body: str, **kwargs) -> str:
        """
        Send a message to an SQS queue.

        :param queue_url: The URL of the queue.
        :param message_body: The message content.
        :param kwargs: Additional message parameters (DelaySeconds, MessageAttributes, etc.).
        :return: The message ID.
        :raises ClientError: If sending the message fails.
        """
        try:
            send_params = {
                'QueueUrl': queue_url,
                'MessageBody': message_body,
                **kwargs
            }

            response = self.sqs_client.send_message(**send_params)

            message_id = response['MessageId']
            logger.info(f"Sent message to {queue_url} with ID: {message_id}")
            return message_id

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Error sending message: {error_code} - {e}")
            raise

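When SNS delivers to an SQS queue without raw message delivery enabled, the SQS message `Body` is a JSON envelope whose `Message` field holds the published text, which is why the scenario parses the body before printing it. The sketch below unwraps such an envelope from a message shaped like the ones `receive_messages` returns. The sample body is hand-written for illustration, and `unwrap_sns_body` is a hypothetical helper rather than part of the example.

```python
import json
from typing import Any, Dict, Optional, Tuple


def unwrap_sns_body(sqs_message: Dict[str, Any]) -> Tuple[str, Optional[str]]:
    """Return (text, tone) from an SQS message that SNS delivered.

    Falls back to the raw body if it is not an SNS JSON envelope.
    """
    body = sqs_message["Body"]
    try:
        envelope = json.loads(body)
    except json.JSONDecodeError:
        return body, None
    if not isinstance(envelope, dict):
        return body, None
    text = envelope.get("Message", body)
    # Inside the envelope, each attribute is stored under "Type"/"Value" keys.
    tone = envelope.get("MessageAttributes", {}).get("tone", {}).get("Value")
    return text, tone


# Hand-written sample with only the fields this sketch reads.
sample = {
    "MessageId": "example-id",
    "ReceiptHandle": "example-handle",
    "Body": json.dumps({
        "Type": "Notification",
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
        "Message": "Hello from SNS",
        "MessageAttributes": {"tone": {"Type": "String", "Value": "cheerful"}},
    }),
}

print(unwrap_sns_body(sample))
print(unwrap_sns_body({"Body": "plain text"}))
```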
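The following code example shows how to create an AWS Lambda function invoked by Amazon API Gateway.

SDK for Python (Boto3)

This example shows how to create and use an Amazon API Gateway REST API that targets an AWS Lambda function. The Lambda handler demonstrates how to route based on HTTP methods; how to get data from the query string, headers, and body; and how to return a JSON response.

  • Deploy a Lambda function.

  • Create an API Gateway REST API.

  • Create a REST resource that targets the Lambda function.

  • Grant permission to let API Gateway invoke the Lambda function.

  • Use the Requests package to send requests to the REST API.

  • Clean up all resources created during the demo.

This example is best viewed on GitHub. For complete source code and instructions on how to set up and run, see the full example on GitHub.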
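As a rough sketch of the handler behavior described above, the function below routes on the HTTP method, reads the query string, a header, and the JSON body, and returns a JSON response. It assumes the Lambda proxy integration event format (`httpMethod`, `queryStringParameters`, `headers`, `body`). The `name` parameter, the `x-demo-client` header, and the response payloads are assumptions made for illustration; the full example on GitHub remains the authoritative version.

```python
import json
from typing import Any, Dict


def _response(status: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build a response in the shape the proxy integration expects."""
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    method = event.get("httpMethod")
    # These fields are None in the event when the request carries none.
    query = event.get("queryStringParameters") or {}
    headers = event.get("headers") or {}
    client = headers.get("x-demo-client", "unknown")  # hypothetical header

    if method == "GET":
        name = query.get("name", "world")  # hypothetical query parameter
        return _response(200, {"message": f"Hello, {name}!", "client": client})
    if method == "POST":
        try:
            data = json.loads(event.get("body") or "{}")
        except json.JSONDecodeError:
            return _response(400, {"error": "Body must be valid JSON."})
        return _response(200, {"received": data, "client": client})
    return _response(405, {"error": f"Method {method} is not supported."})


# Local invocation with a hand-written event; no AWS resources are needed.
get_event = {
    "httpMethod": "GET",
    "queryStringParameters": {"name": "SNS"},
    "headers": None,
    "body": None,
}
print(lambda_handler(get_event, None))
```

Returning the body as a JSON string, not a dict, matters here: with proxy integration, API Gateway passes the `body` value through as the response payload.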

Services used in this example
  • API Gateway

  • DynamoDB

  • Lambda

  • Amazon SNS

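The following code example shows how to create an AWS Lambda function invoked by an Amazon EventBridge scheduled event.

SDK for Python (Boto3)

This example shows how to register an AWS Lambda function as the target of a scheduled Amazon EventBridge event. The Lambda handler writes a friendly message and the full event data to Amazon CloudWatch Logs for later retrieval.

  • Deploy a Lambda function.

  • Create an EventBridge scheduled event and make the Lambda function the target.

  • Grant permission to let EventBridge invoke the Lambda function.

  • Print the latest data from CloudWatch Logs to show the result of the scheduled invocations.

  • Clean up all resources created during the demo.

This example is best viewed on GitHub. For complete source code and instructions on how to set up and run, see the full example on GitHub.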
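A handler for this kind of scheduled invocation can be very small. The sketch below is an assumption about the general shape, not the example's actual code: it logs a short message and the full event, and in Lambda that logging output ends up in CloudWatch Logs. The sample event is hand-written and includes only a few of the fields a scheduled event carries.

```python
import json
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Log a short message plus the full event data."""
    # Scheduled events carry the invocation time and source in these fields.
    event_time = event.get("time", "unknown time")
    source = event.get("source", "unknown source")
    logger.info("Scheduled event from %s at %s.", source, event_time)
    logger.info("Full event: %s", json.dumps(event))
    return {"source": source, "time": event_time}


# Hand-written sample event for a local run.
sample_event = {
    "detail-type": "Scheduled Event",
    "source": "aws.events",
    "time": "2024-01-01T00:00:00Z",
    "detail": {},
}
print(lambda_handler(sample_event, None))
```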

Services used in this example
  • CloudWatch Logs

  • DynamoDB

  • EventBridge

  • Lambda

  • Amazon SNS

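Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by messages arriving from an SNS topic. The function retrieves the messages from the event parameter and logs the content of each message.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.

Consuming an SNS event with Lambda using Python.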

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
    for record in event['Records']:
        process_message(record)
    print("done")


def process_message(record):
    try:
        message = record['Sns']['Message']
        print(f"Processed message {message}")
        # TODO; Process your record here

    except Exception as e:
        print("An error occurred")
        raise e
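To try a handler like this without deploying it, you can call the same record-walking logic locally with a hand-written event. The sketch below assumes the `Records[].Sns` event shape that the handler above reads, and uses a hypothetical `collect_messages` helper that returns the message texts instead of printing them so the result is easy to check.

```python
from typing import Any, Dict, List


def collect_messages(event: Dict[str, Any]) -> List[str]:
    """Return the Message text of every SNS record in a Lambda event."""
    messages = []
    for record in event.get("Records", []):
        # Each record wraps the notification under the "Sns" key.
        messages.append(record["Sns"]["Message"])
    return messages


# Hand-written sample with only the fields this sketch reads.
sample_event = {
    "Records": [
        {"EventSource": "aws:sns", "Sns": {"Subject": "Greeting", "Message": "Hello"}},
        {"EventSource": "aws:sns", "Sns": {"Subject": "Farewell", "Message": "Goodbye"}},
    ]
}
print(collect_messages(sample_event))
```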