Python용 Amazon QLDB 드라이버 - Cookbook 참조 - Amazon Quantum Ledger Database(QLDB)

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python용 Amazon QLDB 드라이버 - Cookbook 참조

이 참조 가이드는 Python용 Amazon QLDB 드라이버의 일반적인 사용 사례를 보여줍니다. 이 Python 코드 예제는 드라이버를 사용하여 기본 CRUD(생성, 읽기, 업데이트, 삭제) 작업을 실행하는 방법을 보여줍니다. 또한 Amazon Ion 데이터를 처리하기 위한 코드 예제도 포함되어 있습니다. 또한 이 가이드에서는 트랜잭션에 멱등성을 부여하고 고유성 제약하는 모범 사례를 중점적으로 설명합니다.

참고

해당하는 경우, 일부 사용 사례에서는 Python용 QLDB 드라이버의 지원되는 각 주요 버전마다 다른 코드 예제가 있습니다.

드라이버 가져오기

다음 코드 예제에서는 드라이브를 가져옵니다.

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
참고

이 예제에서는 Amazon Ion 패키지(amazon.ion.simpleion)도 가져옵니다. 이 참조에서 일부 데이터 작업을 실행할 때 Ion 데이터를 처리하려면 이 패키지가 필요합니다. 자세한 내용은 Amazon Ion 작업 섹션을 참조하세요.

드라이버 인스턴스화

다음 코드 예제는 기본 설정을 사용하여 지정된 원장 이름에 연결하는 드라이버 인스턴스를 만듭니다.

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD 작업

QLDB는 트랜잭션의 일부로 CRUD(생성, 읽기, 업데이트, 삭제) 작업을 실행합니다.

주의

가장 좋은 방법은 쓰기 트랜잭션이 완전한 멱등성을 부여하는 것입니다.

트랜잭션에 멱등성 부여하기

재시도 시 예상치 못한 부작용이 발생하지 않도록 쓰기 트랜잭션에 멱등성을 부여하는 것이 좋습니다. 여러 번 실행하여 매번 동일한 결과를 생성할 수 있는 트랜잭션은 멱등성을 가집니다.

이름이 Person인 테이블에 문서를 삽입하는 트랜잭션을 예로 들어 보겠습니다. 트랜잭션은 먼저 문서가 테이블에 이미 존재하는지 여부를 확인해야 합니다. 이렇게 확인하지 않으면 테이블에 문서가 중복될 수 있습니다.

QLDB가 서버 측에서 트랜잭션을 성공적으로 커밋했지만 응답을 기다리는 동안 클라이언트 제한 시간이 초과되었다고 가정해 보겠습니다. 트랜잭션이 멱등성을 가지지 않는 경우 재시도 시 동일한 문서가 두 번 이상 삽입될 수 있습니다.

인덱스를 사용하여 전체 테이블 스캔 방지

인덱싱된 필드 또는 문서 ID(예: WHERE indexedField = 123 또는 WHERE indexedField IN (456, 789))에서 동등 연산자를 사용하여 WHERE 조건자 절이 포함된 문을 실행하는 것이 좋습니다. 이 인덱싱된 조회가 없으면 QLDB는 테이블 스캔을 수행해야 하며, 이로 인해 트랜잭션 제한 시간이 초과되거나 OCC(낙관적 동시성 제어) 충돌이 발생할 수 있습니다.

OCC에 대한 자세한 내용은 Amazon QLDB 동시성 모델 단원을 참조하세요.

암시적으로 생성된 트랜잭션

pyqldb.driver.qldb_driver.execute_lambda 메서드는 pyqldb.execution.executor.Executor의 인스턴스를 수신하는 Lambda 함수를 받아들이며, 이 함수를 사용하여 명령문을 실행할 수 있습니다. Executor의 인스턴스는 암시적으로 생성된 트랜잭션을 래핑합니다.

트랜잭션 실행자의 execute_statement 메서드를 사용하여 Lambda 함수 내에서 명령문을 실행할 수 있습니다. 드라이버는 Lambda 함수가 반환될 때 트랜잭션을 암시적으로 커밋합니다.

참고

execute_statement 메서드는 Amazon Ion 유형과 Python 네이티브 유형을 모두 지원합니다. Python 네이티브 유형을 인수로 execute_statement에 전달하면 드라이버가 amazon.ion.simpleion 모듈을 사용하여 Ion 유형으로 변환합니다(지정된 Python 데이터 유형에 대한 변환이 지원되는 경우). 지원되는 데이터 유형 및 변환 규칙은 simpleion 소스 코드를 참조하세요.

다음 섹션에서는 기본 CRUD 작업을 실행하고, 사용자 지정 재시도 로직을 지정하고, 고유성 제약 조건을 구현하는 방법을 보여줍니다.

테이블 생성

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

인덱스 생성

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

문서 읽기

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

쿼리 파라미터 사용

다음 코드 예제는 네이티브 유형 쿼리 파라미터를 사용합니다.

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

다음 코드 예제는 Ion 유형 쿼리 파라미터를 사용합니다.

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

다음 코드 예제는 여러 쿼리 파라미터를 사용합니다.

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

다음 코드 예제는 쿼리 파라미터 목록을 사용합니다.

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
참고

인덱싱된 조회 없이 쿼리를 실행하면 전체 테이블 스캔이 호출됩니다. 이 예제에서는 성능을 최적화하기 위해 GovId 필드에 인덱스를 사용하는 것이 좋습니다. GovId에 인덱스를 사용하지 않으면 쿼리에 지연 시간이 길어지고 OCC 충돌 예외 또는 트랜잭션 시간 초과가 발생할 수도 있습니다.

문서 삽입하기

다음 코드 예제는 네이티브 데이터 유형을 삽입합니다.

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

다음 코드 예제는 Ion 데이터 유형을 삽입합니다.

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

이 트랜잭션은 문서를 Person 테이블에 삽입합니다. 삽입하기 전에 먼저 문서가 테이블에 이미 있는지 확인합니다. 이 검사를 통해 트랜잭션은 본질적으로 멱등성을 가지게 됩니다. 이 트랜잭션을 여러 번 실행하더라도 의도하지 않은 부작용이 발생하지는 않습니다.

참고

이 예제에서는 성능을 최적화하기 위해 GovId 필드에 인덱스를 사용하는 것이 좋습니다. GovId에 인덱스를 설정하지 않으면 명령문의 지연 시간이 길어지고 OCC 충돌 예외 또는 트랜잭션 시간 초과가 발생할 수도 있습니다.

하나의 명령문에 여러 문서 삽입

단일 INSERT 문을 사용하여 여러 문서를 삽입하려면 다음과 같이 목록 타입의 파라미터를 해당 문에 전달할 수 있습니다.

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

목록을 전달할 때 변수 자리 표시자(?)를 이중 꺾쇠 괄호( <<...>> )로 묶지 마세요. 수동 PartiQL 문에서 이중 꺾쇠 괄호는 으로 알려진 정렬되지 않은 모음을 의미합니다.

문서 업데이트

다음 코드 예제는 네이티브 데이터 유형을 사용합니다.

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

다음 코드 예제는 Ion 데이터 유형을 사용합니다.

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
참고

이 예제에서는 성능을 최적화하기 위해 GovId 필드에 인덱스를 사용하는 것이 좋습니다. GovId에 인덱스를 설정하지 않으면 명령문의 지연 시간이 길어지고 OCC 충돌 예외 또는 트랜잭션 시간 초과가 발생할 수도 있습니다.

문서 삭제

다음 코드 예제는 네이티브 데이터 유형을 사용합니다.

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

다음 코드 예제는 Ion 데이터 유형을 사용합니다.

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
참고

이 예제에서는 성능을 최적화하기 위해 GovId 필드에 인덱스를 사용하는 것이 좋습니다. GovId에 인덱스를 설정하지 않으면 명령문의 지연 시간이 길어지고 OCC 충돌 예외 또는 트랜잭션 시간 초과가 발생할 수도 있습니다.

하나의 트랜잭션에서 여러 명령문 실행

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

재시도 로직

드라이버의 execute_lambda 메서드에는 재시도 가능한 예외(예: 시간 초과 또는 OCC 충돌)가 발생할 경우 트랜잭션을 재시도하는 재시도 메커니즘이 내장되어 있습니다.

3.x

최대 재시도 횟수와 백오프 전략을 구성할 수 있습니다.

기본 재시도 제한은 4이며, 기본 백오프 전략은 10밀리초 단위의 지수 백오프 및 Jitter입니다. pyqldb.config.retry_config.RetryConfig 인스턴스를 사용하여 드라이버 인스턴스별 및 트랜잭션별 재시도 구성을 설정할 수 있습니다.

다음 코드 예제는 드라이버 인스턴스에 대한 사용자 지정 재시도 제한 및 사용자 지정 백오프 전략을 사용하여 재시도 로직을 지정합니다.

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

다음 코드 예제는 특정 Lambda 실행에 대한 사용자 지정 재시도와 사용자 지정 백오프 전략을 사용하여 재시도 로직을 지정합니다. execute_lambda에 대한 이 구성은 드라이버 인스턴스에 설정된 재시도 로직을 재정의합니다.

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

최대 재시도 횟수는 구성 가능합니다. PooledQldbDriver를 초기화할 때 retry_limit 속성을 설정하여 재시도 제한을 구성할 수 있습니다.

기본 재시도 한도는 4입니다.

고유성 제약 조건 구현

QLDB는 고유 인덱스를 지원하지 않지만 애플리케이션에서 이 동작을 구현할 수 있습니다.

Person 테이블의 GovId 필드에 고유성 제약 조건을 구현하려고 한다고 가정해 보겠습니다. 이렇게 하면 다음 작업을 수행하는 트랜잭션을 작성합니다.

  1. 테이블에 지정된 GovId가 있는 기존 문서가 없는지 확인합니다.

  2. 어설션이 통과하면 문서를 삽입합니다.

경쟁 트랜잭션이 어설션을 동시에 통과하면 트랜잭션 중 하나만 성공적으로 커밋됩니다. 다른 트랜잭션은 OCC 충돌 예외가 발생하여 실패합니다.

다음 코드 예제는 이 고유성 제약 조건 구현 방법을 보여줍니다.

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
참고

이 예제에서는 성능을 최적화하기 위해 GovId 필드에 인덱스를 사용하는 것이 좋습니다. GovId에 인덱스를 설정하지 않으면 명령문의 지연 시간이 길어지고 OCC 충돌 예외 또는 트랜잭션 시간 초과가 발생할 수도 있습니다.

Amazon Ion 작업

다음 섹션에서는 Amazon Ion 모듈을 사용하여 Ion 데이터를 처리하는 방법을 보여줍니다.

Ion 모듈 가져오기

import amazon.ion.simpleion as simpleion

Ion 유형 생성

다음 코드 예제는 Ion 텍스트에서 Ion 객체를 만듭니다.

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

다음 코드 예제는 Python dict에서 Ion 객체를 만듭니다.

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

Ion 이진 덤프 가져오기

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

Ion 텍스트 덤프 가져오기

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

Ion 작업에 대한 자세한 정보는 GitHub의 Amazon Ion 설명서를 참조하십시오. QLDB에서 Ion을 사용하는 방법에 대한 추가 코드 예제는 Amazon QLDB에서 Amazon Ion 데이터 타입을 사용한 작업 섹션을 참조하세요.