test_queue_wrapper.py - AWS Code Sample

test_queue_wrapper.py

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Unit tests for queue_wrapper.py functions.

Running the tests

Tests can be run in two modes. By default, the tests use the botocore Stubber.
This captures requests before they are sent to AWS and returns a mocked response.
You can also run the tests against your AWS account. In this case, they will
create and manipulate AWS resources, which may incur charges on your account.

To run the tests for this module with the botocore Stubber, run the following in
your <GitHub root>/python/example_code/sqs folder.

    python -m pytest -o log_cli=1 --log-cli-level=INFO test/test_queue_wrapper.py

The '-o log_cli=1 --log-cli-level=INFO' flags configure pytest to output
logs to stdout during the test run. Without them, pytest captures logs and prints
them only when the test fails.

To run the tests using your AWS account and default shared credentials, include the
'--use-real-aws-may-incur-charges' flag.

    python -m pytest -o log_cli=1 --log-cli-level=INFO --use-real-aws-may-incur-charges test/test_queue_wrapper.py

Note that this may incur charges to your AWS account. When run in this mode,
a best effort is made to clean up any resources created during the test. But it's
your responsibility to verify that all resources have actually been cleaned up.
"""

import json
import time

import pytest
from botocore.exceptions import ClientError

import queue_wrapper


def _remove_real_queues(sqs_stubber, queues):
    """
    Remove the specified queues, but only when the tests are not using stubs.

    In stubbed mode no delete responses are stubbed by the callers, so no remove
    calls are made.

    :param sqs_stubber: The stubber object returned by the make_stubber fixture.
                        Only its use_stubs flag is read.
    :param queues: The queues to remove, in the order they should be removed.
    """
    if sqs_stubber.use_stubs:
        return
    for queue in queues:
        queue_wrapper.remove_queue(queue)


@pytest.mark.parametrize("attributes", [
    {},
    {
        'MaximumMessageSize': str(4096),
        'ReceiveMessageWaitTimeSeconds': str(10),
        'VisibilityTimeout': str(300)
    }
])
def test_create_standard_queue(make_stubber, make_unique_name, attributes):
    """Test that creating a standard queue returns a queue with the expected
    form of URL."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')
    url = 'url-' + queue_name

    sqs_stubber.stub_create_queue(queue_name, attributes, url)

    queue = queue_wrapper.create_queue(queue_name, attributes)
    try:
        assert queue_name in queue.url
    finally:
        # Clean up even when the assertion fails so a real queue is not left behind.
        _remove_real_queues(sqs_stubber, [queue])


@pytest.mark.parametrize("attributes", [
    {},
    {
        'MaximumMessageSize': str(1024),
        'ReceiveMessageWaitTimeSeconds': str(20)
    }
])
def test_create_fifo_queue(make_stubber, make_unique_name, attributes):
    """Test that creating a FIFO queue returns a queue with the expected
    form of URL."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)

    # FIFO queues require a '.fifo' suffix on the queue name.
    queue_name = make_unique_name('queue') + '.fifo'
    # Build a new dict instead of mutating the object that is shared through
    # pytest.mark.parametrize.
    fifo_attributes = dict(attributes, FifoQueue=str(True))
    url = 'url-' + queue_name

    sqs_stubber.stub_create_queue(queue_name, fifo_attributes, url)

    queue = queue_wrapper.create_queue(queue_name, fifo_attributes)
    try:
        assert queue.url.endswith(queue_name)
    finally:
        _remove_real_queues(sqs_stubber, [queue])


def test_create_dead_letter_queue(make_stubber, make_unique_name):
    """
    Test that creating a queue with an associated dead-letter queue results in
    the source queue being listed in the dead-letter queue's source queue list.

    A dead-letter queue is any queue that is designated as a dead-letter target
    by another queue's redrive policy.
    """
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    dl_queue_name = make_unique_name('queue') + '_my_lost_messages'
    dl_url = 'url-' + dl_queue_name
    queue_name = make_unique_name('queue')
    url = 'url-' + queue_name

    # Queues to remove afterward. The source queue is inserted ahead of the
    # dead-letter queue so that it is removed first.
    cleanup_queues = []
    try:
        # Create a dead-letter queue.
        sqs_stubber.stub_create_queue(dl_queue_name, {}, dl_url)
        dl_queue = queue_wrapper.create_queue(dl_queue_name)
        cleanup_queues.append(dl_queue)
        sqs_stubber.stub_get_queue_attributes(dl_url, 'arn:' + dl_queue_name)

        # Create the source queue that sends dead-letter messages to the
        # dead-letter queue.
        # The redrive policy must be in JSON format. Note that its attribute names
        # start with lowercase letters.
        redrive_attributes = {
            'RedrivePolicy': json.dumps({
                'deadLetterTargetArn': dl_queue.attributes['QueueArn'],
                'maxReceiveCount': str(5)
            })
        }
        sqs_stubber.stub_create_queue(queue_name, redrive_attributes, url)
        sqs_stubber.stub_list_dead_letter_source_queues(dl_url, [url])

        queue = queue_wrapper.create_queue(queue_name, redrive_attributes)
        cleanup_queues.insert(0, queue)
        sources = list(dl_queue.dead_letter_source_queues.all())
        assert queue in sources
    finally:
        _remove_real_queues(sqs_stubber, cleanup_queues)


def test_create_queue_bad_name():
    """Test that creating a queue with invalid characters in the name raises
    an exception."""
    # NOTE(review): no stubber is activated in this test, so the request is
    # presumably sent to the real service -- confirm this is intended.
    with pytest.raises(ClientError):
        queue_wrapper.create_queue("Queue names cannot contain spaces!")


def test_create_standard_queue_with_fifo_extension(make_unique_name):
    """Test that creating a standard queue with the '.fifo' extension raises
    an exception."""
    # NOTE(review): no stubber is activated in this test, so the request is
    # presumably sent to the real service -- confirm this is intended.
    with pytest.raises(ClientError):
        queue_wrapper.create_queue(make_unique_name('queue') + '.fifo')


def test_create_fifo_queue_without_extension(make_unique_name):
    """Test that creating a FIFO queue without the '.fifo' extension raises
    an exception."""
    # NOTE(review): no stubber is activated in this test, so the request is
    # presumably sent to the real service -- confirm this is intended.
    with pytest.raises(ClientError):
        queue_wrapper.create_queue(
            make_unique_name('queue'),
            attributes={'FifoQueue': str(True)})


def test_get_queue(make_stubber, make_unique_name):
    """Test that creating a queue and then getting it by name returns the
    same queue."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')
    url = 'url-' + queue_name

    sqs_stubber.stub_create_queue(queue_name, {}, url)
    sqs_stubber.stub_get_queue_url(queue_name, url)

    created = queue_wrapper.create_queue(queue_name)
    try:
        gotten = queue_wrapper.get_queue(queue_name)
        assert created == gotten
    finally:
        _remove_real_queues(sqs_stubber, [created])


def test_get_queue_nonexistent(make_stubber, make_unique_name):
    """Test that getting a nonexistent queue raises an exception."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')
    url = 'url-' + queue_name

    sqs_stubber.stub_get_queue_url(
        queue_name, url, error_code='AWS.SimpleQueueService.NonExistentQueue')

    with pytest.raises(ClientError) as exc_info:
        queue_wrapper.get_queue(queue_name)
    assert exc_info.value.response['Error']['Code'] == \
        'AWS.SimpleQueueService.NonExistentQueue'


def test_get_queues(make_stubber, make_unique_name):
    """
    Test that creating some queues, then retrieving all the queues, returns a list
    that contains at least all of the newly created queues.
    """
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')
    max_attempts = 6

    created_queues = []
    try:
        for ind in range(1, 4):
            name = queue_name + str(ind)
            sqs_stubber.stub_create_queue(name, {}, 'url-' + name)
            created_queues.append(queue_wrapper.create_queue(name))

        sqs_stubber.stub_list_queues([q.url for q in created_queues])

        # When running against the real AWS service, SQS may not return new queues
        # right away. Use a simple exponential backoff to retry until every created
        # queue is listed. Stopping as soon as only some of them appear would make
        # the assertion below fail spuriously.
        intersect_queues = []
        for attempt in range(max_attempts):
            if attempt > 0:
                time.sleep(min(2 ** attempt, 32))
            gotten_queues = queue_wrapper.get_queues()
            intersect_queues = [cq for cq in created_queues if cq in gotten_queues]
            all_listed = len(intersect_queues) == len(created_queues)
            # Only one list response is stubbed, so never retry in stubbed mode.
            if all_listed or sqs_stubber.use_stubs:
                break

        assert created_queues == intersect_queues
    finally:
        _remove_real_queues(sqs_stubber, created_queues)


def test_get_queues_prefix(make_stubber, make_unique_name):
    """
    Test that creating some queues with a unique prefix, then retrieving a list of
    queues that have that unique prefix, returns a list that contains exactly the
    newly created queues.
    """
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')

    created_queues = []
    try:
        for ind in range(1, 4):
            name = queue_name + str(ind)
            sqs_stubber.stub_create_queue(name, {}, 'url-' + name)
            created_queues.append(queue_wrapper.create_queue(name))

        sqs_stubber.stub_list_queues([q.url for q in created_queues], queue_name)

        gotten_queues = queue_wrapper.get_queues(queue_name)
        assert created_queues == gotten_queues
    finally:
        _remove_real_queues(sqs_stubber, created_queues)


def test_get_queues_expect_none(make_stubber, make_unique_name):
    """Test that getting queues with a random prefix returns an empty list
    and doesn't raise an exception."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')

    sqs_stubber.stub_list_queues([], queue_name)

    queues = queue_wrapper.get_queues(queue_name)
    assert not queues


def test_remove_queue(make_stubber, make_unique_name):
    """Test that creating a queue, deleting it, then trying to get it raises
    an exception because the queue no longer exists."""
    sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
    queue_name = make_unique_name('queue')
    url = 'url-' + queue_name

    # Responses are stubbed in the order the calls are made below.
    sqs_stubber.stub_create_queue(queue_name, {}, url)
    sqs_stubber.stub_delete_queue(url)
    sqs_stubber.stub_get_queue_url(
        queue_name, url, error_code='AWS.SimpleQueueService.NonExistentQueue')

    queue = queue_wrapper.create_queue(queue_name)
    queue_wrapper.remove_queue(queue)
    with pytest.raises(ClientError) as exc_info:
        queue_wrapper.get_queue(queue_name)
    assert exc_info.value.response['Error']['Code'] == \
        'AWS.SimpleQueueService.NonExistentQueue'