04-query-test.py - AWS Code Sample

04-query-test.py

#!/usr/bin/env python3 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 """ Purpose Queries an Amazon DynamoDB table for a specified number of iterations. The total amount of time spent querying is measured and reported. When no arguments are specified, this script is run with the Boto3 client. When a DAX cluster endpoint is specified, the script uses the DAX client. Running in each mode lets you compare the performance of the two clients. """ import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') key_condition_expression = \ Key('partition_key').eq(partition_key) & \ Key('sort_key').between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}.")