适用于 Python 的Amazon QLDB 驱动程序 — 说明书参考 - Amazon Quantum Ledger Database (Amazon QLDB)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Python 的Amazon QLDB 驱动程序 — 说明书参考

本参考指南显示了 Python 的 Amazon QLDB 驱动程序的常见用例。它提供了 Python 代码示例,演示了如何使用该驱动程序运行基本的创建、读取、更新和删除 (CRUD) 操作。它还包括用于处理 Amazon Ion 数据的代码示例。此外,本指南还重点介绍了使事务具有幂等性和实现唯一性约束的最佳实践。

注意

在适用的情况下,某些用例对于 Python 的 QLDB 驱动程序的每个支持主要版本都配备不同代码示例。

导入驱动程序

下面的代码示例导入驱动程序。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注意

此示例还导入了 Amazon Ion 软件包 (amazon.ion.simpleion)。在本参考中运行某些数据操作时,您需要此软件包来处理 Ion 数据。要了解更多信息,请参阅 使用 Amazon Ion

实例化驱动程序

以下代码示例使用默认设置创建连接到指定分类账名称的驱动程序实例。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD 操作

QLDB 作为事务的一部分运行创建、读取、更新和删除 (CRUD) 操作。

警告

作为最佳实践,使写事务严格地幂等。

使事务幂等

我们建议将写事务设置为幂等,以避免重试时出现任何意想不到的副作用。如果事务可以运行多次并每次都产生相同的结果,则事务是幂等的

例如,假设有一个事务,要将文档插入名为 Person 的表中。事务应该首先检查文档是否已经存在于表中。如果没有这种检查,表最终可能会有重复的文档。

假设 QLDB 在服务器端成功提交了事务,但客户端在等待响应时超时。如果事务不是幂等的,则在重试的情况下可以多次插入相同的文档。

使用索引避免全表扫描

我们还建议您在索引字段或文档 ID 上使用相等运算符来运行带有WHERE谓词子句的语句;例如,WHERE indexedField = 123WHERE indexedField IN (456, 789)。如果没有这种索引查找,QLDB 需要进行表扫描,这可能会导致事务超时或乐观并发控制 (OCC) 冲突。

有关 OCC 的更多信息,请参阅Amazon QLDB 并发模型

隐式创建的事务

pyqldb.driver.qldb_driver.execute_lambda 方法接受一个 lambda 函数,该函数接收一个 TransactionExecutor 的实例,您可以用它来运行语句。Executor 的实例封装了隐式创建的事务。

您可以使用事务执行器的 execute_statement 法在 lambda 函数中运行语句。当 lambda 函数返回时,驱动程序会隐式提交事务。

注意

execute_statement 方法同时支持 Amazon Ion 类型和 Python 本机类型。如果您将 Python 本地类型作为参数传递给 execute_statement,则驱动程序会使用 amazon.ion.simpleion 模块将其转换为 Ion 类型(前提是支持对给定 Python 数据类型的转换)。有关支持的数据类型和转换规则,请参阅 simpleion 源代码

以下各节介绍如何运行基本的 CRUD 操作、指定自定义重试逻辑以及如何实现唯一性约束。

创建表

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

创建索引

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

阅读文档

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

使用查询参数

以下代码示例使用原生类型查询参数。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

以下代码示例使用 Ion 类型查询参数。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

以下代码示例使用了多个查询参数。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

以下代码示例使用查询参数列表。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注意

当您在没有索引查找的情况下运行查询时,它会调用全表扫描。在此示例中,我们建议在GovId字段上设置 索引以优化性能。如果不开启GovId索引,查询可能会有更长的延迟,还可能导致 OCC 冲突异常或者事务超时。

插入文档

以下代码示例插入本地数据类型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

以下代码示例插入 Ion 数据类型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

此事务将文档插入 Person 表中。在插入之前,它首先检查文档是否已存在于表格内。此检查使事务本质上是幂等。即使您多次运行此事务,也不会造成任何异常副作用。

注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启GovId索引,语句可能会有更长的延迟,还可能导致 OCC 冲突异常或者事务超时。

在一条语句内插入多个文档

要使用单个 INSERT 语句插入多个文档,可以向该语句传递一个列表类型的参数,如下所示。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

传递 Ion 列表时,不要将变量占位符 (?) 括在双尖括号 (<<...>>) 内。在手动 PartiQL 语句中,双尖括号表示名为bag的无序集合。

更新文档

以下代码示例使用原生数据类型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

以下代码示例使用 Ion 数据类型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启GovId索引,语句可能会有更长的延迟,还可能导致 OCC 冲突异常或者事务超时。

删除文档

以下代码示例使用原生数据类型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

以下代码示例使用 Ion 数据类型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启GovId索引,语句可能会有更长的延迟,还可能导致 OCC 冲突异常或者事务超时。

在一个事务中运行多条语句

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

重试逻辑

驱动程序 execute_lambda 方法具有内置的重试机制,如果发生可重试的异常(例如超时或 OCC 冲突),该机制可以重试事务。

3.x

最大重试次数和退避策略为可配置。

默认的重试限制为 4,默认的退避策略是以10毫秒为基数的 Exponential Backoff and Jitter。您可以使用RetryPolicy实例为每个驱动程序实例以及每个事务设置重试策略。

以下代码示例使用自定义重试限制和驱动程序实例的自定义退避策略指定重试逻辑。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

以下代码示例使用自定义重试限制和自定义回退策略为特定 lambda 执行指定重试逻辑。此execute_lambda配置将覆盖为驱动程序实例设置的重试逻辑。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

最大重试次数。您可以通过在初始化PooledQldbDriver 时设置 retry_limit 属性来配置重试限制。

默认重试限制为 4

实现唯一限制

QLDB 不支持唯一索引,但您可在应用程序中实现此行为。

假设您要对 Person 表中的 GovId 字段实现唯一性约束。据此,可以编写执行以下操作的事务:

  1. 断言该表中没有指定 GovId 的现有文档。

  2. 如果断言通过,请插入文档。

如果一个竞争事务同时通过断言,则只有一个事务将成功提交。另一笔事务将失败,并显示 OCC 冲突异常。

以下代码示例显示了如何实现此唯一约束。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启GovId索引,语句可能会有更长的延迟,还可能导致 OCC 冲突异常或者事务超时。

使用 Amazon Ion

以下各节说明了如何使用 Amazon Ion 模块处理 Ion 数据。

导入 Ion 模块

import amazon.ion.simpleion as simpleion

创建 Ion 类型

以下代码示例从 Ion 文本创建 Ion 对象。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

以下代码示例从 Python dict创建 Ion 对象。

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

获取 Ion 二进制转储

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

获取 Ion 文本转储

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

有关使用 Ion 的更多信息,请参阅 GitHub 上的 Amazon Ion 文档。有关在 QLDB 中使用 Ion 的更多代码示例,请参阅使用 Amazon QLDB 中的 Amazon Ion 数据类型