Pilote Amazon QLDB pour Python — Référence du livre de recettes - Amazon Quantum Ledger Database (Amazon QLDB)

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Pilote Amazon QLDB pour Python — Référence du livre de recettes

Ce guide de référence présente des cas d'utilisation courants du pilote Amazon QLDB pour Python. Il fournit des exemples de code Python montrant comment utiliser le pilote pour exécuter des opérations de création, lecture, mise à jour et suppression (CRUD) typiques. Il inclut également des exemples de code pour le traitement des données Amazon Ion. En outre, ce guide met en évidence les meilleures pratiques pour rendre les transactions idempotentes et mettre en œuvre des contraintes d'unicité.

Note

Le cas échéant, certains cas d'utilisation comportent des exemples de code différents pour chaque version majeure prise en charge du pilote QLDB pour Python.

Importation du pilote

L'exemple de code suivant importe le pilote.

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
Note

Cet exemple importe également le package Amazon Ion (amazon.ion.simpleion). Vous avez besoin de ce package pour traiter les données Ion lors de l'exécution de certaines opérations de données dans cette référence. Pour en savoir plus, consultez Utilisation d'Amazon Ion.

Instanciation du pilote

L'exemple de code suivant crée une instance du pilote qui se connecte à un nom de registre spécifié à l'aide des paramètres par défaut.

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

Opérations CRUD

QLDB exécute des opérations de création, lecture, mise à jour et suppression (CRUD) dans le cadre d'une transaction.

Avertissement

À titre de bonne pratique, faites en sorte que vos transactions d'écriture soient strictement idempotentes.

Rendre les transactions idempotentes

Nous vous recommandons de rendre les transactions d'écriture idempotentes afin d'éviter tout effet secondaire inattendu en cas de nouvelles tentatives. Une transaction est idempotente si elle peut être exécutée plusieurs fois et produire des résultats identiques à chaque fois.

Prenons l'exemple d'une transaction qui insère un document dans une table nomméePerson. La transaction doit d'abord vérifier si le document existe déjà dans le tableau. Sans cette vérification, le tableau risque de contenir des documents dupliqués.

Supposons que QLDB valide avec succès la transaction côté serveur, mais que le client expire en attendant une réponse. Si la transaction n'est pas idempotente, le même document peut être inséré plusieurs fois en cas de nouvelle tentative.

Utilisation d'index pour éviter l'analyse complète des tables

Nous vous recommandons également d'exécuter des instructions avec une clause deWHERE prédicat à l'aide d'un opérateur d'égalité sur un champ indexé ou un identifiant de document, par exemple,WHERE indexedField = 123 ouWHERE indexedField IN (456, 789). Sans cette recherche indexée, QLDB doit effectuer une analyse des tables, ce qui peut entraîner des délais d'expiration des transactions ou des conflits de contrôle optimiste de la concurrence (OCC).

Pour plus d'informations sur l'OCC, consultezModèle de mise QLDB simultansimultansimultansimultansimultansimultansimultansimultansimultanéité.

Transactions créées implicitement

La méthode pyqldb.driver.qldb_driver.execute_lambda accepte une fonction lambda qui reçoit une instance de PyQLDB.Execution.Executor.Executor, que vous pouvez utiliser pour exécuter des instructions. L'instance deExecutor encapsule une transaction créée implicitement.

Vous pouvez exécuter des instructions dans la fonction lambda en utilisant la méthode execute_statement de l'exécuteur de transactions. Le pilote valide implicitement la transaction lorsque la fonction Lambda est renvoyée.

Note

Laexecute_statement méthode prend en charge à la fois les types Amazon Ion et les types natifs de Python. Si vous transmettez un type natif Python comme argument àexecute_statement, le pilote le convertit en type Ion à l'aide duamazon.ion.simpleion module (à condition que la conversion pour le type de données Python donné soit prise en charge). Pour connaître les types de données et les règles de conversion pris en charge, consultez le code source de Simpleion.

Les sections suivantes montrent comment exécuter des opérations CRUD de base, spécifier une logique de nouvelle tentative personnalisée et implémenter des contraintes d'unicité.

Création de tables

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

Création d'index

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

Lire des documents

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

Utilisation des paramètres de la requête

L'exemple de code suivant utilise un paramètre de requête de type natif.

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

L'exemple de code suivant utilise un paramètre de requête de type Ion.

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

L'exemple de code suivant utilise plusieurs paramètres de requête.

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

L'exemple de code suivant utilise une liste de paramètres de requête.

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
Note

Lorsque vous exécutez une requête sans recherche indexée, elle appelle une analyse complète de la table. Dans cet exemple, nous recommandons de disposer d'un index sur leGovId terrain afin d'optimiser les performances. Si aucun index n'est activéGovId, les requêtes peuvent présenter une latence plus importante et peuvent également entraîner des exceptions de conflit OCC ou des délais de transaction.

Insertion de documents

L'exemple de code suivant insère des types de données natifs.

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

L'exemple de code suivant insère des types de données Ion.

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

Cette transaction insère un document dans lePerson tableau. Avant de l'insérer, il vérifie d'abord si le document existe déjà dans le tableau. Ce contrôle confère à la transaction un caractère idempotent. Même si vous exécutez cette transaction plusieurs fois, elle n'aura aucun effet secondaire imprévu.

Note

Dans cet exemple, nous recommandons de disposer d'un index sur leGovId terrain afin d'optimiser les performances. Si aucun index n'est activéGovId, les instructions peuvent présenter une latence plus importante et peuvent également entraîner des exceptions de conflit OCC ou des délais de transaction.

Insertion de plusieurs documents dans une seule déclaration

Pour insérer plusieurs documents à l'aide d'une seuleINSERT instruction, vous pouvez transmettre un paramètre de type list à l'instruction comme suit.

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

Vous ne placez pas l'espace réservé à la variable (?) entre crochets (<<...>>) lorsque vous transmettez une liste. Dans les instructions manuelles de PartiQL, les crochets doubles indiquent une collection non ordonnée appelée sac.

Mise à jour des documents

L'exemple de code suivant utilise des types de données natifs.

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

L'exemple de code suivant utilise les types de données Ion.

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
Note

Dans cet exemple, nous recommandons de disposer d'un index sur leGovId terrain afin d'optimiser les performances. Si aucun index n'est activéGovId, les instructions peuvent présenter une latence plus importante et peuvent également entraîner des exceptions de conflit OCC ou des délais de transaction.

Supprimer des documents

L'exemple de code suivant utilise des types de données natifs.

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

L'exemple de code suivant utilise les types de données Ion.

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
Note

Dans cet exemple, nous recommandons de disposer d'un index sur leGovId terrain afin d'optimiser les performances. Si aucun index n'est activéGovId, les instructions peuvent présenter une latence plus importante et peuvent également entraîner des exceptions de conflit OCC ou des délais de transaction.

Exécution de plusieurs relevés dans une transaction

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

Nouvelle tentative de nouvelle tentative

Laexecute_lambda méthode du pilote possède un mécanisme de nouvelle tentative intégré qui réessaie la transaction si une exception pouvant être réessayée se produit (par exemple, des délais d'attente ou des conflits OCC).

3.x

Le nombre maximum de nouvelles tentatives et la stratégie de nouvelle tentative sont configurables.

La limite de tentatives par défaut est de4, et la stratégie d'arrêt par défaut est Exponential Backoff and Jitter, avec une base de10 millisecondes. Vous pouvez définir la configuration de nouvelle tentative par instance de pilote et également par transaction en utilisant une instance de pyqldb.config.retry_config. RetryConfig.

L'exemple de code suivant spécifie une logique de nouvelle tentative avec une limite de tentatives personnalisée et une stratégie d'arrêt personnalisée pour une instance du pilote.

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

L'exemple de code suivant spécifie une logique de nouvelle tentative avec une limite de tentatives personnalisée et une stratégie d'arrêt personnalisée pour une exécution Lambda particulière. Cette configurationexecute_lambda remplace la logique de nouvelle tentative définie pour l'instance du pilote.

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

Le nombre maximum de nouvelles tentatives maximum est configurable. Vous pouvez configurer la limite de tentatives en définissant laretry_limit propriété lors de l'initialisationPooledQldbDriver.

La limite de tentatives par défaut est de4.

Mise en œuvre des contraintes d'unicité

QLDB ne prend pas en charge les index uniques, mais vous pouvez implémenter ce comportement dans votre application.

Supposons que vous souhaitiez implémenter une contrainte d'unicité sur leGovId champ de laPerson table. Pour ce faire, vous pouvez écrire une transaction qui effectue les actions suivantes :

  1. Affirmez que la table ne contient aucun document existant portant une valeur spécifiéeGovId.

  2. Insérez le document si l'assertion est acceptée.

Si une transaction concurrente passe simultanément l'assertion, seule l'une des transactions sera validée avec succès. L'autre transaction échouera avec une exception de conflit OCC.

L'exemple de code suivant montre comment mettre en œuvre cette logique de contrainte d'unicité.

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
Note

Dans cet exemple, nous recommandons de disposer d'un index sur leGovId terrain afin d'optimiser les performances. Si aucun index n'est activéGovId, les instructions peuvent présenter une latence plus importante et peuvent également entraîner des exceptions de conflit OCC ou des délais de transaction.

Utilisation d'Amazon Ion

Les sections suivantes montrent comment utiliser le module Amazon Ion pour traiter les données Ion.

Importation du module Ion

import amazon.ion.simpleion as simpleion

Création de types Ion

L'exemple de code suivant crée un objet Ion à partir du texte Ion.

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

L'exemple de code suivant crée un objet Ion à partir d'un Pythondict.

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

Obtenir un dump binaire Ion

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

Obtenir un dump de texte Ion

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

Pour plus d'informations sur l'utilisation d'Ion, consultez la documentation Amazon Ion sur GitHub. Pour plus d'exemples de code illustrant l'utilisation d'Ion dans QLDB, consultezUtilisation des types de données Amazon Ion dans Amazon QLDB.