Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Ejemplos de código de Amazon SNS para temas FIFO
Puede utilizar los siguientes ejemplos de código para integrar el caso de uso del ejemplo de administración de precios de piezas de automóviles mediante un tema FIFO de Amazon SNS con una cola FIFO o una cola estándar de Amazon SQS.
Uso de un AWS SDK
Con un AWS SDK, puede crear un tema FIFO de Amazon SNS estableciendo su FifoTopic
atributo en. true
Puede crear una cola FIFO de Amazon SQS al establecer su atributo FifoQueue
a true
. Además, debe agregar el sufijo .fifo
al nombre de cada recurso FIFO. Después de crear un tema o una cola FIFO, no puede convertirlo en un tema o cola estándar.
En el ejemplo de código siguiente, se crean estos recursos de cola FIFO y estándar:
-
El tema FIFO de Amazon SNS que distribuye las actualizaciones de precios
-
Las colas FIFO de Amazon SQS que proporcionan estas actualizaciones a las aplicaciones mayoristas y minoristas
-
La cola estándar de Amazon SQS para la aplicación de análisis que almacena registros, que pueden consultarse para inteligencia empresarial (BI)
-
Las suscripciones FIFO de Amazon SNS que conectan las tres colas al tema
En este ejemplo, se establecen las Políticas de filtrado en las suscripciones. Si prueba el ejemplo al publicar un mensaje en el tema, asegúrese de publicar el mensaje con el atributo de business
. Especifique retail
o wholesale
en el valor de atributo. De lo contrario, el mensaje se filtra y no se entrega a las colas suscritas. Para obtener más información, consulte Filtrado de mensajes de Amazon SNS para temas FIFO.
- Java
-
- SDK para Java 2.x
-
Este ejemplo
-
crea un tema FIFO de Amazon SNS, dos colas FIFO de Amazon SQS y una cola estándar.
-
suscribe las colas al tema y publica un mensaje en el tema.
La prueba verifica la recepción del mensaje en cada cola. El ejemplo completo también muestra la incorporación de políticas de acceso y, al final, elimina los recursos.
public class PriceUpdateExample {
public final static SnsClient snsClient = SnsClient.create();
public final static SqsClient sqsClient = SqsClient.create();
public static void main(String[] args) {
final String usage = "\n" +
"Usage: " +
" <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
"Where:\n" +
" fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
" wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n"
+
" retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" +
" analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n";
if (args.length != 4) {
System.out.println(usage);
System.exit(1);
}
final String fifoTopicName = args[0];
final String wholeSaleQueueName = args[1];
final String retailQueueName = args[2];
final String analyticsQueueName = args[3];
// For convenience, the QueueData class holds metadata about a queue: ARN, URL,
// name and type.
List<QueueData> queues = List.of(
new QueueData(wholeSaleQueueName, QueueType.FIFO),
new QueueData(retailQueueName, QueueType.FIFO),
new QueueData(analyticsQueueName, QueueType.Standard));
// Create queues.
createQueues(queues);
// Create a topic.
String topicARN = createFIFOTopic(fifoTopicName);
// Subscribe each queue to the topic.
subscribeQueues(queues, topicARN);
// Allow the newly created topic to send messages to the queues.
addAccessPolicyToQueuesFINAL(queues, topicARN);
// Publish a sample price update message with payload.
publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables");
// Clean up resources.
deleteSubscriptions(queues);
deleteQueues(queues);
deleteTopic(topicARN);
}
public static String createFIFOTopic(String topicName) {
try {
// Create a FIFO topic by using the SNS service client.
Map<String, String> topicAttributes = Map.of(
"FifoTopic", "true",
"ContentBasedDeduplication", "false");
CreateTopicRequest topicRequest = CreateTopicRequest.builder()
.name(topicName)
.attributes(topicAttributes)
.build();
CreateTopicResponse response = snsClient.createTopic(topicRequest);
String topicArn = response.topicArn();
System.out.println("The topic ARN is" + topicArn);
return topicArn;
} catch (SnsException e) {
System.err.println(e.awsErrorDetails().errorMessage());
System.exit(1);
}
return "";
}
public static void subscribeQueues(List<QueueData> queues, String topicARN) {
queues.forEach(queue -> {
SubscribeRequest subscribeRequest = SubscribeRequest.builder()
.topicArn(topicARN)
.endpoint(queue.queueARN)
.protocol("sqs")
.build();
// Subscribe to the endpoint by using the SNS service client.
// Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
// topic.
SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest);
System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]");
queue.subscriptionARN = subscribeResponse.subscriptionArn();
});
}
public static void publishPriceUpdate(String topicArn, String payload, String groupId) {
try {
// Create and publish a message that updates the wholesale price.
String subject = "Price Update";
String dedupId = UUID.randomUUID().toString();
String attributeName = "business";
String attributeValue = "wholesale";
MessageAttributeValue msgAttValue = MessageAttributeValue.builder()
.dataType("String")
.stringValue(attributeValue)
.build();
Map<String, MessageAttributeValue> attributes = new HashMap<>();
attributes.put(attributeName, msgAttValue);
PublishRequest pubRequest = PublishRequest.builder()
.topicArn(topicArn)
.subject(subject)
.message(payload)
.messageGroupId(groupId)
.messageDeduplicationId(dedupId)
.messageAttributes(attributes)
.build();
final PublishResponse response = snsClient.publish(pubRequest);
System.out.println(response.messageId());
System.out.println(response.sequenceNumber());
System.out.println("Message was published to " + topicArn);
} catch (SnsException e) {
System.err.println(e.awsErrorDetails().errorMessage());
System.exit(1);
}
}
- Python
-
- SDK para Python (Boto3)
-
Crea un tema FIFO de Amazon SNS, suscribe una cola FIFO de Amazon SQS al tema y publica un mensaje en el tema.
def usage_demo():
"""Shows how to subscribe queues to a FIFO topic."""
print("-" * 88)
print("Welcome to the `Subscribe queues to a FIFO topic` demo!")
print("-" * 88)
sns = boto3.resource("sns")
sqs = boto3.resource("sqs")
fifo_topic_wrapper = FifoTopicWrapper(sns)
sns_wrapper = SnsWrapper(sns)
prefix = "sqs-subscribe-demo-"
queues = set()
subscriptions = set()
wholesale_queue = sqs.create_queue(
QueueName=prefix + "wholesale.fifo",
Attributes={
"MaximumMessageSize": str(4096),
"ReceiveMessageWaitTimeSeconds": str(10),
"VisibilityTimeout": str(300),
"FifoQueue": str(True),
"ContentBasedDeduplication": str(True),
},
)
queues.add(wholesale_queue)
print(f"Created FIFO queue with URL: {wholesale_queue.url}.")
retail_queue = sqs.create_queue(
QueueName=prefix + "retail.fifo",
Attributes={
"MaximumMessageSize": str(4096),
"ReceiveMessageWaitTimeSeconds": str(10),
"VisibilityTimeout": str(300),
"FifoQueue": str(True),
"ContentBasedDeduplication": str(True),
},
)
queues.add(retail_queue)
print(f"Created FIFO queue with URL: {retail_queue.url}.")
analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={})
queues.add(analytics_queue)
print(f"Created standard queue with URL: {analytics_queue.url}.")
topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo")
print(f"Created FIFO topic: {topic.attributes['TopicArn']}.")
for q in queues:
fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"])
print(f"Added access policies for topic: {topic.attributes['TopicArn']}.")
for q in queues:
sub = fifo_topic_wrapper.subscribe_queue_to_topic(
topic, q.attributes["QueueArn"]
)
subscriptions.add(sub)
print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.")
input("Press Enter to publish a message to the topic.")
message_id = fifo_topic_wrapper.publish_price_update(
topic, '{"product": 214, "price": 79.99}', "Consumables"
)
print(f"Published price update with message ID: {message_id}.")
# Clean up the subscriptions, queues, and topic.
input("Press Enter to clean up resources.")
for s in subscriptions:
sns_wrapper.delete_subscription(s)
sns_wrapper.delete_topic(topic)
for q in queues:
fifo_topic_wrapper.delete_queue(q)
print(f"Deleted subscriptions, queues, and topic.")
print("Thanks for watching!")
print("-" * 88)
class FifoTopicWrapper:
"""Encapsulates Amazon SNS FIFO topic and subscription functions."""
def __init__(self, sns_resource):
"""
:param sns_resource: A Boto3 Amazon SNS resource.
"""
self.sns_resource = sns_resource
def create_fifo_topic(self, topic_name):
"""
Create a FIFO topic.
Topic names must be made up of only uppercase and lowercase ASCII letters,
numbers, underscores, and hyphens, and must be between 1 and 256 characters long.
For a FIFO topic, the name must end with the .fifo suffix.
:param topic_name: The name for the topic.
:return: The new topic.
"""
try:
topic = self.sns_resource.create_topic(
Name=topic_name,
Attributes={
"FifoTopic": str(True),
"ContentBasedDeduplication": str(False),
},
)
logger.info("Created FIFO topic with name=%s.", topic_name)
return topic
except ClientError as error:
logger.exception("Couldn't create topic with name=%s!", topic_name)
raise error
@staticmethod
def add_access_policy(queue, topic_arn):
"""
Add the necessary access policy to a queue, so
it can receive messages from a topic.
:param queue: The queue resource.
:param topic_arn: The ARN of the topic.
:return: None.
"""
try:
queue.set_attributes(
Attributes={
"Policy": json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "test-sid",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SQS:SendMessage",
"Resource": queue.attributes["QueueArn"],
"Condition": {
"ArnLike": {"aws:SourceArn": topic_arn}
},
}
],
}
)
}
)
logger.info("Added trust policy to the queue.")
except ClientError as error:
logger.exception("Couldn't add trust policy to the queue!")
raise error
@staticmethod
def subscribe_queue_to_topic(topic, queue_arn):
"""
Subscribe a queue to a topic.
:param topic: The topic resource.
:param queue_arn: The ARN of the queue.
:return: The subscription resource.
"""
try:
subscription = topic.subscribe(
Protocol="sqs",
Endpoint=queue_arn,
)
logger.info("The queue is subscribed to the topic.")
return subscription
except ClientError as error:
logger.exception("Couldn't subscribe queue to topic!")
raise error
@staticmethod
def publish_price_update(topic, payload, group_id):
"""
Compose and publish a message that updates the wholesale price.
:param topic: The topic to publish to.
:param payload: The message to publish.
:param group_id: The group ID for the message.
:return: The ID of the message.
"""
try:
att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}}
dedup_id = uuid.uuid4()
response = topic.publish(
Subject="Price Update",
Message=payload,
MessageAttributes=att_dict,
MessageGroupId=group_id,
MessageDeduplicationId=str(dedup_id),
)
message_id = response["MessageId"]
logger.info("Published message to topic %s.", topic.arn)
except ClientError as error:
logger.exception("Couldn't publish message to topic %s.", topic.arn)
raise error
return message_id
@staticmethod
def delete_queue(queue):
"""
Removes an SQS queue. When run against an AWS account, it can take up to
60 seconds before the queue is actually deleted.
:param queue: The queue to delete.
:return: None
"""
try:
queue.delete()
logger.info("Deleted queue with URL=%s.", queue.url)
except ClientError as error:
logger.exception("Couldn't delete queue with URL=%s!", queue.url)
raise error
- SAP ABAP
-
- SDK para SAP ABAP
-
Crea un tema de FIFO, suscribe una cola FIFO de Amazon SQS al tema y publica un mensaje en un tema de Amazon SNS.
" Creates a FIFO topic. "
DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap.
DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow.
ls_tpc_attributes-key = 'FifoTopic'.
ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ).
INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes.
TRY.
DATA(lo_create_result) = lo_sns->createtopic(
iv_name = iv_topic_name
it_attributes = lt_tpc_attributes
).
DATA(lv_topic_arn) = lo_create_result->get_topicarn( ).
ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. "
MESSAGE 'FIFO topic created' TYPE 'I'.
CATCH /aws1/cx_snstopiclimitexcdex.
MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'.
ENDTRY.
" Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. "
" Only Amazon Simple Queue Service (Amazon SQS) FIFO queues can be subscribed to an SNS FIFO topic. "
TRY.
DATA(lo_subscribe_result) = lo_sns->subscribe(
iv_topicarn = lv_topic_arn
iv_protocol = 'sqs'
iv_endpoint = iv_queue_arn
).
DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ).
ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. "
MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'.
CATCH /aws1/cx_snsnotfoundexception.
MESSAGE 'Topic does not exist.' TYPE 'E'.
CATCH /aws1/cx_snssubscriptionlmte00.
MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'.
ENDTRY.
" Publish message to SNS topic. "
TRY.
DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap.
DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow.
ls_msg_attributes-key = 'Importance'.
ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ).
INSERT ls_msg_attributes INTO TABLE lt_msg_attributes.
DATA(lo_result) = lo_sns->publish(
iv_topicarn = lv_topic_arn
iv_message = 'The price of your mobile plan has been increased from $19 to $23'
iv_subject = 'Changes to mobile plan'
iv_messagegroupid = 'Update-2'
iv_messagededuplicationid = 'Update-2.1'
it_messageattributes = lt_msg_attributes
).
ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. "
MESSAGE 'Message was published to SNS topic.' TYPE 'I'.
CATCH /aws1/cx_snsnotfoundexception.
MESSAGE 'Topic does not exist.' TYPE 'E'.
ENDTRY.
Recepción de mensajes de suscripciones FIFO
Ahora puede recibir actualizaciones de precios en las tres aplicaciones suscritas. Como se muestra enEjemplo de caso de uso de temas FIFO de Amazon SNS, el punto de entrada de cada aplicación de consumo es la cola Amazon SQS, que su AWS Lambda función correspondiente puede sondear automáticamente. Cuando una cola de Amazon SQS es un origen de eventos para una función de Lambda, Lambda escala su flota de sondeadores según sea necesario para consumir mensajes de forma eficiente.
Para obtener más información, consulte Uso AWS Lambda con Amazon SQS en la Guía para AWS Lambda desarrolladores. Para obtener información sobre cómo escribir sus propios encuestadores de colas, consulte Recomendaciones para las colas estándar y FIFO de Amazon SQS en la Guía para desarrolladores de Amazon Simple Queue Service y ReceiveMessageen la Referencia de la API de Amazon Simple Queue Service.
¿Usando AWS CloudFormation
AWS CloudFormation le permite utilizar un archivo de plantilla para crear y configurar un conjunto de AWS recursos juntos como una sola unidad. En esta sección, se incluye una plantilla de ejemplo que crea lo siguiente:
-
El tema FIFO de Amazon SNS que distribuye las actualizaciones de precios
-
Las colas FIFO de Amazon SQS que proporcionan estas actualizaciones a las aplicaciones mayoristas y minoristas
-
La cola estándar de Amazon SQS para la aplicación de análisis que almacena registros, que pueden consultarse para inteligencia empresarial (BI)
-
Las suscripciones FIFO de Amazon SNS que conectan las tres colas al tema
-
Una política de filtro en la que se especifica que las aplicaciones de suscriptor reciben solo las actualizaciones de precios que necesitan.
Si prueba este código de ejemplo al publicar un mensaje en el tema, asegúrese de publicar el mensaje con el atributo de business
. Especifique retail
o wholesale
en el valor de atributo. De lo contrario, el mensaje se filtra y no se entrega a las colas suscritas.
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"PriceUpdatesTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"TopicName": "PriceUpdatesTopic.fifo",
"FifoTopic": true,
"ContentBasedDeduplication": false,
"ArchivePolicy": {
"MessageRetentionPeriod": "30"
}
}
},
"WholesaleQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "WholesaleQueue.fifo",
"FifoQueue": true,
"ContentBasedDeduplication": false
}
},
"RetailQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "RetailQueue.fifo",
"FifoQueue": true,
"ContentBasedDeduplication": false
}
},
"AnalyticsQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "AnalyticsQueue"
}
},
"WholesaleSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"WholesaleQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false",
"FilterPolicyScope": "MessageBody",
"FilterPolicy": {
"business": [
"wholesale"
]
}
}
},
"RetailSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"RetailQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false",
"FilterPolicyScope": "MessageBody",
"FilterPolicy": {
"business": [
"retail"
]
}
}
},
"AnalyticsSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"AnalyticsQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false"
}
},
"SalesQueuesPolicy": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": [
"sqs:SendMessage"
],
"Resource": "*",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "PriceUpdatesTopic"
}
}
}
}
]
},
"Queues": [
{
"Ref": "WholesaleQueue"
},
{
"Ref": "RetailQueue"
},
{
"Ref": "AnalyticsQueue"
}
]
}
}
}
}
Para obtener más información sobre la implementación de AWS recursos mediante una AWS CloudFormation plantilla, consulte Primeros pasos en la Guía del AWS CloudFormation usuario.