Amazon QLDB 驅動程式推薦 - Amazon Quantum Ledger Database (Amazon QLDB)

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon QLDB 驅動程式推薦

本節介紹為任何支持的語言配置和使用 Amazon QLDB 驅動程序的最佳實踐。提供的代碼示例專門用於 Java。

這些建議適用於大多數典型的使用案例,但一種尺寸並不適合所有情況。根據您認為適合您的應用程序,請使用以下建議。

設定 QldbDriver 對象

所以此QldbDriver對象通過維護會議,它們可以跨事務重複使用。一個會議表示與分類帳的單個連接。QLDB 支持每個會話一個主動運行的事務。

重要

對於較舊版本的驅動程序,會話池功能仍然在PooledQldbDriver物件而不是QldbDriver。如果您使用以下版本之一,請將QldbDriver取代為PooledQldbDriver瞭解本主題的其餘部分。

驅動程式 版本
Java 1.1.0或舊版
.NET 0.1.0-beta
Node.js 1.0.0-rc.1或舊版
Python 2.0.2或舊版

所以此PooledQldbDriver物件在最新版本的驅動程式中被棄用。建議您升級至最新版本,並將PooledQldbDriverQldbDriver

設定 QldbDriver 作為全局對象

要優化驅動程序和會話的使用,請確保您的應用程序實例中只有一個驅動程序的全局實例。例如,在 Java 中,您可以使用依賴注入框架,例如春天Google Guice, 或匕首。以下程式碼範例會示範如何將QldbDriver作為一個單身人士。

@Singleton public QldbDriver qldbDriver (AWSCredentialsProvider credentialsProvider, @Named(LEDGER_NAME_CONFIG_PARAM) String ledgerName) { QldbSessionClientBuilder builder = QldbSessionClient.builder(); if (null != credentialsProvider) { builder.credentialsProvider(credentialsProvider); } return QldbDriver.builder() .ledger(ledgerName) .transactionRetryPolicy(RetryPolicy .builder() .maxRetries(3) .build()) .sessionClientBuilder(builder) .build(); }

配置重試嘗試

當常見瞬態異常(例如SocketTimeoutException或者NoHttpResponseException)發生。要設定重試次數上限,您可以使用maxRetries參數的transactionRetryPolicy配置對象時創建QldbDriver。(對於上一節中列出的較舊版本的驅動程序,請使用retryLimit參數PooledQldbDriver。)

maxRetries 的預設值為 4

客户端錯誤,例如InvalidParameterException無法重試。當它們發生時,事務將中止,會話返回到池,並將異常拋出給驅動程序的客户端。

設定並行會話和事務的數目上限

實例使用的最大分類帳會話數QldbDriver來運行事務的定義是由其maxConcurrentTransactions參數。(對於上一節中列出的較舊的驅動程序版本,這是由poolLimit參數PooledQldbDriver。)

此限制必須大於零,且小於或等於會話客户端允許的打開 HTTP 連接上限,如特定AWS開發套件。例如,在 Java 中,最大連接數在ClientConfiguration物件。

預設值為maxConcurrentTransactions是您的AWS開發套件。

當您配置QldbDriver,請考慮以下縮放注意事項:

  • 池的會話數應始終至少與您計劃擁有的並行運行事務的數量相同。

  • 在主管線程委派給工作線程的多線程模型中,驅動程序應至少具有與工作線程數相同的會話數。否則,在峯值負載時,線程將排隊等待可用會話。

  • 每個分類帳的併發活動會話的服務限制在亞馬遜 QLDB 中的配額和限制。確保配置的併發會話不超過此限制,以用於所有客户端的單個分類帳。

在異常情況下重試

在重試 QLDB 中發生的異常時,請考慮以下建議。

在 OCC 衝突時重試

樂觀並行控制(OCC) 衝突異常發生,當事務正在訪問的數據自事務開始以來發生更改。QLDB 在嘗試提交事務時拋出此異常。驅動程序重試事務多達maxRetries已配置。

有關 OCC 和使用索引限制 OCC 衝突的最佳實踐的詳細信息,請參閲Amazon QLLDB 並行模型

重試 QLDB 驅動程序之外的其他異常

若要在運行時引發自定義的應用程序定義異常時在驅動程序之外重試事務,您必須包裝該事務。例如,在 Java 中,以下程式碼會示範如何使用雷斯利恩塞 4J庫來重試 QLDB 中的事務。

private final RetryConfig retryConfig = RetryConfig.custom() .maxAttempts(MAX_RETRIES) .intervalFunction(IntervalFunction.ofExponentialRandomBackoff()) // Retry this exception .retryExceptions(InvalidSessionException.class, MyRetryableException.class) // But fail for any other type of exception extended from RuntimeException .ignoreExceptions(RuntimeException.class) .build(); // Method callable by a client public void myTransactionWithRetries(Params params) { Retry retry = Retry.of("registerDriver", retryConfig); Function<Params, Void> transactionFunction = Retry.decorateFunction( retry, parameters -> transactionNoReturn(params)); transactionFunction.apply(params); } private Void transactionNoReturn(Params params) { try (driver.execute(txn -> { // Transaction code }); } return null; }
注意

在 QLDB 驅動程序之外重試事務會產生乘數效應。例如,如果QldbDriver配置為重試三次,並且自定義重試邏輯也重試三次,同一事務最多可以重試九次。

使交易冪等

作為最佳做法,使寫入事務具有冪等性,以避免在重試情況下出現任何意外的副作用。交易為冪等性如果它可以運行多次並且每次產生相同的結果。

如需進一步了解,請參閱 Amazon QLLDB 並行模型

最佳化效能

要在使用驅動程序運行事務時優化性能,請注意以下事項:

  • 所以此execute操作始終使至少三個SendCommandAPI 調用 QLDB,包括以下命令:

    1. StartTransaction

    2. ExecuteStatement

      此命令被調用為您在execute區塊。

    3. CommitTransaction

    計算應用程序的整體工作負載時,請考慮所進行的 API 調用總數。

  • 通常,我們建議從單線程編寫器開始,並通過批處理單個事務中的多個語句來優化事務。最大限度地提高事務處理大小、文檔大小和每個事務處理的文檔數量的配額,如亞馬遜 QLDB 中的配額和限制

  • 如果批處理不足以滿足大型事務加載,您可以通過添加其他寫入程序來嘗試多線程。但是,您應該仔細考慮您的應用程序對文檔和事務排序的要求以及這帶來的額外複雜性。

每個事務運行多個語句

上一節,您可以為每個事務運行多個語句以優化應用程序的性能。在下面的代碼示例中,查詢表,然後在事務中更新該表中的文檔。您可以通過將 lambda 表達式傳遞給executeoperation.

Java
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. public static boolean InsureCar(QldbDriver qldbDriver, final String vin) { final IonSystem ionSystem = IonSystemBuilder.standard().build(); final IonString ionVin = ionSystem.newString(vin); return qldbDriver.execute(txn -> { Result result = txn.execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", ionVin); if (!result.isEmpty()) { txn.execute("UPDATE Vehicles SET insured = TRUE WHERE vin = ?", ionVin); return true; } return false; }); }
.NET
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. public static async Task<bool> InsureVehicle(IAsyncQldbDriver driver, string vin) { ValueFactory valueFactory = new ValueFactory(); IIonValue ionVin = valueFactory.NewString(vin); return await driver.Execute(async txn => { // Check if the vehicle is insured. Amazon.QLDB.Driver.IAsyncResult result = await txn.Execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", ionVin); if (await result.CountAsync() > 0) { // If the vehicle is not insured, insure it. await txn.Execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", ionVin); return true; } return false; }); }
Go
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. func InsureCar(driver *qldbdriver.QLDBDriver, vin string) (bool, error) { insured, err := driver.Execute(context.Background(), func(txn qldbdriver.Transaction) (interface{}, error) { result, err := txn.Execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) if err != nil { return false, err } hasNext := result.Next(txn) if !hasNext && result.Err() != nil { return false, result.Err() } if hasNext { _, err = txn.Execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) if err != nil { return false, err } return true, nil } return false, nil }) if err != nil { panic(err) } return insured.(bool), err }
Node.js
// This code snippet is intentionally trivial. In reality you wouldn't do this because you'd // set your UPDATE to filter on vin and insured, and check if you updated something or not. async function insureCar(driver: QldbDriver, vin: string): Promise<boolean> { return await driver.executeLambda(async (txn: TransactionExecutor) => { const results: dom.Value[] = (await txn.execute( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin)).getResultList(); if (results.length > 0) { await txn.execute( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin); return true; } return false; }); };
Python
# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

驅動程式execute操作隱式啟動會話和該會話中的事務。在 lambda 表達式中運行的每個語句都包裝在事務中。所有語句運行後,驅動程序會自動提交事務。如果任何語句在用盡自動重試限制後失敗,則事務將中止。

傳播事務中的異常

當每個事務運行多個語句時,我們通常不建議您 catch 和吞嚥事務中的異常。

例如,在 Java 中,以下程序捕獲RuntimeException,記錄錯誤,然後繼續。此代碼示例被認為是不好的做法,因為即使UPDATE語句失敗。因此,客户端可能會假設更新成功,當它沒有成功。

警告

不要使用此代碼示例。它是為了顯示一個被認為是不好的做法的反模式示例。

// DO NOT USE this code example because it is considered bad practice public static void main(final String... args) { ConnectToLedger.getDriver().execute(txn -> { final Result selectTableResult = txn.execute("SELECT * FROM Vehicle WHERE VIN ='123456789'"); // Catching an error inside the transaction is an anti-pattern because the operation might // not succeed. // In this example, the transaction succeeds even when the update statement fails. // So, the client might assume that the update succeeded when it didn't. try { processResults(selectTableResult); String model = // some code that extracts the model final Result updateResult = txn.execute("UPDATE Vehicle SET model = ? WHERE VIN = '123456789'", Constants.MAPPER.writeValueAsIonValue(model)); } catch (RuntimeException e) { log.error("Exception when updating the Vehicle table {}", e.getMessage()); } }); log.info("Vehicle table updated successfully."); }

傳播(冒泡)異常。如果事務的任何部分失敗,請讓execute操作中止事務,以便客户端可以相應地處理異常。