Accélérer les lectures DynamoDB avec DAX à l'aide d'un kit AWS SDK - Amazon DynamoDB

Accélérer les lectures DynamoDB avec DAX à l'aide d'un kit AWS SDK

L'exemple de code suivant illustre comment :

  • Créer une table et y écrire des données avec les clients DAX et SDK.

  • Obtenir, interroger et analyser la table avec les clients DAX et SDK, puis comparer leurs performances.

Pour plus d'informations, consultez Développement avec le client DynamoDB Accelerator.

Python
Kit SDK pour Python (Boto3)
Astuce

Pour découvrir les étapes de configuration et d’exécution de cet exemple, consultez GitHub.

Créez une table avec le client DAX ou Boto3.

import boto3


def create_dax_table(dyn_resource=None):
    """
    Creates the 'TryDaxTable' demonstration table and waits for it to exist.

    The table uses a numeric 'partition_key' as its HASH key and a numeric
    'sort_key' as its RANGE key, with 10 read and 10 write capacity units of
    provisioned throughput.

    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The newly created table.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource

    name = 'TryDaxTable'
    key_schema = [
        {'AttributeName': 'partition_key', 'KeyType': 'HASH'},
        {'AttributeName': 'sort_key', 'KeyType': 'RANGE'},
    ]
    attribute_definitions = [
        {'AttributeName': 'partition_key', 'AttributeType': 'N'},
        {'AttributeName': 'sort_key', 'AttributeType': 'N'},
    ]
    throughput = {'ReadCapacityUnits': 10, 'WriteCapacityUnits': 10}

    new_table = resource.create_table(
        TableName=name,
        KeySchema=key_schema,
        AttributeDefinitions=attribute_definitions,
        ProvisionedThroughput=throughput,
    )
    print(f"Creating {name}...")
    # Table creation is asynchronous on the service side; wait before returning.
    new_table.wait_until_exists()
    return new_table


if __name__ == '__main__':
    create_dax_table()
    print("Created table.")

Écrivez les données de test dans la table.

import boto3


def write_data_to_dax_table(key_count, item_size, dyn_resource=None):
    """
    Fills the 'TryDaxTable' demonstration table with test items.

    One item is written for every combination of partition key and sort key in
    the range 1..key_count, so key_count * key_count items are put in total.

    :param key_count: The number of partition and sort key values to use.
    :param item_size: The number of 'X' characters stored in the 'some_data'
                      attribute of each item.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource
    table = resource.Table('TryDaxTable')

    payload = 'X' * item_size
    key_values = range(1, key_count + 1)

    for partition_key in key_values:
        for sort_key in key_values:
            item = {
                'partition_key': partition_key,
                'sort_key': sort_key,
                'some_data': payload,
            }
            table.put_item(Item=item)
            print(f"Put item ({partition_key}, {sort_key}) succeeded.")


if __name__ == '__main__':
    write_key_count = 10
    write_item_size = 1000
    total_items = write_key_count * write_key_count
    print(f"Writing {total_items} items to the table. "
          f"Each item is {write_item_size} characters.")
    write_data_to_dax_table(write_key_count, write_item_size)

Obtenez des éléments pour un certain nombre d'itérations pour le client DAX et le client Boto3, et indiquez le temps passé pour chacun d'eux.

import argparse
import sys
import time

import amazondax
import boto3


def get_item_test(key_count, iterations, dyn_resource=None):
    """
    Repeatedly gets every item of the 'TryDaxTable' demonstration table.

    Each iteration issues one get_item call for every combination of partition
    key and sort key in the range 1..key_count (key_count * key_count calls).
    A time.perf_counter() reading is taken just before the first iteration and
    another after the last one.

    :param key_count: The number of partition and sort key values to read.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource
    table = resource.Table('TryDaxTable')
    key_values = range(1, key_count + 1)

    started = time.perf_counter()
    for _ in range(iterations):
        for partition_key in key_values:
            for sort_key in key_values:
                table.get_item(
                    Key={'partition_key': partition_key, 'sort_key': sort_key})
                # NOTE(review): one progress dot per item read is assumed here;
                # confirm against the upstream example.
                print('.', end='')
                sys.stdout.flush()
    print()
    finished = time.perf_counter()
    return started, finished


if __name__ == '__main__':
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'endpoint_url', nargs='?',
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.")
    args = parser.parse_args()

    test_key_count = 10
    test_iterations = 50

    client_label = "DAX" if args.endpoint_url else "Boto3"
    print(f"Getting each item from the table {test_iterations} times, "
          f"using the {client_label} client.")
    if args.endpoint_url:
        # The with statement lets the DAX client close its cluster connection
        # once the test has completed.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = get_item_test(
                test_key_count, test_iterations, dyn_resource=dax)
    else:
        test_start, test_end = get_item_test(test_key_count, test_iterations)

    elapsed = test_end - test_start
    print(f"Total time: {elapsed:.4f} sec. Average time: "
          f"{elapsed / test_iterations}.")

Interrogez la table pour un certain nombre d'itérations pour le client DAX et le client Boto3, et indiquez le temps passé pour chacune d'elles.

import argparse
import time
import sys

import amazondax
import boto3
from boto3.dynamodb.conditions import Key


def query_test(partition_key, sort_keys, iterations, dyn_resource=None):
    """
    Runs the same query against the 'TryDaxTable' table a number of times.

    A time.perf_counter() reading is taken just before the first iteration and
    another after the last one.

    :param partition_key: The partition key value to use in the query. The query
                          matches items whose partition key equals this value.
    :param sort_keys: The pair of sort key values passed to the 'between'
                      condition. The query matches items whose sort key lies
                      between these two values.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource
    table = resource.Table('TryDaxTable')

    # Build the condition once; it is the same for every iteration.
    partition_matches = Key('partition_key').eq(partition_key)
    sort_in_range = Key('sort_key').between(*sort_keys)
    condition = partition_matches & sort_in_range

    started = time.perf_counter()
    for _ in range(iterations):
        table.query(KeyConditionExpression=condition)
        print('.', end='')
        sys.stdout.flush()
    print()
    finished = time.perf_counter()
    return started, finished


if __name__ == '__main__':
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'endpoint_url', nargs='?',
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.")
    args = parser.parse_args()

    test_partition_key = 5
    test_sort_keys = (2, 9)
    test_iterations = 100

    client_label = "DAX" if args.endpoint_url else "Boto3"
    print(f"Querying the table {test_iterations} times, using the {client_label} client.")
    if args.endpoint_url:
        # The with statement lets the DAX client close its cluster connection
        # once the test has completed.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = query_test(
                test_partition_key, test_sort_keys, test_iterations,
                dyn_resource=dax)
    else:
        test_start, test_end = query_test(
            test_partition_key, test_sort_keys, test_iterations)

    elapsed = test_end - test_start
    print(f"Total time: {elapsed:.4f} sec. Average time: "
          f"{elapsed / test_iterations}.")

Analysez la table pendant un certain nombre d'itérations pour le client DAX et le client Boto3, et indiquez le temps passé pour chacun d'eux.

import argparse
import time
import sys

import amazondax
import boto3


def scan_test(iterations, dyn_resource=None):
    """
    Scans the 'TryDaxTable' demonstration table a number of times.

    A time.perf_counter() reading is taken just before the first iteration and
    another after the last one.

    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource
    table = resource.Table('TryDaxTable')

    started = time.perf_counter()
    for _ in range(iterations):
        table.scan()
        print('.', end='')
        sys.stdout.flush()
    print()
    finished = time.perf_counter()
    return started, finished


if __name__ == '__main__':
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'endpoint_url', nargs='?',
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.")
    args = parser.parse_args()

    test_iterations = 100

    client_label = "DAX" if args.endpoint_url else "Boto3"
    print(f"Scanning the table {test_iterations} times, using the {client_label} client.")
    if args.endpoint_url:
        # The with statement lets the DAX client close its cluster connection
        # once the test has completed.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = scan_test(test_iterations, dyn_resource=dax)
    else:
        test_start, test_end = scan_test(test_iterations)

    elapsed = test_end - test_start
    print(f"Total time: {elapsed:.4f} sec. Average time: "
          f"{elapsed / test_iterations}.")

Supprimez la table.

import boto3


def delete_dax_table(dyn_resource=None):
    """
    Deletes the 'TryDaxTable' demonstration table and waits until it is gone.

    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    """
    resource = boto3.resource('dynamodb') if dyn_resource is None else dyn_resource
    demo_table = resource.Table('TryDaxTable')

    demo_table.delete()
    print(f"Deleting {demo_table.name}...")
    # Deletion is asynchronous on the service side; wait before returning.
    demo_table.wait_until_not_exists()


if __name__ == '__main__':
    delete_dax_table()
    print("Table deleted!")

Pour obtenir la liste complète des guides de développement AWS SDK et des exemples de code, consultez Utilisation de DynamoDB avec un kit AWS SDK. Cette rubrique comprend également des informations sur le démarrage et sur les versions précédentes du kit de développement logiciel (SDK).