Python と Boto3 による Amazon DynamoDB のプログラミング - Amazon DynamoDB

Python と Boto3 による Amazon DynamoDB のプログラミング

このガイドは、Python で Amazon DynamoDB を使用したいと考えているプログラマーを対象としています。さまざまな抽象化レイヤー、設定管理、エラー処理、再試行ポリシーの制御、キープアライブの管理などについて説明します。

Boto について

Python から DynamoDB にアクセスするには、Python 用の公式 AWS SDK (一般に Boto3 と呼ばれる) を使用します。Boto (ボトと発音) という名前は、アマゾン川に自生する淡水イルカに由来しています。Boto3 ライブラリは, ライブラリの 3 番目のメジャーバージョンであり、2015 年に初めてリリースされました。Boto3 ライブラリは、DynamoDB だけでなくすべての AWS サービスをサポートするため、非常に大きいです。このオリエンテーションは、DynamoDB に関連する Boto3 の部分のみを対象としています。

Boto は、GitHub でホストされるオープンソースプロジェクトとして AWS によって管理および公開されています。BotocoreBoto3 の 2 つのパッケージに分かれています。

  • Botocore は低レベルの機能を備えています。Botocore には、クライアント、セッション、認証情報、設定、および例外クラスがあります。

  • Boto3 は Botocore の上に構築されています。よりハイレベルで、より Python 的なインターフェイスを提供します。具体的には、DynamoDB テーブルをリソースとして公開し、低レベルのサービス指向のクライアントインターフェイスよりもシンプルで洗練されたインターフェイスを提供します。

これらのプロジェクトは GitHub でホストされているため、ソースコードを表示したり、未解決の問題を追跡したり、独自の問題を送信したりできます。

Boto ドキュメントの使用

以下のリソースを使って Boto ドキュメントを使い始めましょう。

  • まず、パッケージインストールの確実な出発点となる「クイックスタート」セクションから始めてください。まだ Boto3 をインストールしていない場合は、そちらを参照してください (Boto3 は多くの場合、AWS Lambda などの AWS サービス内で自動的に利用可能になります)。

  • その後、ドキュメントの DynamoDB ガイドに注目してください。テーブルの作成と削除、項目の操作、バッチ操作の実行、クエリの実行、スキャンの実行など、基本的な DynamoDB アクティビティを実行する方法を示しています。例ではリソースインターフェイスを使用しています。boto3.resource('dynamodb') が表示されれば、上位レベルのリソースインターフェイスを使用していることがわかります。

  • ガイドを読み終えたら、DynamoDB リファレンスを確認できます。このランディングページには、使用できるクラスとメソッドがすべて記載されています。上部には DynamoDB.Client クラスが表示されます。これにより、コントロールプレーンとデータプレーンのすべての操作に低レベルでアクセスできます。一番下にある DynamoDB.ServiceResource クラスを見てください。これは上位レベルの Python インターフェイスです。これを使用すると、テーブルを作成したり、複数のテーブルにわたってバッチ操作を実行したり、テーブル固有のアクションのために DynamoDB.ServiceResource.Table インスタンスを取得したりできます。

クライアントとリソースの抽象化レイヤーを理解する

使用する 2 つのインターフェイスは、クライアントインターフェイスとリソースインターフェイスです。

  • 低レベルのクライアントインターフェイスは、基盤となるサービス API に 1 対 1 で対応します。DynamoDB が提供するすべての API は、クライアントを通じて利用できます。つまり、クライアントインターフェイスは完全な機能を提供できますが、多くの場合、より冗長で使い方が複雑です。

  • 上位レベルのリソースインターフェイスは、基盤となるサービス API と 1 対 1 で対応するわけではありません。ただし、batch_writer などのサービスへのアクセスが便利になります。

クライアントインターフェイスを使用して項目を挿入する例を次に示します。すべての値が、型を示すキー (文字列は 'S'、数値は 'N') と文字列としての値のマップとして渡されることに注目してください。これは DynamoDB JSON 形式と呼ばれます。

import boto3 dynamodb = boto3.client('dynamodb') dynamodb.put_item( TableName='YourTableName', Item={ 'pk': {'S': 'id#1'}, 'sk': {'S': 'cart#123'}, 'name': {'S': 'SomeName'}, 'inventory': {'N': '500'}, # ... more attributes ... } )

リソースインターフェイスを使用した同じ PutItem 操作を次に示します。データ型の指定は暗黙的です。

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': 'id#1', 'sk': 'cart#123', 'name': 'SomeName', 'inventory': 500, # ... more attributes ... } )

必要に応じて、boto3 に用意されている TypeSerializer および TypeDeserializer クラスを使用して、通常の JSON と DynamoDB JSON の間で変換できます。

def dynamo_to_python(dynamo_object: dict) -> dict: deserializer = TypeDeserializer() return { k: deserializer.deserialize(v) for k, v in dynamo_object.items() } def python_to_dynamo(python_object: dict) -> dict: serializer = TypeSerializer() return { k: serializer.serialize(v) for k, v in python_object.items() }

クライアントインターフェイスを使用してクエリを実行する方法は次のとおりです。クエリは JSON コンストラクトとして表現されます。キーワードの競合が発生する可能性がある場合は、変数置換が必要な KeyConditionExpression 文字列を使用します。

import boto3 client = boto3.client('dynamodb') # Construct the query response = client.query( TableName='YourTableName', KeyConditionExpression='pk = :pk_val AND begins_with(sk, :sk_val)', FilterExpression='#name = :name_val', ExpressionAttributeValues={ ':pk_val': {'S': 'id#1'}, ':sk_val': {'S': 'cart#'}, ':name_val': {'S': 'SomeName'}, }, ExpressionAttributeNames={ '#name': 'name', } )

リソースインターフェイスを使った同じクエリ操作を短くして簡略化できます。

import boto3 from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') response = table.query( KeyConditionExpression=Key('pk').eq('id#1') & Key('sk').begins_with('cart#'), FilterExpression=Attr('name').eq('SomeName') )

最後の例として、テーブルのおおよそのサイズ (テーブルに保持され、約 6 時間ごとに更新されるメタデータ) を取得したいとします。クライアントインターフェイスで、describe_table() 操作を行い、返された JSON 構造から回答を引き出します。

import boto3 dynamodb = boto3.client('dynamodb') response = dynamodb.describe_table(TableName='YourTableName') size = response['Table']['TableSizeBytes']

リソースインターフェイスでは、テーブルは describe 操作を暗黙的に実行し、データを属性として直接表示します。

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') size = table.table_size_bytes
注記

クライアントインターフェイスとリソースインターフェイスのどちらを使用して開発するかを検討する際、リソースドキュメントではリソースインターフェイスに新しい機能が追加されないことに注意してください。「AWS Python SDK チームは boto3 のリソースインターフェイスに新しい機能を追加する予定はありません。既存のインターフェイスは boto3 のライフサイクル中も引き続き動作します。顧客はクライアントインターフェイスを通じて新しいサービス機能にアクセスできます。」

テーブルリソース batch_writer の使用

上位レベルのテーブルリソースでのみ利用できる便利な機能の 1 つは、batch_writer です。DynamoDB はバッチ書き込み操作をサポートしており、1 つのネットワークリクエストで最大 25 件の入力または削除操作が可能です。このようなバッチ処理は、ネットワークラウンドトリップを最小限に抑えることで効率性を高めます。

低レベルのクライアントライブラリでは、client.batch_write_item() 操作を使用してバッチを実行します。作業は手動で 25 個のバッチに分割する必要があります。各操作の後に、未処理の項目のリストを受け取るように要求する必要もあります (書き込み操作の中には成功するものもあれば、失敗するものもあります)。その後、それらの未処理項目を後の batch_write_item() 操作に再度渡す必要があります。大量のボイラープレートコードがあります。

Table.batch_writer メソッドは、オブジェクトをバッチで書き込むためのコンテキストマネージャーを作成します。一度に 1 つずつ項目を書き込んでいるように見えても、内部的には項目をバッファリングしてバッチで送信するインターフェイスを提供します。また、未処理の項目の再試行も暗黙的に処理されます。

dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') movies = # long list of movies in {'pk': 'val', 'sk': 'val', etc} format with table.batch_writer() as writer: for movie in movies: writer.put_item(Item=movie)

クライアントレイヤーとリソースレイヤーを調べるその他のコード例

次のコードサンプルリポジトリを参照して、クライアントとリソースの両方を使用してさまざまな機能の使用法を調べることもできます。

Client オブジェクトと Resource オブジェクトがセッションやスレッドとどのように相互作用するかを理解する

Resource オブジェクトはスレッドセーフではないため、スレッドやプロセス間で共有しないでください。詳細については、Resource に関するガイドを参照してください。

これとは対照的に、Client オブジェクトは、特定の高度な機能を除いて、通常はスレッドセーフです。詳細については、Clients に関するガイドを参照してください。

Session オブジェクトはスレッドセーフではありません。そのため、マルチスレッド環境で Client または Resource を作成するたびに、まず、新しい Session を作成してから、その Session から Client または Resource を作成する必要があります。詳細については、Sessions に関するガイドを参照してください。

boto3.resource() を呼び出すとき、暗黙的にデフォルトの Session を使用することになります。これはシングルスレッドのコードを書くのに便利です。マルチスレッドのコードを書くときには、まず、スレッドごとに新しい Session を構築し、その Session からリソースを取得します。

# Explicitly create a new Session for this thread session = boto3.Session() dynamodb = session.resource('dynamodb')

Config オブジェクトのカスタマイズ

Client または Resource オブジェクトを作成するときに、オプションの名前付きパラメータを渡して動作をカスタマイズできます。config という名前のパラメータは、さまざまな機能をアンロックします。これは botocore.client.Config のインスタンスであり、Config のリファレンスドキュメントには、ユーザーが制御できるすべての情報が記載されています。設定ガイドには、概要がわかりやすく書かれています。

注記

これらの動作設定の多くは、Session レベル、AWS 構成ファイル内、または環境変数として変更できます。

タイムアウトの設定

カスタム設定の用途の 1 つは、ネットワークの動作を調整することです。

  • connect_timeout (float または int) — 接続を試みたときにタイムアウト例外がスローされるまでの時間 (秒単位)。デフォルト値は 60 秒です。

  • read_timeout (float または int) — 接続からの読み取りを試みたときにタイムアウト例外がスローされるまでの時間 (秒単位)。デフォルト値は 60 秒です。

DynamoDB では 60 秒のタイムアウトは過剰です。つまり、一時的なネットワークの不具合により、クライアントが再試行できるようになるまでに 1 分間の遅延が生じます。次のコードはタイムアウトを 1 秒に短縮します。

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0 ) dynamodb = boto3.resource('dynamodb', config=my_config)

タイムアウトの詳細については、「レイテンシーを考慮した DynamoDB アプリケーションのための AWS Java SDK HTTP リクエスト設定のチューニング」を参照してください。Java SDK には Python よりも多くのタイムアウト設定があることに注意してください。

キープアライブの設定

botocore 1.27.84 以降を使用している場合は、TCP キープアライブを制御することもできます。

  • tcp_keepalive (bool) - True に設定された場合、新しい接続の作成時に使用される TCP キープアライブソケットオプションを有効にします (デフォルトは False)。これは botocore 1.27.84 以降でのみ使用可能です。

TCP キープアライブを True に設定すると、平均レイテンシーを減らすことができます。以下は、適切な botocore バージョンを使用している場合に TCP キープアライブを条件付きで true に設定するサンプルコードです。

import botocore import boto3 from botocore.config import Config from distutils.version import LooseVersion required_version = "1.27.84" current_version = botocore.__version__ my_config = Config( connect_timeout = 0.5, read_timeout = 0.5 ) if LooseVersion(current_version) > LooseVersion(required_version): my_config = my_config.merge(Config(tcp_keepalive = True)) dynamodb = boto3.resource('dynamodb', config=my_config)
注記

TCP キープアライブは HTTP キープアライブとは異なります。TCP キープアライブでは、接続を維持してドロップを即座に検出するために、基盤となるオペレーティングシステムからソケット接続を介して小さなパケットが送信されます。HTTP キープアライブでは、基盤となるソケット上に構築された Web 接続が再利用されます。HTTP キープアライブは boto3 では常に有効になっています。

アイドル状態の接続を維持できる時間には制限があります。接続がアイドル状態で、次のリクエストではすでに確立されている接続を使いたい場合は、定期的に (たとえば 1 分ごとに) リクエストを送信することを検討してください。

再試行の設定

この設定では、希望する再試行動作を指定できる retries という辞書も使用できます。SDK がエラーを受信し、そのエラーが一時的なタイプの場合、SDK 内で再試行が行われます。エラーが内部で再試行された場合(そして再試行によって最終的に成功の応答が得られた場合)、呼び出し側のコードから見ると、エラーは発生せず、レイテンシーがわずかに増加するだけです。指定できる値は次のとおりです。

  • max_attempts — 1 回のリクエストで行われる最大再試行回数を表す整数。たとえば、この値を 2 に設定すると、最初のリクエストから最大 2 回リクエストが再試行されます。この値を 0 に設定すると、最初のリクエスト以降は再試行されません。

  • total_max_attempts — 1 回のリクエストで行われる最大再試行合計回数を表す整数。これには最初のリクエストが含まれるため、値が 1 の場合はリクエストが再試行されないことを示します。total_max_attemptsmax_attempts の両方が指定された場合、total_max_attempts が優先されます。total_max_attempts は、AWS_MAX_ATTEMPTS 環境変数と max_attempts 設定ファイルの値にマップされるので、max_attempts より優先されます。

  • mode — botocore が使用すべき再試行モードの種類を表す文字列。有効な値は次のとおりです。

    • legacy — デフォルトモード。最初の再試行は 50 ms 待機し、基本係数 2 でエクスポネンシャルバックオフを使用します。DynamoDB の場合、最大試行回数は合計 10 回です (上記でオーバーライドされない限り)。

      注記

      エクスポネンシャルバックオフでは、最後の試行ではほぼ 13 秒待機します。

    • standard — 他の AWS SDK との整合性が高いことから、standard と名付けられています。最初の再試行時には、0 ミリ秒から 1,000 ミリ秒までのランダムな時間だけ待ちます。再試行が必要な場合は、0 ミリ秒から 1,000 ミリ秒までの別のランダムな時間を選択し、その値に 2 を掛けます。さらに再試行が必要な場合は、同じようにランダムな時間を選択し、これに 4 を掛け、試行を続けます。各待機時間の上限は 20 秒です。このモードは、検出された障害条件の数が legacy モードよりも多い場合に再試行を実行します。DynamoDB の場合、最大試行回数は合計 3 回です (上記でオーバーライドされない限り)。

    • adaptive – standard モードのすべての機能を含みながら、クライアント側の自動スロットリングを含む実験的な再試行モード。適応型レート制限を使用すると、SDK は AWS のサービスの容量をより適切に処理するために、リクエストの送信速度を遅くすることができます。これは暫定モードであり、動作が変わる可能性があります。

これらの再試行モードの拡張定義は、再試行ガイドSDK リファレンスの「再試行動作」トピックに記載されています。

次の例は、legacy 再試行ポリシーを明示的に使用して、合計リクエスト 3 回 (再試行 2 回) までにした例です。

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0, retries = { 'mode': 'legacy', 'total_max_attempts': 3 } ) dynamodb = boto3.resource('dynamodb', config=my_config)

DynamoDB は可用性が高く、レイテンシーが低いシステムであるため、組み込みの再試行ポリシーで許可されているよりも積極的に再試行の速度を上げる必要がある場合があります。boto3 に頼って暗黙的な再試行を行うのではなく、最大試行回数を 0 に設定し、自分で例外をキャッチし、必要に応じて独自のコードから再試行することで、独自の再試行ポリシーを実装できます。

独自の再試行ポリシーを管理する場合は、スロットルとエラーを区別しておきましょう。

  • スロットル (ProvisionedThroughputExceededException または ThrottlingException で示される) は、DynamoDB テーブルまたはパーティションの読み取り容量または書き込み容量を超過したことを通知する正常なサービスを示します。1 ミリ秒が経過するごとに、読み取りまたは書き込み容量が少しずつ増えるため、すぐに (50 ミリ秒ごとなど) 再試行して、新しく解放された容量へのアクセスを試みることができます。スロットルについては、エクスポネンシャルバックオフは特に必要ありません。軽量で DynamoDB が返しやすく、リクエストごとの課金も発生しないためです。エクスポネンシャルバックオフでは、すでに最長の待機時間が過ぎたクライアントスレッドの遅延をさらに長くしていくため、統計的に p50 と p99 が外側に広がります。

  • エラー (InternalServerError または ServiceUnavailable などによって示される) は、サービスに一時的な問題があることを示します。これはテーブル全体の場合もあれば、読み取り元または書き込み先のパーティションだけに関する場合もあります。エラーが発生した場合は、再試行の前に少し長く一時停止し (250 ミリ秒や 500 ミリ秒など)、ジッターを使って再試行をずらすことができます。

最大プール接続数の設定

最後に、この構成では接続プールのサイズを制御できます。

  • max_pool_connections (int) — 接続プールに保持できる最大接続数。この値が設定されていない場合、デフォルト値の 10 が使用されます。

このオプションは、プールして再利用できるように保持する HTTP 接続の最大数を制御します。Session ごとに異なるプールが保持されます。10 個を超えるスレッドが、同じ Session から構築されたクライアントやリソースに対して実行されることが予想される場合は、プールされた接続を使用する他のスレッドに対してスレッドが待機する必要がないように、この値を増やすことを検討すべきです。

import boto3 from botocore.config import Config my_config = Config( max_pool_connections = 20 ) # Setup a single session holding up to 20 pooled connections session = boto3.Session(my_config) # Create up to 20 resources against that session for handing to threads # Notice the single-threaded access to the Session and each Resource resource1 = session.resource('dynamodb') resource2 = session.resource('dynamodb') # etc

エラー処理

Boto3 では AWS サービスの例外がすべて静的に定義されているわけではありません。これは、AWS サービスのエラーや例外は大きく異なり、変更される可能性があるためです。Boto3 はすべてのサービス例外を ClientError としてラップし、詳細を構造化された JSON として公開します。例えば、エラーレスポンスは以下のような構造になっているかもしれません。

{ 'Error': { 'Code': 'SomeServiceException', 'Message': 'Details/context around the exception or error' }, 'ResponseMetadata': { 'RequestId': '1234567890ABCDEF', 'HostId': 'host ID data will appear here as a hash', 'HTTPStatusCode': 400, 'HTTPHeaders': {'header metadata key/values will appear here'}, 'RetryAttempts': 0 } }

次のコードは、すべての ClientError 例外をキャッチし、Error 内の Code の文字列値を調べて実行するアクションを決定します。

import botocore import boto3 dynamodb = boto3.client('dynamodb') try: response = dynamodb.put_item(...) except botocore.exceptions.ClientError as err: print('Error Code: {}'.format(err.response['Error']['Code'])) print('Error Message: {}'.format(err.response['Error']['Message'])) print('Http Code: {}'.format(err.response['ResponseMetadata']['HTTPStatusCode'])) print('Request ID: {}'.format(err.response['ResponseMetadata']['RequestId'])) if err.response['Error']['Code'] in ('ProvisionedThroughputExceededException', 'ThrottlingException'): print("Received a throttle") elif err.response['Error']['Code'] == 'InternalServerError': print("Received a server error") else: raise err

一部の (すべてではない) 例外コードは最上位クラスとして実装されています。これらを直接処理することもできます。Client インターフェイスを使用する場合、これらの例外はクライアントに動的に入力され、以下のようにクライアントインスタンスを使用してこれらの例外をキャッチします。

except ddb_client.exceptions.ProvisionedThroughputExceededException:

Resource インターフェイスを使用しているときには、以下のように、.meta.client を使用してリソースから基盤となる Client までトラバースして例外にアクセスする必要があります。

except ddb_resource.meta.client.exceptions.ProvisionedThroughputExceededException:

マテリアライズド例外タイプのリストを確認するには、リストを動的に生成できます。

ddb = boto3.client("dynamodb") print([e for e in dir(ddb.exceptions) if e.endswith('Exception') or e.endswith('Error')])

条件式を使用して書き込み操作を行う場合、式が失敗した場合にその項目の値をエラーレスポンスで返すようにリクエストできます。

try: response = table.put_item( Item=item, ConditionExpression='attribute_not_exists(pk)', ReturnValuesOnConditionCheckFailure='ALL_OLD' ) except table.meta.client.exceptions.ConditionalCheckFailedException as e: print('Item already exists:', e.response['Item'])

エラー処理と例外について詳しくは、以下を参照してください。

ログ記録

boto3 ライブラリは Python の組み込みロギングモジュールと統合されており、セッション中に何が起こるかを追跡できます。ロギングレベルを制御するには、ロギングモジュールを設定します。

import logging logging.basicConfig(level=logging.INFO)

これにより、INFO 以上のレベルのメッセージを記録するようにルートロガーが設定されます。レベルよりも重大度の低いロギングメッセージは無視されます。ロギングレベルには、DEBUGINFOWARNINGERROR、および CRITICAL があります。デフォルトは WARNING です。

boto3 のロガーは階層構造になっています。ライブラリはいくつかの異なるロガーを使用しており、それぞれがライブラリの異なる部分に対応しています。それぞれの動作を個別に制御できます。

  • boto3: boto3 モジュールのメインロガー。

  • botocore: botocore パッケージのメインロガー。

  • botocore.auth: リクエストの AWS 署名作成を記録するために使用されます。

  • botocore.credentials: 認証情報の取得と更新のプロセスを記録するために使用されます。

  • botocore.endpoint: ネットワーク経由で送信される前に、リクエストの作成を記録するために使用されます。

  • botocore.hooks: ライブラリでトリガーされたイベントの記録に使用されます。

  • botocore.loaders: AWS サービスモデルの一部がロードされたときの記録に使用されます。

  • botocore.parsers: AWS サービスレスポンスを解析する前に記録するために使用されます。

  • botocore.retryhandler: AWS サービスリクエストの再試行の処理を記録するために使用されます (レガシーモード)。

  • botocore.retries.standard: AWS サービスリクエストの再試行の処理を記録するために使用されます (standard または adaptive モード)。

  • botocore.utils: ライブラリ内のさまざまなアクティビティを記録するために使用されます。

  • botocore.waiter: 特定の状態になるまで AWS サービスにポーリングするウェイターの機能をログに記録するために使用されます。

他のライブラリもログに記録されます。内部的には、boto3 は HTTP 接続処理にサードパーティの urllib3 を使用しています。レイテンシーが重要な場合は、ログを監視して、urllib3 が新しい接続を確立したり、アイドル状態の接続を閉じたりするタイミングを確認することで、プールが十分に利用されているかどうかを確認できます。

  • urllib3.connectionpool: 接続プールの処理イベントを記録するために使用されます。

以下のコードスニペットは、ほとんどのロギングを INFO に設定し、エンドポイントと接続プールのアクティビティの記録には DEBUG に設定しています。

import logging logging.getLogger('boto3').setLevel(logging.INFO) logging.getLogger('botocore').setLevel(logging.INFO) logging.getLogger('botocore.endpoint').setLevel(logging.DEBUG) logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)

イベントフック

Botocore は実行中のさまざまな段階でイベントを発生させます。これらのイベントのハンドラーを登録すると、イベントが発生するたびにハンドラーが呼び出されます。これにより、内部を変更しなくても botocore の動作を拡張できます。

例えば、アプリケーションの DynamoDB テーブルで PutItem 操作が呼び出されるたびに記録しておきたいとしましょう。'provide-client-params.dynamodb.PutItem' イベントに登録すると、関連する Session で PutItem 操作が呼び出されるたびにキャッチしてログに記録できます。例を示します。

import boto3 import botocore import logging def log_put_params(params, **kwargs): if 'TableName' in params and 'Item' in params: logging.info(f"PutItem on table {params['TableName']}: {params['Item']}") logging.basicConfig(level=logging.INFO) session = boto3.Session() event_system = session.events # Register our interest in hooking in when the parameters are provided to PutItem event_system.register('provide-client-params.dynamodb.PutItem', log_put_params) # Now, every time you use this session to put an item in DynamoDB, # it will log the table name and item data. dynamodb = session.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': '123', 'sk': 'cart#123', 'item_data': 'YourItemData', # ... more attributes ... } )

ハンドラー内で、パラメータをプログラムで操作して動作を変更することもできます。

params['TableName'] = "NewTableName"

イベントについて詳しくは、イベントに関する botocore ドキュメントと、イベントに関する boto3 ドキュメントを参照してください。

ページネーションとページネーター

Query や Scan などの一部のリクエストでは、1 回のリクエストで返されるデータのサイズが制限され、後続のページを取得するにはリクエストを繰り返し行う必要があります。

各ページで読み取られる項目の最大数は、limit パラメータを使用して制御できます。例えば、最後の 10 項目だけが必要な場合、limit を使用して最後の 10 項目だけを取得できます。制限は、フィルタリングが適用される前にテーブルから読み取られる量であることに注意してください。フィルタリング後に正確に 10 を指定する方法はありません。事前にフィルター処理される回数を制御し、実際に 10 個を取得した時点でクライアント側で確認することしかできません。制限にかかわらず、すべての応答の最大サイズは常に 1 MB です。

応答に LastEvaluatedKey が含まれている場合、それは件数またはサイズの制限に達したために応答が終了したことを示します。キーはレスポンスで評価された最後のキーです。この LastEvaluatedKey を取得して,フォローアップコールに ExclusiveStartKey として渡すと、その開始点から次のチャンクを読み取ることができます。LastEvaluatedKey が返されないということは、Query または Scan に一致するアイテムがもうないということです。

以下は、1 ページあたり最大 100 個の項目を読み込み、すべての項目が読み込まれるまでループする単純な例です (Resource インターフェイスを使用していますが、Client インターフェイスでも同じパターンです)。

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') query_params = { 'KeyConditionExpression': Key('pk').eq('123') & Key('sk').gt(1000), 'Limit': 100 } while True: response = table.query(**query_params) # Process the items however you like for item in response['Items']: print(item) # No LastEvaluatedKey means no more items to retrieve if 'LastEvaluatedKey' not in response: break # If there are possibly more items, update the start key for the next page query_params['ExclusiveStartKey'] = response['LastEvaluatedKey']

便宜上、boto3 はページネーターを使ってこれを行うことができます。ただし、Client インターフェイスでのみ機能します。以下はページネーターを使うように書き直されたコードです。

import boto3 dynamodb = boto3.client('dynamodb') paginator = dynamodb.get_paginator('query') query_params = { 'TableName': 'YourTableName', 'KeyConditionExpression': 'pk = :pk_val AND sk > :sk_val', 'ExpressionAttributeValues': { ':pk_val': {'S': '123'}, ':sk_val': {'N': '1000'}, }, 'Limit': 100 } page_iterator = paginator.paginate(**query_params) for page in page_iterator: # Process the items however you like for item in page['Items']: print(item)

詳細については、「ページネーターに関するガイド」と「DynamoDB.Paginator.Query の API リファレンス」を参照してください。

注記

ページネーターには、MaxItemsStartingToken、および PageSize という名前の独自の構成設定もあります。DynamoDB でページ分割を行う場合は、これらの設定は無視してください。

ウェイター

ウェイターは、処理が完了するのを待ってから次の処理を進めることができます。現在のところ、サポートしているのはテーブルが作成または削除されるのを待つことだけです。バックグラウンドでは、ウェイター操作は 20 秒ごとに最大 25 回までチェックを行います。これは自分で行うこともできますが、オートメーションを書くときはウェイターを使うのが便利です。

次のコードは、特定のテーブルが作成されるまで待機する方法を示しています。

# Create a table, wait until it exists, and print its ARN response = client.create_table(...) waiter = client.get_waiter('table_exists') waiter.wait(TableName='YourTableName') print('Table created:', response['TableDescription']['TableArn']

詳細については、「ウェイターガイド」と「ウェイターに関するリファレンス」を参照してください。