Amazon Simple Notification Service Construct Library

Add an SNS Topic to your stack:

import aws_cdk.aws_sns as sns

topic = sns.Topic(self, "Topic",
    display_name="Customer subscription topic"
)

Add a FIFO SNS topic with content-based de-duplication to your stack:

topic = sns.Topic(self, "Topic",
    content_based_deduplication=True,
    display_name="Customer subscription topic",
    fifo=True
)

Add an SNS topic to your stack with a specified signature version. The signature version determines the hashing algorithm Amazon SNS uses to sign notifications, subscription confirmations, and unsubscribe confirmation messages.

The default signature version is 1 (SHA1). SNS also supports signature version 2 (SHA256).

topic = sns.Topic(self, "Topic",
    signature_version="2"
)
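To make the mapping between signature versions and hashing algorithms concrete, the following minimal sketch selects a hash function from Python's hashlib by version. The helper name digest_for_signature_version is hypothetical and is not part of the CDK or SNS APIs. Verifying a real SNS message also requires checking an RSA signature against the SNS signing certificate, which is left out here.

```python
import hashlib

# Mapping taken from the text above: version 1 uses SHA1, version 2 uses SHA256.
_HASH_BY_SIGNATURE_VERSION = {"1": hashlib.sha1, "2": hashlib.sha256}


def digest_for_signature_version(signature_version: str, string_to_sign: bytes) -> str:
    """Hypothetical helper: hash a payload with the algorithm for the given version."""
    try:
        hash_fn = _HASH_BY_SIGNATURE_VERSION[signature_version]
    except KeyError:
        raise ValueError(f"unsupported signature version: {signature_version!r}") from None
    return hash_fn(string_to_sign).hexdigest()
```

The version is kept as a string because the signature_version property in the example above is passed as "2".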

Note that FIFO topics require a topic name to be provided. The required .fifo suffix will be automatically generated and added to the topic name if it is not explicitly provided.
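The naming rule for FIFO topics can be sketched in plain Python. The function fifo_topic_name below is a hypothetical illustration of the rule described above, not the library's actual implementation.

```python
FIFO_SUFFIX = ".fifo"


def fifo_topic_name(name: str) -> str:
    """Hypothetical helper: append the .fifo suffix unless it is already present."""
    return name if name.endswith(FIFO_SUFFIX) else name + FIFO_SUFFIX
```

Under this rule, a name that already ends in .fifo is left unchanged, so the suffix is never duplicated.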

Subscriptions

Various subscriptions can be added to the topic by calling the add_subscription(...) method on the topic. It accepts a subscription object, default implementations of which can be found in the aws_cdk.aws_sns_subscriptions module.

Add an HTTPS Subscription to your topic:

import aws_cdk.aws_sns_subscriptions as subscriptions

my_topic = sns.Topic(self, "MyTopic")

my_topic.add_subscription(subscriptions.UrlSubscription("https://foobar.com/"))

Subscribe a queue to the topic:

# queue: sqs.Queue

my_topic = sns.Topic(self, "MyTopic")

my_topic.add_subscription(subscriptions.SqsSubscription(queue))

Note that subscriptions of queues in different accounts need to be manually confirmed by reading the initial message from the queue and visiting the link found in it.

Filter policy

A filter policy can be specified when subscribing an endpoint to a topic.

Example with a Lambda subscription:

import aws_cdk.aws_lambda as lambda_
# fn: lambda_.Function


my_topic = sns.Topic(self, "MyTopic")

# Lambda should receive only messages matching the following conditions on attributes:
# color: 'red' or 'orange', or begins with 'bl', or ends with 'ue'
# size: anything but 'small' or 'medium'
# price: between 100 and 200 or greater than 300
# store: attribute must be present
my_topic.add_subscription(subscriptions.LambdaSubscription(fn,
    filter_policy={
        "color": sns.SubscriptionFilter.string_filter(
            allowlist=["red", "orange"],
            match_prefixes=["bl"],
            match_suffixes=["ue"]
        ),
        "size": sns.SubscriptionFilter.string_filter(
            denylist=["small", "medium"]
        ),
        "price": sns.SubscriptionFilter.numeric_filter(
            between=sns.BetweenCondition(start=100, stop=200),
            greater_than=300
        ),
        "store": sns.SubscriptionFilter.exists_filter()
    }
))
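To show what the conditions above mean in practice, here is a minimal pure-Python sketch that writes them in the SNS filter policy JSON syntax and evaluates them against a message's attributes. The matches function and its helpers are hypothetical and only approximate how Amazon SNS evaluates policies. It assumes that the between condition is inclusive at both ends and that attribute values are already plain strings or numbers.

```python
import operator
from typing import Any, Dict, List

# The conditions from the example above, written in SNS filter policy JSON syntax.
FILTER_POLICY: Dict[str, List[Any]] = {
    "color": ["red", "orange", {"prefix": "bl"}, {"suffix": "ue"}],
    "size": [{"anything-but": ["small", "medium"]}],
    "price": [{"numeric": [">=", 100, "<=", 200]}, {"numeric": [">", 300]}],
    "store": [{"exists": True}],
}

_OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
        ">": operator.gt, ">=": operator.ge}


def _numeric_ok(value: Any, spec: List[Any]) -> bool:
    """Check a value against an [op, bound, op, bound, ...] list; all pairs must hold."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return all(_OPS[op](value, bound) for op, bound in zip(spec[0::2], spec[1::2]))


def _condition_ok(value: Any, cond: Any) -> bool:
    """Evaluate one condition against an attribute that is present."""
    if not isinstance(cond, dict):
        return value == cond  # exact match
    if "prefix" in cond:
        return isinstance(value, str) and value.startswith(cond["prefix"])
    if "suffix" in cond:
        return isinstance(value, str) and value.endswith(cond["suffix"])
    if "anything-but" in cond:
        return value not in cond["anything-but"]
    if "numeric" in cond:
        return _numeric_ok(value, cond["numeric"])
    if "exists" in cond:
        return bool(cond["exists"])  # the attribute is present at this point
    raise ValueError(f"unsupported condition: {cond!r}")


def matches(policy: Dict[str, List[Any]], attributes: Dict[str, Any]) -> bool:
    """Every policy key must be satisfied; conditions under one key are OR-ed."""
    for key, conditions in policy.items():
        if key not in attributes:
            # A missing attribute only satisfies an explicit {"exists": False}.
            if not any(isinstance(c, dict) and c.get("exists") is False
                       for c in conditions):
                return False
            continue
        if not any(_condition_ok(attributes[key], c) for c in conditions):
            return False
    return True
```

For instance, a message with color "blue", size "large", price 150 and any store value passes this sketch, while the same message with price 250 does not.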

Payload-based filtering

To filter messages based on the payload or body of the message, use the filter_policy_with_message_body property. This type of filter policy supports creating filters on nested objects.

Example with a Lambda subscription:

import aws_cdk.aws_lambda as lambda_
# fn: lambda_.Function


my_topic = sns.Topic(self, "MyTopic")

# Lambda should receive only messages matching the following conditions on the message body:
# background.color: 'red' or 'orange'
my_topic.add_subscription(subscriptions.LambdaSubscription(fn,
    filter_policy_with_message_body={
        "background": sns.FilterOrPolicy.policy({
            "color": sns.FilterOrPolicy.filter(sns.SubscriptionFilter.string_filter(
                allowlist=["red", "orange"]
            ))
        })
    }
))
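The nesting in a payload-based policy can be illustrated with a small recursive matcher in plain Python. This is a sketch under simplifying assumptions: body_matches is a hypothetical helper, it supports exact string matches only, and it does not reproduce every rule Amazon SNS applies to message bodies.

```python
import json
from typing import Any, Dict

# The policy from the example above, written as nested JSON.
BODY_POLICY: Dict[str, Any] = {"background": {"color": ["red", "orange"]}}


def body_matches(policy: Dict[str, Any], body: Dict[str, Any]) -> bool:
    """Return True if every key in the policy is satisfied by the message body."""
    for key, rule in policy.items():
        if key not in body:
            return False
        value = body[key]
        if isinstance(rule, dict):
            # A nested policy: descend into the corresponding nested object.
            if not isinstance(value, dict) or not body_matches(rule, value):
                return False
        elif value not in rule:
            # A leaf rule is a list of allowed values.
            return False
    return True


message = json.loads('{"background": {"color": "red"}, "text": "hello"}')
accepted = body_matches(BODY_POLICY, message)
```

Keys in the message body that the policy does not mention, such as "text" here, are ignored by the matcher.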

Example of Firehose Subscription

from aws_cdk.aws_kinesisfirehose_alpha import DeliveryStream
# stream: DeliveryStream


topic = sns.Topic(self, "Topic")

sns.Subscription(self, "Subscription",
    topic=topic,
    endpoint=stream.delivery_stream_arn,
    protocol=sns.SubscriptionProtocol.FIREHOSE,
    subscription_role_arn="SAMPLE_ARN"
)

DLQ setup for SNS Subscription

CDK can attach a provided queue as the dead-letter queue (DLQ) for your SNS subscription. See the SNS DLQ configuration docs for more information about this feature.

Example of usage with a user-provided DLQ:

topic = sns.Topic(self, "Topic")
dl_queue = sqs.Queue(self, "DeadLetterQueue",
    queue_name="MySubscription_DLQ",
    retention_period=Duration.days(14)
)

sns.Subscription(self, "Subscription",
    endpoint="endpoint",
    protocol=sns.SubscriptionProtocol.LAMBDA,
    topic=topic,
    dead_letter_queue=dl_queue
)
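On the SNS side, a dead-letter queue is attached to a subscription through a redrive policy, a JSON document that names the queue's ARN. The sketch below builds such a document in plain Python. The helper redrive_policy and the ARN are illustrative placeholders, and the exact output the CDK synthesizes may differ.

```python
import json


def redrive_policy(dead_letter_queue_arn: str) -> str:
    """Hypothetical helper: serialize a subscription redrive policy for a DLQ ARN."""
    return json.dumps({"deadLetterTargetArn": dead_letter_queue_arn})


# Placeholder ARN for illustration only.
policy_json = redrive_policy("arn:aws:sqs:us-east-1:123456789012:MySubscription_DLQ")
```

The queue also needs a resource policy that lets Amazon SNS send messages to it, which this sketch does not show.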

CloudWatch Event Rule Target

SNS topics can be used as targets for CloudWatch event rules.

Use the SnsTopic class from aws_cdk.aws_events_targets:

import aws_cdk.aws_codecommit as codecommit
import aws_cdk.aws_events_targets as targets

# repo: codecommit.Repository

my_topic = sns.Topic(self, "Topic")

repo.on_commit("OnCommit",
    target=targets.SnsTopic(my_topic)
)

This will result in adding a target to the event rule and will also modify the topic resource policy to allow CloudWatch events to publish to the topic.

Topic Policy

A topic policy is automatically created when add_to_resource_policy is called, if one doesn’t already exist. Using add_to_resource_policy is the simplest way to add policies, but a TopicPolicy can also be created manually.

topic = sns.Topic(self, "Topic")
topic_policy = sns.TopicPolicy(self, "TopicPolicy",
    topics=[topic]
)

topic_policy.document.add_statements(iam.PolicyStatement(
    actions=["sns:Subscribe"],
    principals=[iam.AnyPrincipal()],
    resources=[topic.topic_arn]
))

A policy document can also be passed when constructing a TopicPolicy:

topic = sns.Topic(self, "Topic")
policy_document = iam.PolicyDocument(
    assign_sids=True,
    statements=[
        iam.PolicyStatement(
            actions=["sns:Subscribe"],
            principals=[iam.AnyPrincipal()],
            resources=[topic.topic_arn]
        )
    ]
)

topic_policy = sns.TopicPolicy(self, "Policy",
    topics=[topic],
    policy_document=policy_document
)

Enforce encryption of data in transit when publishing to a topic

You can enforce SSL when creating a topic policy by setting the enforce_ssl flag:

topic = sns.Topic(self, "Topic")
policy_document = iam.PolicyDocument(
    assign_sids=True,
    statements=[
        iam.PolicyStatement(
            actions=["sns:Publish"],
            principals=[iam.ServicePrincipal("s3.amazonaws.com")],
            resources=[topic.topic_arn]
        )
    ]
)

topic_policy = sns.TopicPolicy(self, "Policy",
    topics=[topic],
    policy_document=policy_document,
    enforce_ssl=True
)

Similarly, you can enforce SSL by setting the enforce_ssl flag on the topic:

topic = sns.Topic(self, "TopicAddPolicy",
    enforce_ssl=True
)

topic.add_to_resource_policy(iam.PolicyStatement(
    principals=[iam.ServicePrincipal("s3.amazonaws.com")],
    actions=["sns:Publish"],
    resources=[topic.topic_arn]
))
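Enforcing encryption in transit typically comes down to a deny statement in the topic policy that rejects sns:Publish calls made without TLS. The sketch below builds such a statement as a plain Python dictionary. The helper name, the Sid value and the ARN are illustrative assumptions, and the statement the CDK actually generates may differ in detail.

```python
import json


def ssl_only_publish_statement(topic_arn: str) -> dict:
    """Hypothetical helper: deny publishing to the topic over non-TLS connections."""
    return {
        "Sid": "AllowPublishThroughSSLOnly",  # illustrative statement ID
        "Effect": "Deny",
        "Principal": "*",
        "Action": "sns:Publish",
        "Resource": topic_arn,
        # aws:SecureTransport is "false" when the request did not use TLS.
        "Condition": {"Bool": {"aws:SecureTransport": "false"}},
    }


# Placeholder ARN for illustration only.
statement = ssl_only_publish_statement("arn:aws:sns:us-east-1:123456789012:MyTopic")
rendered = json.dumps(statement, indent=2)
```

Because the statement is an explicit deny, it takes precedence over any allow statement in the same policy, such as the S3 publish permission in the examples above.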

Delivery status logging

Amazon SNS can log the delivery status of notification messages sent to topics with the following Amazon SNS endpoints:

  • HTTP
  • Amazon Kinesis Data Firehose
  • AWS Lambda
  • Platform application endpoint
  • Amazon Simple Queue Service

Example with a delivery status logging configuration for SQS:

# role: iam.Role

topic = sns.Topic(self, "MyTopic",
    logging_configs=[
        sns.LoggingConfig(
            protocol=sns.LoggingProtocol.SQS,
            failure_feedback_role=role,
            success_feedback_role=role,
            success_feedback_sample_rate=50
        )
    ]
)

A delivery status logging configuration can also be added to your topic with the add_logging_config method:

# role: iam.Role

topic = sns.Topic(self, "MyTopic")

topic.add_logging_config(
    protocol=sns.LoggingProtocol.SQS,
    failure_feedback_role=role,
    success_feedback_role=role,
    success_feedback_sample_rate=50
)

Note that valid values for success_feedback_sample_rate are integers from 0 to 100.
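The range constraint on the sample rate can be checked before the value is passed to a logging configuration. The function below is a hypothetical pre-check written for illustration and is not part of the CDK API.

```python
def validate_success_feedback_sample_rate(rate: int) -> int:
    """Hypothetical helper: return the rate if it is an integer from 0 to 100."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(rate, bool) or not isinstance(rate, int):
        raise TypeError(f"sample rate must be an integer, got {type(rate).__name__}")
    if not 0 <= rate <= 100:
        raise ValueError(f"sample rate must be between 0 and 100, got {rate}")
    return rate
```

A rate of 50, as used in the examples above, passes this check, while 101 or 12.5 raises an error.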

Archive Policy

Message archiving provides the ability to archive a single copy of all messages published to your topic. You can store published messages within your topic by enabling the message archive policy on the topic, which enables message archiving for all subscriptions linked to that topic. Messages can be archived for a minimum of one day to a maximum of 365 days.

Example with an archive policy:

topic = sns.Topic(self, "MyTopic",
    fifo=True,
    message_retention_period_in_days=7
)

Note: The message_retention_period_in_days property is only available for FIFO topics.
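The two constraints on archiving, FIFO topics only and a retention period of 1 to 365 days, can be sketched as a small validation step that produces an archive policy document. The helper archive_policy is hypothetical, and the shape of the returned dictionary is an assumption based on the SNS ArchivePolicy attribute.

```python
def archive_policy(retention_days: int, fifo: bool) -> dict:
    """Hypothetical helper: build an archive policy after checking both constraints."""
    if not fifo:
        raise ValueError("message archiving is only available for FIFO topics")
    if not 1 <= retention_days <= 365:
        raise ValueError(f"retention must be between 1 and 365 days, got {retention_days}")
    return {"MessageRetentionPeriod": retention_days}
```

With the values from the example above, archive_policy(7, fifo=True) yields a policy that retains messages for seven days.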

TracingConfig

The tracing_config property sets the tracing mode of an Amazon SNS topic.

If set to PassThrough, the topic passes trace headers received from the Amazon SNS publisher to its subscriptions. If set to Active, Amazon SNS vends X-Ray segment data to the topic owner's account if the sampled flag in the tracing header is true.

The default TracingConfig is TracingConfig.PASS_THROUGH.

Example with tracing_config set to Active:

topic = sns.Topic(self, "MyTopic",
    tracing_config=sns.TracingConfig.ACTIVE
)