Developing with streams in QLDB - Amazon Quantum Ledger Database (Amazon QLDB)

Developing with streams in QLDB

This section summarizes the API operations that you can use with an AWS SDK or the AWS CLI to create and manage journal streams in Amazon QLDB. It also describes the sample applications that demonstrate these operations and use the Kinesis Client Library (KCL) or AWS Lambda to implement a stream consumer.

The KCL enables you to build consumer applications and simplifies coding by providing useful abstractions above the low-level Kinesis Data Streams API. To learn more about the KCL, see Developing consumers using the Kinesis Client Library in the Amazon Kinesis Data Streams Developer Guide.

QLDB journal stream APIs

The QLDB API provides the following journal stream operations for use by application programs:

  • StreamJournalToKinesis – Creates a journal stream for a given QLDB ledger. The stream captures every document revision that is committed to the ledger's journal and delivers the data to a specified Kinesis Data Streams resource.

    • Record aggregation in Kinesis Data Streams is enabled by default. This option lets QLDB publish multiple data records in a single Kinesis Data Streams record, increasing the number of records sent per API call.

      Record aggregation has important implications for processing records and requires de-aggregation in your stream consumer. To learn more, see KPL key concepts and Consumer de-aggregation in the Amazon Kinesis Data Streams Developer Guide.

  • DescribeJournalKinesisStream – Returns detailed information about a given QLDB journal stream. The output includes the ARN, stream name, current status, creation time, and the parameters of your original stream creation request.

  • ListJournalKinesisStreamsForLedger – Returns a list of all QLDB journal stream descriptors for a given ledger. Each stream descriptor includes the same details that DescribeJournalKinesisStream returns.

  • CancelJournalKinesisStream – Ends a given QLDB journal stream. You can cancel a stream only while its current status is ACTIVE.

    You can't restart a stream after you cancel it. To resume delivery of your data to Kinesis Data Streams, you can create a new QLDB stream.
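As a rough illustration of the create and describe operations above, the following Python sketch creates a journal stream and then polls its status until it becomes ACTIVE. It assumes the AWS SDK for Python (Boto3): the qldb_client argument is expected to be the object returned by boto3.client("qldb"). The helper names, the polling interval, and the choice to treat COMPLETED, CANCELED, and FAILED as terminal statuses are assumptions made for this example and are not taken from the sample applications.

```python
import time
from datetime import datetime

# Assumption: statuses from which a stream is not expected to become ACTIVE.
TERMINAL_STATUSES = {"COMPLETED", "CANCELED", "FAILED"}


def create_journal_stream(qldb_client, ledger_name, stream_name, role_arn,
                          kinesis_stream_arn, start_time: datetime,
                          aggregation_enabled=True):
    """Call StreamJournalToKinesis and return the new stream ID."""
    response = qldb_client.stream_journal_to_kinesis(
        LedgerName=ledger_name,
        StreamName=stream_name,
        RoleArn=role_arn,
        InclusiveStartTime=start_time,
        KinesisConfiguration={
            "StreamArn": kinesis_stream_arn,
            # Record aggregation is on by default; pass False to disable it.
            "AggregationEnabled": aggregation_enabled,
        },
    )
    return response["StreamId"]


def wait_until_active(qldb_client, ledger_name, stream_id,
                      poll_seconds=5, max_polls=60, sleep=time.sleep):
    """Poll DescribeJournalKinesisStream until the stream is ACTIVE."""
    for _ in range(max_polls):
        stream = qldb_client.describe_journal_kinesis_stream(
            LedgerName=ledger_name, StreamId=stream_id
        )["Stream"]
        status = stream["Status"]
        if status == "ACTIVE":
            return stream
        if status in TERMINAL_STATUSES:
            raise RuntimeError(f"Stream {stream_id} ended with status {status}")
        sleep(poll_seconds)
    raise TimeoutError(f"Stream {stream_id} did not become ACTIVE in time")
```

To try this against a real ledger, you would pass a Boto3 QLDB client, the ARN of an existing Kinesis data stream, and the ARN of an IAM role that QLDB can assume. The start time must not be in the future.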

For complete descriptions of these API operations, see the Amazon QLDB API reference.

For information about creating and managing journal streams using the AWS CLI, see the AWS CLI Command Reference.
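The list and cancel operations summarized in this section can be combined to end every running stream on a ledger. The following Python sketch is one way to do that, assuming the AWS SDK for Python (Boto3), where qldb_client is the object returned by boto3.client("qldb"). The helper names are hypothetical. Because a stream can be canceled only while it is ACTIVE, the sketch skips streams in any other status.

```python
def list_journal_streams(qldb_client, ledger_name):
    """Return all stream descriptors for a ledger, following NextToken pages."""
    streams = []
    kwargs = {"LedgerName": ledger_name}
    while True:
        page = qldb_client.list_journal_kinesis_streams_for_ledger(**kwargs)
        streams.extend(page.get("Streams", []))
        token = page.get("NextToken")
        if not token:
            return streams
        kwargs["NextToken"] = token


def cancel_active_streams(qldb_client, ledger_name):
    """Cancel every ACTIVE stream on the ledger and return the canceled IDs."""
    canceled = []
    for stream in list_journal_streams(qldb_client, ledger_name):
        if stream["Status"] != "ACTIVE":
            continue  # only ACTIVE streams can be canceled
        qldb_client.cancel_journal_kinesis_stream(
            LedgerName=ledger_name, StreamId=stream["StreamId"]
        )
        canceled.append(stream["StreamId"])
    return canceled
```

A canceled stream can't be restarted, so a helper like this is best suited to cleanup code.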

Sample applications

QLDB provides sample applications that demonstrate various operations using journal streams. These applications are open source and available on the AWS Samples GitHub site.

Basic operations (Java)

For a Java code example that demonstrates basic operations for QLDB journal streams, see the GitHub repository aws-samples/amazon-qldb-dmv-sample-java. For instructions on how to download and install this sample application, see Installing the Amazon QLDB Java sample application.

Note

After you install the application, don't proceed to Step 1 of the Java tutorial to create a ledger. This sample application for streaming creates the vehicle-registration ledger for you.

This sample application packages the complete source code from the Java tutorial and its dependencies, including the following modules:

  • AWS SDK for Java – To create and delete both the QLDB and Kinesis Data Streams resources, including ledgers, QLDB journal streams, and Kinesis data streams.

  • Amazon QLDB driver for Java – To run data transactions on a ledger using PartiQL statements, including creating tables and inserting documents.

  • Kinesis Client Library – To consume and process data from a Kinesis data stream.

Running the code

The StreamJournal class contains tutorial code that demonstrates the following operations:

  1. Create a ledger named vehicle-registration, create tables, and load them with sample data.

    Note

    Before running this code, make sure that you don't already have an active ledger named vehicle-registration.

  2. Create a Kinesis data stream, an IAM role that QLDB can assume to write to the Kinesis data stream, and a QLDB journal stream.

  3. Use the KCL to start a stream reader that processes the Kinesis data stream and logs each QLDB data record.

  4. Use the stream data to validate the hash chain of the vehicle-registration sample ledger.

  5. Clean up all resources by stopping the stream reader, canceling the QLDB journal stream, deleting the ledger, and deleting the Kinesis data stream.
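The sample implements these steps in Java. For orientation only, the following Python sketch shows what the IAM role in step 2 might look like when created with the AWS SDK for Python (Boto3). It is not taken from the sample code. The function names, the use of an inline role policy, and the exact list of Kinesis actions are assumptions for this example, and iam_client is expected to be the object returned by boto3.client("iam").

```python
import json


def qldb_trust_policy():
    """Trust policy that lets the QLDB service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "qldb.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def kinesis_write_policy(kinesis_stream_arn):
    """Permissions policy that allows writing to one Kinesis data stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            # Assumption: the actions a QLDB journal stream needs.
            "Action": [
                "kinesis:PutRecord*",
                "kinesis:DescribeStream",
                "kinesis:ListShards",
            ],
            "Resource": kinesis_stream_arn,
        }],
    }


def create_stream_role(iam_client, role_name, policy_name, kinesis_stream_arn):
    """Create the role, attach an inline policy, and return the role ARN."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(qldb_trust_policy()),
    )
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(kinesis_write_policy(kinesis_stream_arn)),
    )
    return role["Role"]["Arn"]
```

The returned role ARN is the value you would pass as the role when creating the QLDB journal stream.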

To run the StreamJournal tutorial code, enter the following Gradle command from your project root directory.

./gradlew run -Dtutorial=streams.StreamJournal

Integration with OpenSearch Service (Python)

For a Python sample application that demonstrates how to integrate a QLDB stream with Amazon OpenSearch Service, see the GitHub repository aws-samples/amazon-qldb-streaming-amazon-opensearch-service-sample-python. This application uses an AWS Lambda function to implement a Kinesis Data Streams consumer.
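Because record aggregation is enabled by default, a Lambda consumer typically has to de-aggregate the incoming Kinesis records before it can read the individual QLDB records. The following Python sketch shows the general shape of such a handler. It assumes the third-party aws_kinesis_agg package is bundled with the function, and the extract_payloads helper and its deaggregate parameter are hypothetical names introduced for this example. Parsing the resulting bytes (QLDB publishes records in Amazon Ion binary format) and indexing them in OpenSearch Service are left out.

```python
import base64


def extract_payloads(event, deaggregate=None):
    """Return the raw bytes of each QLDB record in a Lambda Kinesis event."""
    if deaggregate is None:
        # Assumption: the aws_kinesis_agg package is available in the
        # Lambda deployment package.
        from aws_kinesis_agg.deaggregator import deaggregate_records
        deaggregate = deaggregate_records
    # Kinesis data in a Lambda event is base64 encoded.
    return [
        base64.b64decode(record["kinesis"]["data"])
        for record in deaggregate(event["Records"])
    ]


def lambda_handler(event, context):
    """Entry point: log the size of each record and report the count."""
    payloads = extract_payloads(event)
    for payload in payloads:
        print(f"Received QLDB stream record of {len(payload)} bytes")
    return {"recordCount": len(payloads)}
```

The deaggregate parameter exists only so that the decoding logic can be exercised locally without the package, for example by passing a function that returns the records unchanged.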

To clone the repository, enter the following git command.

git clone https://github.com/aws-samples/amazon-qldb-streaming-amazon-opensearch-service-sample-python.git

To run the sample application, see the README on GitHub for instructions.

Integration with Amazon SNS and Amazon SQS (Python)

For a Python sample application that demonstrates how to integrate a QLDB stream with Amazon Simple Notification Service (Amazon SNS), see the GitHub repository aws-samples/amazon-qldb-streams-dmv-sample-lambda-python.

This application uses an AWS Lambda function to implement a Kinesis Data Streams consumer. It sends messages to an Amazon SNS topic, which has an Amazon Simple Queue Service (Amazon SQS) queue subscribed to it.
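The forwarding step can be sketched in Python as follows. This is an illustration under stated assumptions, not the sample's actual code: it assumes each stream record has already been de-aggregated and parsed into a dictionary-like object with recordType and payload fields, that only REVISION_DETAILS records are of interest, and that sns_client is the object returned by boto3.client("sns"). The publish_revisions name is hypothetical.

```python
import json


def publish_revisions(sns_client, topic_arn, records):
    """Publish the data of each document revision to an SNS topic.

    Returns the number of messages published.
    """
    published = 0
    for record in records:
        # Assumption: skip control and block summary records.
        if record.get("recordType") != "REVISION_DETAILS":
            continue
        data = record["payload"]["revision"].get("data")
        if data is None:
            continue  # for example, a revision that carries no document data
        sns_client.publish(
            TopicArn=topic_arn,
            # default=str is a simple fallback for values JSON can't encode.
            Message=json.dumps(data, default=str),
        )
        published += 1
    return published
```

Any Amazon SQS queue that is subscribed to the topic then receives one message per published revision.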

To clone the repository, enter the following git command.

git clone https://github.com/aws-samples/amazon-qldb-streams-dmv-sample-lambda-python.git

To run the sample application, see the README on GitHub for instructions.