Python 用 Amazon QLDB ドライバー — クックブックリファレンス - Amazon Quantum Ledger Database (Amazon QLDB)

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Python 用 Amazon QLDB ドライバー — クックブックリファレンス

このリファレンスガイドでは、Python 用 Amazon QLDB ドライバーの一般的なユースケースについて説明します。Python コード例を提供し、ドライバーを使用して作成、読み取り、更新、および削除 (CRUD) の基本的なオペレーションを実行する方法を説明しています。Amazon Ion データを処理するためのコード例も含まれています。さらに、このガイドでは、トランザクションをべき等にし、一意性の制約を実装するためのベストプラクティスを取り上げています。

注記

該当する場合、一部のユースケースでは、サポートされているメジャーバージョンの Python 用 QLDB ドライバーに対して異なるコード例があります。

ドライバーのインポート

次のコード例では、ドライバーをインポートします。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注記

この例では、Amazon Ion パッケージ (amazon.ion.simpleion) もインポートします。このリファレンスでいくつかのデータオペレーションを実行するときに Ion データを処理するには、このパッケージが必要です。詳細については、「Amazon Ion の操作」を参照してください。

ドライバーのインスタンス化

次のコード例は、デフォルト設定を使用して指定された台帳名に接続するドライバーのインスタンスを作成します。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD オペレーション

QLDB は作成、読み取り、更新、および削除 (CRUD) オペレーションをトランザクションの一部として実行します。

警告

ベストプラクティスとして、書き込みトランザクションを厳密にべき等にしてください。

トランザクションをべき等にする

再試行の場合に予期しない結果を避けるために、書き込みトランザクションをべき等にすることをお勧めします。トランザクションは、複数回実行して毎回同じ結果を生成できる場合、idempotent です。

例えば、Person という名前のテーブルにドキュメントを挿入するトランザクションについて考えてみます。トランザクションは、まずドキュメントがテーブル内に既に存在するかどうかをチェックする必要があります。このチェックを行わないと、テーブルに重複するドキュメントができる可能性があります。

QLDB がサーバー側でトランザクションを正常にコミットできるが、レスポンスを待機中にクライアントはタイムアウトするとします。トランザクションがべき等でない場合、再試行時に同じドキュメントが複数回挿入される可能性があります。

インデックスを使用してテーブル全体のスキャンを回避する

インデックス付きフィールドまたはドキュメント ID で等価演算子を使用する WHERE 述語句でステートメントを実行する (例: WHERE indexedField = 123 または WHERE indexedField IN (456, 789)) こともお勧めします。このインデックス付きのルックアップがない場合、QLDB はテーブルスキャンを実行する必要があり、トランザクションタイムアウトやオプティミスティック同時実行制御 (OCC) 競合が発生する可能性があります。

OCC の詳細については、「Amazon QLDB 同時実行モデル」を参照してください。

暗黙的に作成されたトランザクション

pyqldb.driver.qldb_driver.execute_lambda メソッドは、pyqldb.execution.executor.Executor のインスタンスを受け取る Lambda 関数を受け入れます。これを使用してステートメントを実行できます。Executor のインスタンスは、暗黙的に作成されたトランザクションをラップします。

Lambda 関数内でステートメントを実行するには、トランザクションエグゼキューターの execute statement メソッドを使用します。ドライバーは、Lambda 関数が戻ったときに暗黙的にトランザクションをコミットします。

注記

execute_statement メソッドは、Amazon Ion 型と Python ネイティブ型の両方をサポートします。Python ネイティブ型を引数として execute_statement に渡すと、ドライバーが amazon.ion.simpleion モジュールを使用して Ion 型に変換します (指定された Python データ型の変換がサポートされている場合)。サポートされているデータ型と変換ルールについては、simpleion のソースコードを参照してください。

次のセクションでは、基本的な CRUD オペレーションの実行、カスタム再試行ロジックの指定、一意性制約の実装方法について説明します。

テーブルの作成

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

インデックスの作成

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

ドキュメントの読み取り

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

クエリパラメータの使用

次のコード例では、ネイティブ型のクエリパラメータを使用します。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

次のコード例では、Ion 型のクエリパラメータを使用します。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

次のコード例では、複数のクエリパラメータを使用します。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

次のコード例では、クエリパラメータのリストを使用します。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注記

インデックス付きルックアップなしでクエリを実行すると、完全なテーブルスキャンが呼び出されます。この例では、GovId フィールドでインデックスを使用して、パフォーマンスを最適化することをお勧めします。GovId でインデックスをオンにしないと、クエリのレイテンシーが大きくなり、OCC 競合例外やトランザクションタイムアウトが発生する可能性があります。

ドキュメントの挿入

次のコード例では、ネイティブデータ型を挿入します。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

次のコード例では、Ion データ型を挿入します。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

このトランザクションは、ドキュメントを Person テーブルに挿入します。挿入する前に、まずドキュメントがテーブル内に既に存在しているかどうかをチェックします。このチェックにより、トランザクションは本質的にべき等になります。このトランザクションを複数回実行しても、意図しない副作用は発生しません。

注記

この例では、GovId フィールドでインデックスを使用して、パフォーマンスを最適化することをお勧めします。GovId でインデックスをオンにしないと、ステートメントのレイテンシーが大きくなり、OCC 競合例外やトランザクションタイムアウトが発生する可能性があります。

1 つのステートメントで複数のドキュメントの挿入

1 つの INSERT ステートメントを使用して複数のドキュメントを挿入するために、次のように型 list のパラメータをステートメントに渡すことができます。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

リストを渡すときには、変数プレースホルダー (?) を二重山括弧 (<<...>>) で囲まないでください。マニュアルの PartiQL ステートメントでは、二重山括弧はバッグと呼ばれる順序付けされていないコレクションを表します。

ドキュメントの更新

次のコード例では、ネイティブデータ型を使用します。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

次のコード例では、Ion データ型を使用します。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注記

この例では、GovId フィールドでインデックスを使用して、パフォーマンスを最適化することをお勧めします。GovId でインデックスをオンにしないと、ステートメントのレイテンシーが大きくなり、OCC 競合例外やトランザクションタイムアウトが発生する可能性があります。

ドキュメントの削除

次のコード例では、ネイティブデータ型を使用します。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

次のコード例では、Ion データ型を使用します。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注記

この例では、GovId フィールドでインデックスを使用して、パフォーマンスを最適化することをお勧めします。GovId でインデックスをオンにしないと、ステートメントのレイテンシーが大きくなり、OCC 競合例外やトランザクションタイムアウトが発生する可能性があります。

トランザクションで複数のステートメントの実行

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

再試行ロジック

ドライバーの execute_lambda メソッドには、再試行可能な例外 (タイムアウトや OCC 競合など) が発生した場合にトランザクションを再試行する組み込みの再試行メカニズムがあります。

3.x

再試行の最大数とバックオフ戦略を設定できます。

デフォルトの再試行制限は 4 であり、デフォルトのバックオフ戦略は 10 ミリ秒のベースを使用するエクスポネンシャルバックオフとジッターです。再試行設定は、pyqldb.config.retry_config.RetryConfig のインスタンスを使用して、ドライバーインスタンスごとおよびトランザクションごとに設定できます。

次のコード例では、ドライバーのインスタンスのカスタム再試行制限とカスタムバックオフ戦略を使用して再試行ロジックを指定します。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

次のコード例では、特定の Lambda 実行のカスタム再試行制限とカスタムバックオフ戦略を使用して再試行ロジックを指定します。execute_lambda のこの設定は、ドライバーインスタンスに設定された再試行ロジックを上書きします。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

再試行の最大数は設定可能です。再試行制限を設定するには、PooledQldbDriver の初期化時に retry_limit プロパティを設定します。

デフォルトの再試行制限回数は 4 です。

一意性制約の実装

QLDB は現在、一意のインデックスをサポートしていません。しかし、この動作をアプリケーションに実装するのは簡単です。

Person テーブル内の GovId フィールドに対して一意性制約を実装するとします。これを行うには、以下を実行するトランザクションを記述します。

  1. テーブルに指定された GovId を持つ既存のドキュメントがないことをアサートします。

  2. アサーションに合格した場合は、ドキュメントを挿入します。

競合するトランザクションが同時にアサーションに合格すると、一方のトランザクションだけが正常にコミットされます。もう一方のトランザクションは OCC 競合例外で失敗します。

次のコード例は、この一意性制約ロジックを実装する方法を示しています。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注記

この例では、GovId フィールドでインデックスを使用して、パフォーマンスを最適化することをお勧めします。GovId でインデックスをオンにしないと、ステートメントのレイテンシーが大きくなり、OCC 競合例外やトランザクションタイムアウトが発生する可能性があります。

Amazon Ion の操作

以下のセクションでは、Amazon Ion モジュールを使用して Ion データを処理する方法について説明します。

Ion モジュールのインポート

import amazon.ion.simpleion as simpleion

Ion 型の作成

次のコード例では、Ion テキストから Ion オブジェクトを作成します。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

次のコード例では、Python dict から Ion オブジェクトを作成します。

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

Ion バイナリダンプの取得

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

Ion テキストダンプの取得

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

Ion の操作方法の詳細については、GitHub で Amazon Ion のドキュメントを参照してください。QLDB で Ion を操作するコード例については、「Amazon QLDB での Amazon Ion のデータ型の操作」を参照してください。