Amazon QLDB 驱动程序推荐 - 亚马逊 Quantum Ledger 数据库(亚马逊QLDB)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon QLDB 驱动程序推荐

重要

终止支持通知:现有客户可以在2025年7月31日终止支持QLDB之前使用亚马逊。有关更多详细信息,请参阅将亚马逊QLDB账本迁移到亚马逊 Aurora Postgr SQL e。

本节介绍配置和使用任何支持的语言的 Amazon QLDB 驱动程序的最佳实践。所提供的代码示例专门针对 Java。

这些建议适用于大多数典型用例,但一种方法并不适合所有情况。使用您认为适合您的应用程序的以下建议。

配置 QldbDriver 对象

QldbDriver对象通过维护可跨事务重复使用的会话池来管理与分类账的连接。会话代表与分类账的单个连接。QLDB每个会话支持一个正在运行的事务。

重要

对于较旧的驱动程序版本,会话池功能仍位于PooledQldbDriver对象中,而非QldbDriver。如果您使用的是以下版本之一,请在本主题的其余部分中将任何提及的 QldbDriver 内容替换为PooledQldbDriver

驱动程序 版本
Java 1.1.0 或更早
.NET 0.1.0-beta
Node.js 1.0.0-rc.1 或更早
Python 2.0.2 或更早

最新版本的驱动中已弃用该 PooledQldbDriver对象。建议升级至最新版本,并将的所有实例转换 PooledQldbDriverQldbDriver

配置 QldbDriver 为全局对象

若优化驱动程序和会话的使用,请确保您的应用程序实例中仅存在一个驱动程序的全局实例。例如,在 Java 中,你可以使用依赖注入框架,例如 SpringGoogle GuiceDagger。以下代码示例显示如何配置 QldbDriver 单例。

@Singleton public QldbDriver qldbDriver (AWSCredentialsProvider credentialsProvider, @Named(LEDGER_NAME_CONFIG_PARAM) String ledgerName) { QldbSessionClientBuilder builder = QldbSessionClient.builder(); if (null != credentialsProvider) { builder.credentialsProvider(credentialsProvider); } return QldbDriver.builder() .ledger(ledgerName) .transactionRetryPolicy(RetryPolicy .builder() .maxRetries(3) .build()) .sessionClientBuilder(builder) .build(); }

配置重试次数

当出现常见的瞬态异常(例如SocketTimeoutExceptionNoHttpResponseException)时,驱动程序会自动重试事务。要设置最大重试次数,可以在创建的实例时使用 transactionRetryPolicy 配置对象的 maxRetriesQldbDriver 参数。(对于上一节中列出的较旧驱动程序版本,请使用PooledQldbDriver的参数retryLimit。)

maxRetries 的默认值为 4

客户端错误,例如 “InvalidParameterException 无法重试”。当它们发生时,事务将中止,会话返回至池中,并将异常抛给驱动程序的客户端。

配置最大并行会话与事务数

QldbDriver的实例用于运行事务的最大分类账会话数由其maxConcurrentTransactions参数定义。(对于上一节中列出的较旧的驱动程序版本,这是由PooledQldbDriverpoolLimit参数定义的。)

此限制必须大于零且小于或等于会话客户端允许的最大打开HTTP连接数,具体定义为会话客户端 AWS SDK。例如,在 Java 中,最大连接数是在ClientConfiguration对象中设置的。

的默认值maxConcurrentTransactions是您的最大连接设置 AWS SDK。

在应用程序 QldbDriver 中配置时,请考虑以下扩展注意事项:

  • 您的池中的会话数应始终至少与您计划同时运行的事务数量一样多。

  • 在监督线程委托给工作线程的多线程模型中,驱动程序应至少拥有与工作线程数量一样多的会话。否则,在峰值负载时,线程将排队等待可用会话。

  • 每个分类账的并发活动会话的服务限制在Amazon 中的配额和限制 QLDB中定义。确保您配置的并发会话数不超过此限制,以便用于所有客户端单个分类账。

重试异常

重试中发生的异常时QLDB,请考虑以下建议。

正在重试 OccConflictException

当事务正在访问的数据自事务开始以来发生了变化时,就会发生乐观并发控制 (OCC) 冲突异常。QLDB在尝试提交事务时会抛出此异常。驱动程序最多会根据配置重试事务maxRetries次数。

有关使用索引限制OCC冲突的更多OCC信息和最佳实践,请参阅亚马逊QLDB并发模型

重试除外的其他异常 QldbDriver

要在运行时抛出自定义的应用程序定义的异常时在驱动程序外部重试事务,您必须包装该事务。例如,在 Java 中,以下代码显示了如何使用 Reslience4J 库在中重试事务。QLDB

private final RetryConfig retryConfig = RetryConfig.custom() .maxAttempts(MAX_RETRIES) .intervalFunction(IntervalFunction.ofExponentialRandomBackoff()) // Retry this exception .retryExceptions(InvalidSessionException.class, MyRetryableException.class) // But fail for any other type of exception extended from RuntimeException .ignoreExceptions(RuntimeException.class) .build(); // Method callable by a client public void myTransactionWithRetries(Params params) { Retry retry = Retry.of("registerDriver", retryConfig); Function<Params, Void> transactionFunction = Retry.decorateFunction( retry, parameters -> transactionNoReturn(params)); transactionFunction.apply(params); } private Void transactionNoReturn(Params params) { try (driver.execute(txn -> { // Transaction code }); } return null; }
注意

在QLDB驱动程序之外重试事务会产生乘数效应。例如,如果配置QldbDriver 为重试三次,并且自定义重试逻辑也重试三次,则同一事务最多可以重试九次。

使事务幂等

我们建议将写事务设置为幂等,以避免重试时出现任何意想不到的副作用。如果事务可以运行多次并每次都产生相同的结果,则事务是幂等的

要了解更多信息,请参阅亚马逊QLDB并发模型

优化性能

要在使用驱动程序运行事务时优化此性能,请考虑以下注意事项:

  • execute操作总是至少SendCommandAPI调用三次QLDB,包括以下命令:

    1. StartTransaction

    2. ExecuteStatement

      对于您在execute数据块中运行的每条 PartiQL 语句,都会调用此命令。

    3. CommitTransaction

    在计算应用程序的总体工作负载时,API请考虑发出的调用总数。

  • 一般来说,我们建议从单线程编写器开始,并通过在单个事务中批处理多个语句来优化事务。最大限度地提高事务大小、文档大小和每笔事务的文档数量的限额,如Amazon 中的配额和限制 QLDB中所定义。

  • 如果批处理不足以满足大型事务负载,您可以通过添加其他编写器来尝试多线程。但是,您应该仔细考虑您的应用程序对文档和事务排序的要求,以及由此带来的额外复杂性。

每笔事务运行多个语句

上一节所述,您可为每个事务运行多个语句以优化应用程序的性能。在以下代码示例中,您可查询到一个表,然后在事务中更新该表中的文档。您可以通过向execute操作传递 lambda 表达式来实现此目的。

Java
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. public static boolean InsureCar(QldbDriver qldbDriver, final String vin) { final IonSystem ionSystem = IonSystemBuilder.standard().build(); final IonString ionVin = ionSystem.newString(vin); return qldbDriver.execute(txn -> { Result result = txn.execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", ionVin); if (!result.isEmpty()) { txn.execute("UPDATE Vehicles SET insured = TRUE WHERE vin = ?", ionVin); return true; } return false; }); }
.NET
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. public static async Task<bool> InsureVehicle(IAsyncQldbDriver driver, string vin) { ValueFactory valueFactory = new ValueFactory(); IIonValue ionVin = valueFactory.NewString(vin); return await driver.Execute(async txn => { // Check if the vehicle is insured. Amazon.QLDB.Driver.IAsyncResult result = await txn.Execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", ionVin); if (await result.CountAsync() > 0) { // If the vehicle is not insured, insure it. await txn.Execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", ionVin); return true; } return false; }); }
Go
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. func InsureCar(driver *qldbdriver.QLDBDriver, vin string) (bool, error) { insured, err := driver.Execute(context.Background(), func(txn qldbdriver.Transaction) (interface{}, error) { result, err := txn.Execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) if err != nil { return false, err } hasNext := result.Next(txn) if !hasNext && result.Err() != nil { return false, result.Err() } if hasNext { _, err = txn.Execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) if err != nil { return false, err } return true, nil } return false, nil }) if err != nil { panic(err) } return insured.(bool), err }
Node.js
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. async function insureCar(driver: QldbDriver, vin: string): Promise<boolean> { return await driver.executeLambda(async (txn: TransactionExecutor) => { const results: dom.Value[] = (await txn.execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin)).getResultList(); if (results.length > 0) { await txn.execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin); return true; } return false; }); };
Python
# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

驱动程序的 execute 操作会隐式启动会话和该会话中的事务。您在 lambda 表达式中运行的每条语句都包含在此事务中。所有语句运行后,驱动程序将自动提交事务。如果在自动重试限制用尽后任何语句失败,则事务将会中止。

在事务中传播异常

当每个事务运行多个语句时,我们通常不建议您捕获并吞掉事务内的异常。

例如,在 Java 中,以下程序会捕获的任何RuntimeException实例,记录错误并继续。此代码示例被认为是错误做法,因为即使UPDATE语句失败,事务也会成功。因此,客户端可能会假定更新成功,而更新却没有成功。

警告

切勿使用此代码示例。提供它是为了展示被认为是不良做法的反模式示例。

// DO NOT USE this code example because it is considered bad practice public static void main(final String... args) { ConnectToLedger.getDriver().execute(txn -> { final Result selectTableResult = txn.execute("SELECT * FROM Vehicle WHERE VIN ='123456789'"); // Catching an error inside the transaction is an anti-pattern because the operation might // not succeed. // In this example, the transaction succeeds even when the update statement fails. // So, the client might assume that the update succeeded when it didn't. try { processResults(selectTableResult); String model = // some code that extracts the model final Result updateResult = txn.execute("UPDATE Vehicle SET model = ? WHERE VIN = '123456789'", Constants.MAPPER.writeValueAsIonValue(model)); } catch (RuntimeException e) { log.error("Exception when updating the Vehicle table {}", e.getMessage()); } }); log.info("Vehicle table updated successfully."); }

改为传播(冒泡)异常。如果事务的任何部分失败,则让execute操作中止事务,以便客户端可以相应地处理异常。