Binding SNS publish operations to subscribe or create_topic operations
can cause latency issues with newly created topics.
def sns_publish_noncompliant(self, sqs_arn: str, topic_arn: str) -> None:
    """Subscribe an SQS endpoint to an SNS topic, then publish right away.

    This is intentionally the noncompliant example: the publish call is
    issued directly after the subscribe call on the same topic.

    Args:
        sqs_arn: Value passed as the subscription ``Endpoint``.
        topic_arn: ARN of the topic that is subscribed to and published on.
    """
    import boto3

    client = boto3.Session().client('sns')

    subscription_request = {
        'TopicArn': topic_arn,
        'Protocol': 'sqs',
        'Endpoint': sqs_arn,
        'ReturnSubscriptionArn': True,
    }
    client.subscribe(**subscription_request)

    # Noncompliant: incorrect binding of SNS publish operations
    # with 'subscribe' or 'create_topic' operations.
    message_attributes = {
        'attr1': {
            'DataType': 'String',
            'StringValue': "short_uid",
        },
    }
    client.publish(
        TopicArn=topic_arn,
        Message='test message for SQS',
        MessageAttributes=message_attributes,
    )
def sns_publish_compliant(self, sqs_arn: str, topic_arn: str) -> dict:
    """Subscribe an SQS endpoint to an SNS topic and return the response.

    Unlike the noncompliant variant, no publish call is made here; the
    caller receives the subscribe response instead.

    Args:
        sqs_arn: Value passed as the subscription ``Endpoint``.
        topic_arn: ARN of the topic to subscribe to.

    Returns:
        The response returned by the SNS client's ``subscribe`` call.
    """
    import boto3

    session = boto3.Session()
    sns_client = session.client('sns')
    # Compliant: avoids binding of SNS publish operations
    # with 'subscribe' or 'create_topic' operations.
    return sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=sqs_arn,
        ReturnSubscriptionArn=True,
    )