Step 5: Modify documents in a ledger - Amazon Quantum Ledger Database (Amazon QLDB)

Step 5: Modify documents in a ledger

Now that you have data to work with, you can start making changes to documents in the vehicle-registration ledger in Amazon QLDB. In this step, the following code examples demonstrate how to run data manipulation language (DML) statements. These statements update the primary owner of one vehicle and add a secondary owner to another vehicle.

To modify documents
  1. Use the following program (transfer_vehicle_ownership.py) to update the primary owner of the vehicle with VIN 1N4AL11D75C109151 in your ledger.

    3.x
    # Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # # This code expects that you have AWS credentials setup per: # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html from logging import basicConfig, getLogger, INFO from pyqldbsamples.add_secondary_owner import get_document_ids, print_result, SampleData from pyqldbsamples.constants import Constants from pyqldbsamples.model.sample_data import convert_object_to_ion from pyqldbsamples.connect_to_ledger import create_qldb_driver logger = getLogger(__name__) basicConfig(level=INFO) def find_person_from_document_id(transaction_executor, document_id): """ Query a driver's information using the given ID. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type document_id: :py:class:`amazon.ion.simple_types.IonPyText` :param document_id: The document ID required to query for the person. :rtype: :py:class:`amazon.ion.simple_types.IonPyDict` :return: The resulting document from the query. """ query = 'SELECT p.* FROM Person AS p BY pid WHERE pid = ?' cursor = transaction_executor.execute_statement(query, document_id) return next(cursor) def find_primary_owner_for_vehicle(driver, vin): """ Find the primary owner of a vehicle given its VIN. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: The VIN to find primary owner for. :rtype: :py:class:`amazon.ion.simple_types.IonPyDict` :return: The resulting document from the query. """ logger.info('Finding primary owner for vehicle with VIN: {}.'.format(vin)) query = "SELECT Owners.PrimaryOwner.PersonId FROM VehicleRegistration AS v WHERE v.VIN = ?" cursor = driver.execute_lambda(lambda executor: executor.execute_statement(query, convert_object_to_ion(vin))) try: return driver.execute_lambda(lambda executor: find_person_from_document_id(executor, next(cursor).get('PersonId'))) except StopIteration: logger.error('No primary owner registered for this vehicle.') return None def update_vehicle_registration(driver, vin, document_id): """ Update the primary owner for a vehicle using the given VIN. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: The VIN for the vehicle to operate on. :type document_id: :py:class:`amazon.ion.simple_types.IonPyText` :param document_id: New PersonId for the primary owner. :raises RuntimeError: If no vehicle registration was found using the given document ID and VIN. """ logger.info('Updating the primary owner for vehicle with Vin: {}...'.format(vin)) statement = "UPDATE VehicleRegistration AS r SET r.Owners.PrimaryOwner.PersonId = ? WHERE r.VIN = ?" cursor = driver.execute_lambda(lambda executor: executor.execute_statement(statement, document_id, convert_object_to_ion(vin))) try: print_result(cursor) logger.info('Successfully transferred vehicle with VIN: {} to new owner.'.format(vin)) except StopIteration: raise RuntimeError('Unable to transfer vehicle, could not find registration.') def validate_and_update_registration(driver, vin, current_owner, new_owner): """ Validate the current owner of the given vehicle and transfer its ownership to a new owner. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: The VIN of the vehicle to transfer ownership of. :type current_owner: str :param current_owner: The GovId of the current owner of the vehicle. :type new_owner: str :param new_owner: The GovId of the new owner of the vehicle. :raises RuntimeError: If unable to verify primary owner. """ primary_owner = find_primary_owner_for_vehicle(driver, vin) if primary_owner is None or primary_owner['GovId'] != current_owner: raise RuntimeError('Incorrect primary owner identified for vehicle, unable to transfer.') document_ids = driver.execute_lambda(lambda executor: get_document_ids(executor, Constants.PERSON_TABLE_NAME, 'GovId', new_owner)) update_vehicle_registration(driver, vin, document_ids[0]) def main(ledger_name=Constants.LEDGER_NAME): """ Find primary owner for a particular vehicle's VIN. Transfer to another primary owner for a particular vehicle's VIN. """ vehicle_vin = SampleData.VEHICLE[0]['VIN'] previous_owner = SampleData.PERSON[0]['GovId'] new_owner = SampleData.PERSON[1]['GovId'] try: with create_qldb_driver(ledger_name) as driver: validate_and_update_registration(driver, vehicle_vin, previous_owner, new_owner) except Exception as e: logger.exception('Error updating VehicleRegistration.') raise e if __name__ == '__main__': main()
    2.x
    # Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # # This code expects that you have AWS credentials setup per: # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html from logging import basicConfig, getLogger, INFO from pyqldbsamples.add_secondary_owner import get_document_ids, print_result, SampleData from pyqldbsamples.constants import Constants from pyqldbsamples.model.sample_data import convert_object_to_ion from pyqldbsamples.connect_to_ledger import create_qldb_session logger = getLogger(__name__) basicConfig(level=INFO) def find_person_from_document_id(transaction_executor, document_id): """ Query a driver's information using the given ID. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type document_id: :py:class:`amazon.ion.simple_types.IonPyText` :param document_id: The document ID required to query for the person. :rtype: :py:class:`amazon.ion.simple_types.IonPyDict` :return: The resulting document from the query. """ query = 'SELECT p.* FROM Person AS p BY pid WHERE pid = ?' cursor = transaction_executor.execute_statement(query, document_id) return next(cursor) def find_primary_owner_for_vehicle(transaction_executor, vin): """ Find the primary owner of a vehicle given its VIN. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: The VIN to find primary owner for. :rtype: :py:class:`amazon.ion.simple_types.IonPyDict` :return: The resulting document from the query. """ logger.info('Finding primary owner for vehicle with VIN: {}.'.format(vin)) query = "SELECT Owners.PrimaryOwner.PersonId FROM VehicleRegistration AS v WHERE v.VIN = ?" cursor = transaction_executor.execute_statement(query, convert_object_to_ion(vin)) try: return find_person_from_document_id(transaction_executor, next(cursor).get('PersonId')) except StopIteration: logger.error('No primary owner registered for this vehicle.') return None def update_vehicle_registration(transaction_executor, vin, document_id): """ Update the primary owner for a vehicle using the given VIN. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: The VIN for the vehicle to operate on. :type document_id: :py:class:`amazon.ion.simple_types.IonPyText` :param document_id: New PersonId for the primary owner. :raises RuntimeError: If no vehicle registration was found using the given document ID and VIN. """ logger.info('Updating the primary owner for vehicle with Vin: {}...'.format(vin)) statement = "UPDATE VehicleRegistration AS r SET r.Owners.PrimaryOwner.PersonId = ? WHERE r.VIN = ?" cursor = transaction_executor.execute_statement(statement, document_id, convert_object_to_ion(vin)) try: print_result(cursor) logger.info('Successfully transferred vehicle with VIN: {} to new owner.'.format(vin)) except StopIteration: raise RuntimeError('Unable to transfer vehicle, could not find registration.') def validate_and_update_registration(transaction_executor, vin, current_owner, new_owner): """ Validate the current owner of the given vehicle and transfer its ownership to a new owner in a single transaction. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: The VIN of the vehicle to transfer ownership of. :type current_owner: str :param current_owner: The GovId of the current owner of the vehicle. :type new_owner: str :param new_owner: The GovId of the new owner of the vehicle. :raises RuntimeError: If unable to verify primary owner. """ primary_owner = find_primary_owner_for_vehicle(transaction_executor, vin) if primary_owner is None or primary_owner['GovId'] != current_owner: raise RuntimeError('Incorrect primary owner identified for vehicle, unable to transfer.') document_id = next(get_document_ids(transaction_executor, Constants.PERSON_TABLE_NAME, 'GovId', new_owner)) update_vehicle_registration(transaction_executor, vin, document_id) if __name__ == '__main__': """ Find primary owner for a particular vehicle's VIN. Transfer to another primary owner for a particular vehicle's VIN. """ vehicle_vin = SampleData.VEHICLE[0]['VIN'] previous_owner = SampleData.PERSON[0]['GovId'] new_owner = SampleData.PERSON[1]['GovId'] try: with create_qldb_session() as session: session.execute_lambda(lambda executor: validate_and_update_registration(executor, vehicle_vin, previous_owner, new_owner), retry_indicator=lambda retry_attempt: logger.info('Retrying due to OCC conflict...')) except Exception: logger.exception('Error updating VehicleRegistration.')
  2. To run the program, enter the following command.

    python transfer_vehicle_ownership.py
  3. Use the following program (add_secondary_owner.py) to add a secondary owner to the vehicle with VIN KM8SRDHF6EU074761 in your ledger.

    3.x
    # Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # # This code expects that you have AWS credentials setup per: # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html from logging import basicConfig, getLogger, INFO from pyqldbsamples.model.sample_data import to_ion_struct, get_document_ids, print_result, SampleData, \ convert_object_to_ion from pyqldbsamples.constants import Constants from pyqldbsamples.connect_to_ledger import create_qldb_driver logger = getLogger(__name__) basicConfig(level=INFO) def get_document_id_by_gov_id(driver, government_id): """ Find a driver's person ID using the given government ID. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type government_id: str :param government_id: A driver's government ID. :rtype: list :return: A list of document IDs. """ logger.info("Finding secondary owner's person ID using given government ID: {}.".format(government_id)) return driver.execute_lambda(lambda executor: get_document_ids(executor, Constants.PERSON_TABLE_NAME, 'GovId', government_id)) def is_secondary_owner_for_vehicle(driver, vin, secondary_owner_id): """ Check whether a secondary owner has already been registered for the given VIN. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: VIN of the vehicle to query. :type secondary_owner_id: str :param secondary_owner_id: The secondary owner's person ID. :rtype: bool :return: If the driver has already been registered. """ logger.info('Finding secondary owners for vehicle with VIN: {}...'.format(vin)) query = 'SELECT Owners.SecondaryOwners FROM VehicleRegistration AS v WHERE v.VIN = ?' rows = driver.execute_lambda(lambda executor: executor.execute_statement(query, convert_object_to_ion(vin))) for row in rows: secondary_owners = row.get('SecondaryOwners') person_ids = map(lambda owner: owner.get('PersonId').text, secondary_owners) if secondary_owner_id in person_ids: return True return False def add_secondary_owner_for_vin(driver, vin, parameter): """ Add a secondary owner into `VehicleRegistration` table for a particular VIN. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: VIN of the vehicle to add a secondary owner for. :type parameter: :py:class:`amazon.ion.simple_types.IonPyValue` :param parameter: The Ion value or Python native type that is convertible to Ion for filling in parameters of the statement. """ logger.info('Inserting secondary owner for vehicle with VIN: {}...'.format(vin)) statement = "FROM VehicleRegistration AS v WHERE v.VIN = ? INSERT INTO v.Owners.SecondaryOwners VALUE ?" cursor = driver.execute_lambda(lambda executor: executor.execute_statement(statement, convert_object_to_ion(vin), parameter)) logger.info('VehicleRegistration Document IDs which had secondary owners added: ') print_result(cursor) def register_secondary_owner(driver, vin, gov_id): """ Register a secondary owner for a vehicle if they are not already registered. :type driver: :py:class:`pyqldb.driver.qldb_driver.QldbDriver` :param driver: An instance of the QldbDriver class. :type vin: str :param vin: VIN of the vehicle to register a secondary owner for. :type gov_id: str :param gov_id: The government ID of the owner. """ logger.info('Finding the secondary owners for vehicle with VIN: {}.'.format(vin)) document_ids = get_document_id_by_gov_id(driver, gov_id) for document_id in document_ids: if is_secondary_owner_for_vehicle(driver, vin, document_id): logger.info('Person with ID {} has already been added as a secondary owner of this vehicle.'.format(gov_id)) else: add_secondary_owner_for_vin(driver, vin, to_ion_struct('PersonId', document_id)) def main(ledger_name=Constants.LEDGER_NAME): """ Finds and adds secondary owners for a vehicle. """ vin = SampleData.VEHICLE[1]['VIN'] gov_id = SampleData.PERSON[0]['GovId'] try: with create_qldb_driver(ledger_name) as driver: register_secondary_owner(driver, vin, gov_id) logger.info('Secondary owners successfully updated.') except Exception as e: logger.exception('Error adding secondary owner.') raise e if __name__ == '__main__': main()
    2.x
    # Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # # This code expects that you have AWS credentials setup per: # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html from logging import basicConfig, getLogger, INFO from pyqldbsamples.model.sample_data import to_ion_struct, get_document_ids, print_result, SampleData, \ convert_object_to_ion from pyqldbsamples.constants import Constants from pyqldbsamples.connect_to_ledger import create_qldb_session logger = getLogger(__name__) basicConfig(level=INFO) def get_document_id_by_gov_id(transaction_executor, government_id): """ Find a driver's person ID using the given government ID. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type government_id: str :param government_id: A driver's government ID. :rtype: list :return: A list of document IDs. """ logger.info("Finding secondary owner's person ID using given government ID: {}.".format(government_id)) return get_document_ids(transaction_executor, Constants.PERSON_TABLE_NAME, 'GovId', government_id) def is_secondary_owner_for_vehicle(transaction_executor, vin, secondary_owner_id): """ Check whether a secondary owner has already been registered for the given VIN. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: VIN of the vehicle to query. :type secondary_owner_id: str :param secondary_owner_id: The secondary owner's person ID. :rtype: bool :return: If the driver has already been registered. """ logger.info('Finding secondary owners for vehicle with VIN: {}...'.format(vin)) query = 'SELECT Owners.SecondaryOwners FROM VehicleRegistration AS v WHERE v.VIN = ?' rows = transaction_executor.execute_statement(query, convert_object_to_ion(vin)) for row in rows: secondary_owners = row.get('SecondaryOwners') person_ids = map(lambda owner: owner.get('PersonId').text, secondary_owners) if secondary_owner_id in person_ids: return True return False def add_secondary_owner_for_vin(transaction_executor, vin, parameter): """ Add a secondary owner into `VehicleRegistration` table for a particular VIN. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: VIN of the vehicle to add a secondary owner for. :type parameter: :py:class:`amazon.ion.simple_types.IonPyValue` :param parameter: The Ion value or Python native type that is convertible to Ion for filling in parameters of the statement. """ logger.info('Inserting secondary owner for vehicle with VIN: {}...'.format(vin)) statement = "FROM VehicleRegistration AS v WHERE v.VIN = '{}' INSERT INTO v.Owners.SecondaryOwners VALUE ?"\ .format(vin) cursor = transaction_executor.execute_statement(statement, parameter) logger.info('VehicleRegistration Document IDs which had secondary owners added: ') print_result(cursor) def register_secondary_owner(transaction_executor, vin, gov_id): """ Register a secondary owner for a vehicle if they are not already registered. :type transaction_executor: :py:class:`pyqldb.execution.executor.Executor` :param transaction_executor: An Executor object allowing for execution of statements within a transaction. :type vin: str :param vin: VIN of the vehicle to register a secondary owner for. :type gov_id: str :param gov_id: The government ID of the owner. """ logger.info('Finding the secondary owners for vehicle with VIN: {}.'.format(vin)) document_ids = get_document_id_by_gov_id(transaction_executor, gov_id) for document_id in document_ids: if is_secondary_owner_for_vehicle(transaction_executor, vin, document_id): logger.info('Person with ID {} has already been added as a secondary owner of this vehicle.'.format(gov_id)) else: add_secondary_owner_for_vin(transaction_executor, vin, to_ion_struct('PersonId', document_id)) if __name__ == '__main__': """ Finds and adds secondary owners for a vehicle. """ vin = SampleData.VEHICLE[1]['VIN'] gov_id = SampleData.PERSON[0]['GovId'] try: with create_qldb_session() as session: session.execute_lambda(lambda executor: register_secondary_owner(executor, vin, gov_id), lambda retry_attempt: logger.info('Retrying due to OCC conflict...')) logger.info('Secondary owners successfully updated.') except Exception: logger.exception('Error adding secondary owner.')
  4. To run the program, enter the following command.

    python add_secondary_owner.py

To review these changes in the vehicle-registration ledger, see Step 6: View the revision history for a document.