Exemples de code pour les rubriques FIFO - Amazon Simple Notification Service

Exemples de code pour les rubriques FIFO

Vous pouvez utiliser les exemples de code suivants pour intégrer le cas d'utilisation de la gestion des prix des pièces automobiles avec les rubriques FIFO SNS et les files d'attente FIFO SQS.

Utilisation d'un kit SDK AWS

En utilisant un kit SDK AWS, vous créez une rubrique FIFO Amazon SNS en définissant son attribut FifoTopic sur true. Vous créez une file d'attente FIFO Amazon SQS en définissant son attribut FifoQueue sur true. Vous devez également ajouter le suffixe .fifo au nom de chaque ressource FIFO. Après avoir créé une rubrique ou une file d'attente FIFO, vous ne pouvez pas la convertir en une rubrique ou une file d'attente standard.

L'exemple de code suivant crée ces ressources FIFO :

  • La rubrique SNS FIFO qui distribue les mises à jour de prix

  • Les files d'attente FIFO SQS qui fournissent ces mises à jour pour les deux applications (gros et détail)

  • Les abonnements SNS FIFO qui connectent les deux files d'attente à la rubrique

Cet exemple définit les stratégies de filtrage sur les abonnements. Si vous testez l'exemple en publiant un message dans la rubrique, assurez-vous de publier le message avec l'attribut business. Spécifiez retail ou wholesale pour la valeur d'attribut. Sinon le message est filtré et n'est pas remis aux files d'attente abonnées. Pour plus d'informations, consultez Filtrage des messages pour les rubriques FIFO.

Java
Kit SDK pour Java 1.x
Astuce

Pour découvrir comment configurer et exécuter cet exemple, consultez GitHub.

Créez une rubrique FIFO et des files d'attente FIFO. Abonnez les files d'attente à la rubrique.

// Create API clients AWSCredentialsProvider credentials = getCredentials(); AmazonSNS sns = new AmazonSNSClient(credentials); AmazonSQS sqs = new AmazonSQSClient(credentials); // Create FIFO topic Map<String, String> topicAttributes = new HashMap<String, String>(); topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); String topicArn = sns.createTopic( new CreateTopicRequest() .withName("PriceUpdatesTopic.fifo") .withAttributes(topicAttributes) ).getTopicArn(); // Create FIFO queues Map<String, String> queueAttributes = new HashMap<String, String>(); queueAttributes.put("FifoQueue", "true"); // Disable content-based deduplication because messages published with the same body // might carry different attributes that must be processed independently. // The price management system uses the message attributes to define whether a given // price update applies to the wholesale application or to the retail application. queueAttributes.put("ContentBasedDeduplication", "false"); String wholesaleQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("WholesaleQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); String retailQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("RetailQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); // Subscribe FIFO queues to FIFO topic, setting required permissions String wholesaleSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, wholesaleQueueUrl); String retailSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, retailQueueUrl);

Définissez une stratégie de filtrage sur chaque abonnement afin que chaque application abonnée ne reçoive que les mises à jour de prix dont elle a besoin.

// Set the Amazon SNS subscription filter policies SNSMessageFilterPolicy wholesalePolicy = new SNSMessageFilterPolicy(); wholesalePolicy.addAttribute("business", "wholesale"); wholesalePolicy.apply(sns, wholesaleSubscriptionArn); SNSMessageFilterPolicy retailPolicy = new SNSMessageFilterPolicy(); retailPolicy.addAttribute("business", "retail"); retailPolicy.apply(sns, retailSubscriptionArn);

Composez et publiez un message qui met à jour le prix de gros.

// Publish message to FIFO topic String subject = "Price Update"; String payload = "{\"product\": 214, \"price\": 79.99}"; String groupId = "PID-214"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put( attributeName, new MessageAttributeValue() .withDataType("String") .withStringValue(attributeValue)); sns.publish( new PublishRequest() .withTopicArn(topicArn) .withSubject(subject) .withMessage(payload) .withMessageGroupId(groupId); .withMessageDeduplicationId(dedupId) .withMessageAttributes(attributes);

Réception de messages provenant d'abonnements FIFO

Vous pouvez maintenant recevoir des mises à jour de prix dans les applications de gros et de détail. Comme indiqué dans le Cas d'utilisation d'exemple de rubrique FIFO, le point d'entrée de chaque application grand public est la file d'attente FIFO SQS, dont le AWS Lambda peut interroger automatiquement. Lorsqu'une file d'attente FIFO SQS est une source d'évènements pour une fonction Lambda, Lambda met à l'échelle sa flotte de sondeurs en fonction des besoins pour consommer efficacement les messages.

Pour plus d'informations, consultez Utilisation de AWS Lambda avec Amazon SQS dans le Guide du développeur AWS Lambda. Pour plus d'informations sur l'écriture de vos propres sondeurs de file d'attente, consultez Recommandations pour les files d'attente standard et FIFO Amazon SQS dans le Guide du développeur Amazon Simple Queue Service et ReceiveMessage dans la Référence d'API Amazon Simple Queue Service.

Utiliser AWS CloudFormation

AWS CloudFormation vous permet d'utiliser un fichier de modèle pour créer et configurer simultanément un ensemble de ressources AWS en tant qu'unité unique. Cette section comporte un exemple de modèle qui crée les éléments suivants :

  • La rubrique SNS FIFO qui distribue les mises à jour de prix

  • Les files d'attente FIFO SQS qui fournissent ces mises à jour pour les deux applications (gros et détail)

  • Les abonnements SNS FIFO qui connectent les deux files d'attente à la rubrique

  • Une stratégie de filtrage qui spécifie que les applications abonnées reçoivent uniquement les mises à jour de prix dont elles ont besoin

Note

Si vous testez cet exemple de code en publiant un message dans la rubrique, assurez-vous de publier le message avec l'attribut business. Spécifiez retail ou wholesale pour la valeur d'attribut. Sinon le message est filtré et n'est pas remis aux files d'attente abonnées.

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "retail" ] } } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" } ] } } } }

Pour plus d'informations sur le déploiement de ressources AWS à l'aide d'un modèle AWS CloudFormation, consultez la page Démarrer du Guide de l'utilisateur AWS CloudFormation.