AWS SDK を使用して DAX で DynamoDB の読み取りを高速化する - AWSSDK コードサンプル

AWSDocAWS SDKGitHub サンプルリポジトリには、さらに多くの SDK サンプルがあります

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS SDK を使用して DAX で DynamoDB の読み取りを高速化する

次のコード例は、以下の操作方法を示しています。

  • DAX クライアントと SDK クライアントの両方でデータを作成してテーブルに書き込みます。

  • 両方のクライアントでテーブルを取得、クエリ、スキャンし、パフォーマンスを比較します。

詳細については、「DynamoDB Accelerator クライアントで開発する」を参照してください。

Python
SDK for Python (Boto3)
注記

他にもありますGitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

DAX または Boto3 クライアントでテーブルを作成します。

import boto3 def create_dax_table(dyn_resource=None): """ Creates a DynamoDB table. :param dyn_resource: Either a Boto3 or DAX resource. :return: The newly created table. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table_name = 'TryDaxTable' params = { 'TableName': table_name, 'KeySchema': [ {'AttributeName': 'partition_key', 'KeyType': 'HASH'}, {'AttributeName': 'sort_key', 'KeyType': 'RANGE'} ], 'AttributeDefinitions': [ {'AttributeName': 'partition_key', 'AttributeType': 'N'}, {'AttributeName': 'sort_key', 'AttributeType': 'N'} ], 'ProvisionedThroughput': { 'ReadCapacityUnits': 10, 'WriteCapacityUnits': 10 } } table = dyn_resource.create_table(**params) print(f"Creating {table_name}...") table.wait_until_exists() return table if __name__ == '__main__': dax_table = create_dax_table() print(f"Created table.")

テーブルにテストデータを書き込みます。

import boto3 def write_data_to_dax_table(key_count, item_size, dyn_resource=None): """ Writes test data to the demonstration table. :param key_count: The number of partition and sort keys to use to populate the table. The total number of items is key_count * key_count. :param item_size: The size of non-key data for each test item. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') some_data = 'X' * item_size for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.put_item(Item={ 'partition_key': partition_key, 'sort_key': sort_key, 'some_data': some_data }) print(f"Put item ({partition_key}, {sort_key}) succeeded.") if __name__ == '__main__': write_key_count = 10 write_item_size = 1000 print(f"Writing {write_key_count*write_key_count} items to the table. " f"Each item is {write_item_size} characters.") write_data_to_dax_table(write_key_count, write_item_size)

DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションの項目を取得し、それぞれに費やした時間を報告します。

import argparse import sys import time import amazondax import boto3 def get_item_test(key_count, iterations, dyn_resource=None): """ Gets items from the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param key_count: The number of items to get from the table in each iteration. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') start = time.perf_counter() for _ in range(iterations): for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.get_item(Key={ 'partition_key': partition_key, 'sort_key': sort_key }) print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_key_count = 10 test_iterations = 50 if args.endpoint_url: print(f"Getting each item from the table {test_iterations} times, " f"using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = get_item_test( test_key_count, test_iterations, dyn_resource=dax) else: print(f"Getting each item from the table {test_iterations} times, " f"using the Boto3 client.") test_start, test_end = get_item_test( test_key_count, test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/ test_iterations}.")

DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションのテーブルに対してクエリを実行し、それぞれに費やした時間を報告します。

import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') key_condition_expression = \ Key('partition_key').eq(partition_key) & \ Key('sort_key').between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}.")

DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションのテーブルをスキャンし、それぞれに費やした時間を報告します。

import argparse import time import sys import amazondax import boto3 def scan_test(iterations, dyn_resource=None): """ Scans the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') start = time.perf_counter() for _ in range(iterations): table.scan() print('.', end='') sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == '__main__': # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( 'endpoint_url', nargs='?', help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.") args = parser.parse_args() test_iterations = 100 if args.endpoint_url: print(f"Scanning the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = scan_test(test_iterations, dyn_resource=dax) else: print(f"Scanning the table {test_iterations} times, using the Boto3 client.") test_start, test_end = scan_test(test_iterations) print(f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}.")

テーブルを削除する。

import boto3 def delete_dax_table(dyn_resource=None): """ Deletes the demonstration table. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource('dynamodb') table = dyn_resource.Table('TryDaxTable') table.delete() print(f"Deleting {table.name}...") table.wait_until_not_exists() if __name__ == '__main__': delete_dax_table() print("Table deleted!")