亞馬遜 QLDB 驅動程序的 Python-食譜參考 - Amazon Quantum Ledger Database (Amazon QLDB)

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

亞馬遜 QLDB 驅動程序的 Python-食譜參考

本參考指南顯示適用於 Python 的亞馬遜 QLDB 驅動程式的常見使用案例。該範例示範如何使用驅動程式碼範例會示範如何使用該驅動程式碼範例會示範如何使用驅動程式碼範例會示範如何使用驅動程式碼範例 它還包括用於處理 Amazon Ion 資料的程式碼範例。此外,本指南還重點介紹了使交易冪等和實施唯一性約束的最佳實踐。

注意

在適用的情況下,某些使用案例會針對每個支援的 Python QLDB 驅動程式主要版本提供不同的程式碼範例。

匯入驅動程式

下列程式碼範例會匯入驅動程式。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注意

此範例也會匯入 Amazon Ion 套件 (amazon.ion.simpleion)。在此參考中執行某些資料作業時,您需要此套件來處理 Ion 資料。如需進一步了解,請參閱 使用 Amazon Ion

實例化驅動程式

下列程式碼範例會建立使用預設設定連線至指定分類帳名稱的驅動程式執行環境。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD 操作

QLDB 運行建立、讀取、更新和刪除 (CRUD) 操作。

警告

根據最佳實務,寫入交易必須使寫入交易嚴格的冪等。

使交易冪等

我們建議您將寫入事務設為冪等,以避免在重試的情況下出現任何意外的副作用。如果事務可以運行多次並每次產生相同的結果,則事務是冪等的。

例如,假設將文件插入名為的資料表中的交易Person。事務應該首先檢查該文檔是否已經存在於表中。如果沒有此檢查,表格最終可能會出現重複的文件。

假設 QLDB 成功地在服務器端提交事務,但在等待響應的客戶端超時。如果事務不是冪等的,則在重試的情況下,可以多次插入相同的文檔。

使用索引來避免完整表格掃描

我們也建議您在索引欄位或文件 ID 上使用相等運算子來執行具有述WHERE詞子句的陳述式;例如,WHERE indexedField = 123WHERE indexedField IN (456, 789)。如果沒有此索引查找,QLDB 需要進行表掃描,這可能會導致交易逾時或樂觀並發控制(OCC)衝突。

如需 OCC 的詳細資訊,請參閱「Amazon QLLDB 並行模型」。

隱含建立的交易

執行程序 .execute_lambda 方法接受一個 lambda 函數,該函數接受一個 lambda 函數,該函數接收執行程。執行程序,您可以使用它來運行語句的實例。Executor封裝隱含建立之交易的執行個體。

您可以使用交易執行程式的 execute_陳述式方法,在 Lambda 函數中執行陳述式。當 lambda 函數返回時,驅動程序隱式地提交交易。

注意

execute_statement方法同時支持亞馬遜離子類型和 Python 本機類型。如果您將 Python 原生類型作為引數傳遞給execute_statement,則驅動程序會使用amazon.ion.simpleion模塊將其轉換為 Ion 類型(前提是支持給定 Python 數據類型的轉換)。有關支持的數據類型和轉換規則,請參閱簡單源代碼

下列各節說明如何執行基本 CRUD 作業、指定自訂重試邏輯,以及實作唯一性限制。

建立資料表

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

建立索引

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

閱讀文件

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

使用查詢參數

下列程式碼範例會使用原生型別查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

下列程式碼範例會使用 Ion 型別查詢參數。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

下列程式碼範例會使用多個查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

下列程式碼範例使用查詢參數清單。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注意

當您在沒有索引查閱的情況下執行查詢時,會叫用完整資料表掃描。在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,查詢可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

插入文件

下列程式碼範例會插入原生資料類型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

下列程式碼範例會插入 Ion 資料型別。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

此事務將一個文檔插入到Person表中。在插入之前,它會先檢查文件是否已存在於表格中。這個檢查使事務本質上是冪等的。即使您多次運行此事務,也不會造成任何意外的副作用。

注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

在一個語句中插入多個文檔

要通過使用單個INSERT語句插入多個文檔,你可以通過類型列表的參數到語句如下。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

傳遞清單時,您不會將變數預留位置 (?<<...>>) 括在雙尖括號 () 中。在手動 PartiQL 陳述式中,雙角括號表示稱為袋子的無序集合。

更新文件

下列程式碼範例會使用原生資料類型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

下列程式碼範例會使用 Ion 資料型別。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

刪除文件

下列程式碼範例會使用原生資料類型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

下列程式碼範例會使用 Ion 資料型別。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

在交易中執行多個陳述式

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

重試邏輯

驅動程式的execute_lambda方法具有內建的重試機制,可在發生可重試的例外狀況 (例如逾時或 OCC 衝突) 時重試交易。

3.x

重試試試試次數上限的重數上限的重試次數上限的重試次數上限,

預設重試限制為4,預設輪詢策略為指數輪詢和抖動,以10毫秒為基礎。您可以通過使用 pyqldb.config.retry_config 的實例來設置每個驅動程序實例和每個事務的重試配置。 RetryConfig

下列程式碼範例會指定具有自訂重試限制的重試邏輯,以及驅動程式執行個體的自訂輪詢策略。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

下列程式碼範例會指定具有自訂重試限制的重試邏輯,以及針對特定 lambda 執行的自訂輪詢策略。此組態execute_lambda會覆寫為驅動程式執行個體設定的重試邏輯。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

重試試試試次數上限的重試試次數上限的重 您可以在初始化時設定retry_limit屬性來設定重試限制PooledQldbDriver

預設重試限制為4

實行唯一性限制

QLDB 不支援唯一索引,但您可以在應用程式中實作此行為。

假設您要在Person表中的GovId字段上實現唯一性約束。您可通過下列方式執行此操作:

  1. 斷言該表沒有具有指定的現有文檔GovId

  2. 插入文檔,如果斷言通過。

如果競爭交易同時通過斷言,則只有其中一個交易將成功提交。另一個交易將會失敗,並出現 OCC 衝突例外狀況。

下列程式碼範例示範如何實作此唯一性限制邏輯。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

使用 Amazon Ion

下列幾節示範如何使用 Amazon Ion 模組來處理 Ion 資料。

匯入離子模組

import amazon.ion.simpleion as simpleion

建立 Ion 類型

下列程式碼範例會從 Ion 文字建立 Ion 物件。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

下列程式碼範例會從 Python 建立離子物件dict

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

獲取離子二進制轉儲

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

獲取離子文本轉儲

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

如需有關使用 Ion 的詳細資訊,請參閱上的 Amazon Ion 文件 GitHub。如需在 QLDB 中使用 Ion 的更多程式碼範例,請參閱在亞馬遜 QLDB 中使用亞馬遜離子數據類型