Using the Kinesis Client Library - Amazon Kinesis Data Streams

Using the Kinesis Client Library

One of the methods of developing custom consumer applications that can process data from KDS data streams is to use the Kinesis Client Library (KCL).

What is the Kinesis Client Library?

KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. These include load balancing across multiple consumer application instances, responding to consumer application instance failures, checkpointing processed records, and reacting to resharding. The KCL takes care of all of these subtasks so that you can focus your efforts on writing your custom record-processing logic.

The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs. The Kinesis Data Streams APIs help you manage many aspects of Kinesis Data Streams, including creating streams, resharding, and putting and getting records. The KCL provides a layer of abstraction around all these subtasks, specifically so that you can focus on your consumer application’s custom data processing logic. For information about the Kinesis Data Streams API, see the Amazon Kinesis API Reference.

Important

The KCL is a Java library. Support for languages other than Java is provided using a multi-language interface called the MultiLangDaemon. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. For example, if you install the KCL for Python and write your consumer application entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings that you might need to customize for your use case, for example, the AWS region that it connects to. For more information about the MultiLangDaemon on GitHub, see KCL MultiLangDaemon project.

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams. The KCL performs the following tasks:

  • Connects to the data stream

  • Enumerates the shards within the data stream

  • Uses leases to coordinate shard associations with its workers

  • Instantiates a record processor for every shard it manages

  • Pulls data records from the data stream

  • Pushes the records to the corresponding record processor

  • Checkpoints processed records

  • Balances shard-worker associations (leases) when the worker instance count changes or when the data stream is resharded (shards are split or merged)

KCL Available Versions

Currently, you can use either of the following supported versions of KCL to build your custom consumer applications:

You can use either KCL 1.x or KCL 2.x to build consumer applications that use shared throughput. For more information, see Developing Custom Consumers with Shared Throughput Using KCL.

To build consumer applications that use dedicated throughput (enhanced fan-out consumers), you can only use KCL 2.x. For more information, see Developing Custom Consumers with Dedicated Throughput (Enhanced Fan-Out).

For information about the differences between KCL 1.x and KCL 2.x, and instructions on how to migrate from KCL 1.x to KCL 2.x, see Migrating Consumers from KCL 1.x to KCL 2.x.

KCL Concepts

  • KCL consumer application – an application that is custom-built using KCL and designed to read and process records from data streams.

  • Consumer application instance - KCL consumer applications are typically distributed, with one or more application instances running simultaneously in order to coordinate on failures and dynamically load balance data record processing.

  • Worker – a high level class that a KCL consumer application instance uses to start processing data.

    Important

    Each KCL consumer application instance has one worker.

    The worker initializes and oversees various tasks, including syncing shard and lease information, tracking shard assignments, and processing data from the shards. A worker provides KCL with the configuration information for the consumer application, such as the name of the data stream whose data records this KCL consumer application is going to process and the AWS credentials that are needed to access this data stream. The worker also kick starts that specific KCL consumer application instance to deliver data records from the data stream to the record processors.

    Important

    In KCL 1.x, this class is called Worker. For more information, see the Java KCL repository at https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. In KCL 2.x, this class is called Scheduler. Scheduler’s purpose in KCL 2.x is identical to Worker’s purpose in KCL 1.x. For more information about the Scheduler class in KCL 2.x, see https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • Lease – data that defines the binding between a worker and a shard. Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers. At any given time, each shard of data records is bound to a particular worker by a lease identified by the leaseKey variable.

    By default, a worker can hold one or more leases (subject to the value of the maxLeasesForWorker variable) at the same time.

    Important

    Every worker will contend to hold all available leases for all available shards in a data stream. But only one worker will successfully hold each lease at any one time.

    For example, if you have a consumer application instance A with worker A that is processing a data stream with 4 shards, worker A can hold leases to shards 1, 2, 3, and 4 at the same time. But if you have two consumer application instances: A and B with worker A and worker B, and these instances are processing a data stream with 4 shards, worker A and worker B cannot both hold the lease to shard 1 at the same time. One worker holds the lease to a particular shard until it is ready to stop processing this shard’s data records or until it fails. When one worker stops holding the lease, another worker takes up and holds the lease.

    For more information, see the Java KCL repositories: https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java for KCL 1.x and https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java for KCL 2.x.

  • Lease table - a unique Amazon DynamoDB table that is used to keep track of the shards in a KDS data stream that are being leased and processed by the workers of the KCL consumer application. The lease table must remain in sync (within a worker and across all workers) with the latest shard information from the data stream while the KCL consumer application is running. For more information, see Using a Lease Table to Track the Shards Processed by the KCL Consumer Application.

  • Record processor – the logic that defines how your KCL consumer application processes the data that it gets from the data streams. At runtime, a KCL consumer application instance instantiates a worker, and this worker instantiates one record processor for every shard to which it holds a lease.
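
The lease rule described above (many workers contend, but only one holds a given shard's lease at a time) can be illustrated with a small in-memory model. This is only a sketch: ToyLeaseRegistry and its methods are hypothetical names, and the real KCL keeps leases in a DynamoDB lease table rather than in a local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy in-memory stand-in for a lease table: at most one owner per shard. */
final class ToyLeaseRegistry {
    // Maps a shard ID (used here as the lease key) to the worker holding the lease.
    private final Map<String, String> ownerByShard = new ConcurrentHashMap<>();

    /** Returns true if workerId holds the lease after the call (newly acquired or already held). */
    boolean tryAcquire(String shardId, String workerId) {
        String current = ownerByShard.putIfAbsent(shardId, workerId);
        return current == null || current.equals(workerId);
    }

    /** Releases the lease only if workerId is the current holder. */
    boolean release(String shardId, String workerId) {
        return ownerByShard.remove(shardId, workerId);
    }

    /** Returns the current holder, or null if nobody holds the lease. */
    String ownerOf(String shardId) {
        return ownerByShard.get(shardId);
    }

    public static void main(String[] args) {
        ToyLeaseRegistry leases = new ToyLeaseRegistry();
        System.out.println(leases.tryAcquire("shard-1", "workerA")); // true
        System.out.println(leases.tryAcquire("shard-1", "workerB")); // false
        System.out.println(leases.release("shard-1", "workerA"));    // true
        System.out.println(leases.tryAcquire("shard-1", "workerB")); // true
    }
}
```

In this model, worker B can take the lease for shard-1 only after worker A releases it, which mirrors the two-worker example given for leases.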

Using a Lease Table to Track the Shards Processed by the KCL Consumer Application

What Is a Lease Table

For each Amazon Kinesis Data Streams application, KCL uses a unique lease table (stored in an Amazon DynamoDB table) to keep track of the shards in a KDS data stream that are being leased and processed by the workers of the KCL consumer application.

Important

KCL uses the name of the consumer application to create the name of the lease table that this consumer application uses. Therefore, each consumer application name must be unique.

You can view the lease table using the Amazon DynamoDB console while the consumer application is running.

If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for this application.

Important

Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself.

Each row in the lease table represents a shard that is being processed by the workers of your consumer application. If your KCL consumer application processes only one data stream, then leaseKey, which is the hash key for the lease table, is the shard ID. If you are Processing Multiple Data Streams with the same KCL 2.x for Java Consumer Application, then the structure of the leaseKey looks like this: account-id:StreamName:streamCreationTimestamp:ShardId. For example, 111111111:multiStreamTest-1:12345:shardId-000000000336.
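
As a minimal sketch of the two leaseKey shapes described above, the following helper builds both forms. LeaseKeyFormat and its method names are hypothetical and are not part of the KCL; only the key layout comes from this section.

```java
/** Hypothetical helper that builds lease keys in the two documented shapes. */
final class LeaseKeyFormat {
    private LeaseKeyFormat() {}

    /** Single-stream applications: the lease key is the shard ID itself. */
    static String singleStream(String shardId) {
        return shardId;
    }

    /** Multistream applications: account-id:StreamName:streamCreationTimestamp:ShardId. */
    static String multiStream(String accountId, String streamName,
                              long streamCreationTimestamp, String shardId) {
        return String.join(":", accountId, streamName,
                Long.toString(streamCreationTimestamp), shardId);
    }

    public static void main(String[] args) {
        System.out.println(singleStream("shardId-000000000336"));
        System.out.println(multiStream("111111111", "multiStreamTest-1", 12345L,
                "shardId-000000000336"));
    }
}
```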

In addition to the shard ID, each row also includes the following data:

  • checkpoint: The most recent checkpoint sequence number for the shard. This value is unique across all shards in the data stream.

  • checkpointSubSequenceNumber: When using the Kinesis Producer Library's aggregation feature, this is an extension to checkpoint that tracks individual user records within the Kinesis record.

  • leaseCounter: Used for lease versioning so that workers can detect that their lease has been taken by another worker.

  • leaseKey: A unique identifier for a lease. Each lease is particular to a shard in the data stream and is held by one worker at a time.

  • leaseOwner: The worker that is holding this lease.

  • ownerSwitchesSinceCheckpoint: How many times this lease has changed workers since the last time a checkpoint was written.

  • parentShardId: Used to ensure that the parent shard is fully processed before processing starts on the child shards. This ensures that records are processed in the same order they were put into the stream.

  • hashrange: Used by the PeriodicShardSyncManager to run periodic syncs to find missing shards in the lease table and create leases for them if required.

    Note

    This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3. For more information about PeriodicShardSyncManager and periodic synchronization between leases and shards, see How a Lease Table Is Synchronized with the Shards in a KDS Data Stream.

  • childshards: Used by the LeaseCleanupManager to review the child shard's processing status and decide whether the parent shard can be deleted from the lease table.

    Note

    This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3.

  • shardID: The ID of the shard.

    Note

    This data is only present in the lease table if you are Processing Multiple Data Streams with the same KCL 2.x for Java Consumer Application. This is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java and beyond.

  • stream name: The identifier of the data stream in the following format: account-id:StreamName:streamCreationTimestamp.

    Note

    This data is only present in the lease table if you are Processing Multiple Data Streams with the same KCL 2.x for Java Consumer Application. This is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java and beyond.
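
To make the roles of leaseCounter, leaseOwner, and ownerSwitchesSinceCheckpoint more concrete, here is a toy model of versioned lease updates. It is a sketch under stated assumptions: the VersionedLease class and its methods are hypothetical, the version check is a plain in-memory comparison, and how the library performs the equivalent check against DynamoDB is not covered in this section.

```java
/** Toy lease row whose updates succeed only when the caller has seen the latest version. */
final class VersionedLease {
    private final String leaseKey;
    private String leaseOwner;
    private long leaseCounter;                // bumped on every successful update
    private int ownerSwitchesSinceCheckpoint; // reset when a checkpoint is written

    VersionedLease(String leaseKey, String leaseOwner) {
        this.leaseKey = leaseKey;
        this.leaseOwner = leaseOwner;
    }

    /** The owner renews its lease; fails if the lease changed since expectedCounter was read. */
    synchronized boolean renew(String workerId, long expectedCounter) {
        if (!workerId.equals(leaseOwner) || expectedCounter != leaseCounter) {
            return false; // stale view: another worker has taken the lease
        }
        leaseCounter++;
        return true;
    }

    /** Another worker takes the lease; fails if its view of the counter is stale. */
    synchronized boolean take(String newOwner, long expectedCounter) {
        if (expectedCounter != leaseCounter) {
            return false;
        }
        if (!newOwner.equals(leaseOwner)) {
            ownerSwitchesSinceCheckpoint++;
        }
        leaseOwner = newOwner;
        leaseCounter++;
        return true;
    }

    /** Records that a checkpoint was written by resetting the switch count. */
    synchronized void checkpointWritten() {
        ownerSwitchesSinceCheckpoint = 0;
    }

    synchronized String leaseKey() { return leaseKey; }
    synchronized String leaseOwner() { return leaseOwner; }
    synchronized long leaseCounter() { return leaseCounter; }
    synchronized int ownerSwitchesSinceCheckpoint() { return ownerSwitchesSinceCheckpoint; }
}
```

In this model, a worker that lost its lease notices the loss on its next renew call, because the counter it last read no longer matches the stored value.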

Throughput

If your Amazon Kinesis Data Streams application receives provisioned-throughput exceptions, you should increase the provisioned throughput for the DynamoDB table. The KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. For example, if your Amazon Kinesis Data Streams application does frequent checkpointing or operates on a stream that is composed of many shards, you might need more throughput.

For information about provisioned throughput in DynamoDB, see Read/Write Capacity Mode and Working with Tables and Data in the Amazon DynamoDB Developer Guide.
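
As a rough, back-of-the-envelope illustration of why frequent checkpointing or many shards can exceed the default write capacity, the sketch below estimates checkpoint writes per second. It assumes one lease-table write per checkpoint per shard, which is an assumption made here rather than a figure from this guide, and it ignores any other lease-table traffic, so treat the result as a lower bound. LeaseTableWriteEstimate is a hypothetical helper.

```java
/** Hypothetical estimator for checkpoint-driven writes to the lease table. */
final class LeaseTableWriteEstimate {
    /** Default provisioned write throughput of the lease table, as stated in this section. */
    static final double DEFAULT_WRITES_PER_SECOND = 10.0;

    private LeaseTableWriteEstimate() {}

    /** Assumes every shard checkpoints once per interval and each checkpoint is one write. */
    static double checkpointWritesPerSecond(int shardCount, double checkpointIntervalSeconds) {
        if (shardCount < 0 || checkpointIntervalSeconds <= 0) {
            throw new IllegalArgumentException("shardCount must be >= 0 and interval must be > 0");
        }
        return shardCount / checkpointIntervalSeconds;
    }

    /** True if the estimate is above the default write throughput. */
    static boolean exceedsDefault(double writesPerSecond) {
        return writesPerSecond > DEFAULT_WRITES_PER_SECOND;
    }
}
```

Under these assumptions, a stream with 100 shards that checkpoints every 5 seconds produces about 20 writes per second, which is above the default of 10.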

How a Lease Table Is Synchronized with the Shards in a KDS Data Stream

Workers in KCL consumer applications use leases to process shards from a given data stream. The information on which worker is leasing which shard at any given time is stored in a lease table. The lease table must remain in sync with the latest shard information from the data stream while the KCL consumer application is running. KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service during consumer application bootstrapping (either when the consumer application is initialized or restarted) and also whenever a shard that is being processed reaches its end (resharding). In other words, the workers of a KCL consumer application are synchronized with the data stream that they are processing during the initial consumer application bootstrap and whenever the consumer application encounters a data stream reshard event.

Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2

In KCL 1.0 - 1.13 and KCL 2.0 - 2.2, during the consumer application's bootstrapping and also during each data stream reshard event, KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service by invoking the ListShards or the DescribeStream discovery APIs. In all the KCL versions listed above, each worker of a KCL consumer application completes the following steps to perform the lease/shard synchronization process during the consumer application's bootstrapping and at each stream reshard event:

  • Fetches all the shards for the data stream that is being processed

  • Fetches all the shard leases from the lease table

  • Identifies each open shard that does not have a lease in the lease table

  • Iterates over all found open shards and for each open shard with no open parent:

    • Traverses the hierarchy tree through its ancestors path to determine if the shard is a descendant. A shard is considered a descendant, if an ancestor shard is being processed (lease entry for ancestor shard exists in the lease table) or if an ancestor shard should be processed (for example, if the initial position is TRIM_HORIZON or AT_TIMESTAMP)

    • If the open shard in context is a descendant, KCL checkpoints the shard based on initial position and creates leases for its parents, if required
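
The first three steps above (fetch the shards, fetch the leases, and find open shards that have no lease) can be sketched with plain collections. This is a simplified illustration: ToyShard and MissingLeaseFinder are hypothetical names, the inputs stand in for the results of the shard discovery call and the lease table scan, and the ancestor traversal from the later steps is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Minimal stand-in for a shard: an ID plus whether the shard is still open. */
final class ToyShard {
    final String shardId;
    final boolean open;

    ToyShard(String shardId, boolean open) {
        this.shardId = shardId;
        this.open = open;
    }
}

final class MissingLeaseFinder {
    private MissingLeaseFinder() {}

    /** Returns the IDs of open shards that have no entry among the existing lease keys. */
    static List<String> openShardsWithoutLease(List<ToyShard> allShards, Set<String> leaseKeys) {
        List<String> missing = new ArrayList<>();
        for (ToyShard shard : allShards) {
            if (shard.open && !leaseKeys.contains(shard.shardId)) {
                missing.add(shard.shardId);
            }
        }
        return missing;
    }
}
```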

Synchronization in KCL 2.x, Starting with KCL 2.3 and Beyond

Starting with the latest supported versions of KCL 2.x (KCL 2.3) and beyond, the library now supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application.

  • During the application's bootstrapping, if the lease table is empty, KCL uses the ListShards API's filtering option (the optional ShardFilter request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the ShardFilter parameter. The ShardFilter parameter enables you to filter the response of the ListShards API. The only required property of the ShardFilter parameter is Type. KCL uses the Type filter property and the following valid values to identify and return a snapshot of open shards that might require new leases:

    • AT_TRIM_HORIZON - the response includes all the shards that were open at TRIM_HORIZON.

    • AT_LATEST - the response includes only the currently open shards of the data stream.

    • AT_TIMESTAMP - the response includes all shards whose start timestamp is less than or equal to the given timestamp and end timestamp is greater than or equal to the given timestamp or still open.

    ShardFilter is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at RetrievalConfig#initialPositionInStreamExtended.

    For more information about ShardFilter, see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.

  • KCL 2.3 uses the ChildShards return parameter of the GetRecords and the SubscribeToShard APIs to perform lease/shard synchronization that happens at SHARD_END for closed shards, allowing a KCL worker to only create leases for the child shards of the shard it finished processing. For shared throughput consumer applications, this optimization of the lease/shard synchronization uses the ChildShards parameter of the GetRecords API. For the dedicated throughput (enhanced fan-out) consumer applications, this optimization of the lease/shard synchronization uses the ChildShards parameter of the SubscribeToShard API. For more information, see GetRecords, SubscribeToShard, and ChildShard.

  • With the above changes, the behavior of KCL is moving from the model of all workers learning about all existing shards to the model of workers learning only about the child shards of the shards that each worker owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs additional periodic shard/lease scans to identify any potential holes in the lease table (in other words, to learn about all new shards), to ensure that the complete hash range of the data stream is being processed, and to create leases for missing shards if required. PeriodicShardSyncManager is the component that is responsible for running periodic lease/shard scans.

    For more information about PeriodicShardSyncManager in KCL 2.3, see https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213.

    In KCL 2.3, new configuration options are available to configure PeriodicShardSyncManager in LeaseManagementConfig:

    Name | Default value | Description
    leasesRecoveryAuditorExecutionFrequencyMillis | 120000 (2 minutes) | Frequency (in milliseconds) of the auditor job to scan for partial leases in the lease table. If the auditor detects any hole in the leases for a stream, then it triggers shard synchronization based on leasesRecoveryAuditorInconsistencyConfidenceThreshold.
    leasesRecoveryAuditorInconsistencyConfidenceThreshold | 3 | Confidence threshold for the periodic auditor job to determine if leases for a data stream in the lease table are inconsistent. If the auditor finds the same set of inconsistencies for a data stream this many times consecutively, then it triggers a shard synchronization.

    New CloudWatch metrics are also now emitted to monitor the health of the PeriodicShardSyncManager. For more information, see PeriodicShardSyncManager.

  • KCL 2.3 also includes an optimization to HierarchicalShardSyncer so that it creates leases for only one layer of shards.
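
The periodic audit described above (look for holes in the hash range covered by the leases, and act only after several consecutive inconsistent scans) can be sketched as follows. This is a simplified model, not the PeriodicShardSyncManager implementation: HashRange and LeaseHoleAuditor are hypothetical names, the hash key space is taken to be 0 through 2^128 - 1, and the sketch counts any hole rather than checking that the same set of inconsistencies repeats.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Inclusive hash key range covered by one lease. */
final class HashRange {
    final BigInteger start;
    final BigInteger end;

    HashRange(BigInteger start, BigInteger end) {
        this.start = start;
        this.end = end;
    }
}

final class LeaseHoleAuditor {
    static final BigInteger MIN_HASH = BigInteger.ZERO;
    static final BigInteger MAX_HASH = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);

    private final int confidenceThreshold;
    private int consecutiveHoleScans;

    LeaseHoleAuditor(int confidenceThreshold) {
        this.confidenceThreshold = confidenceThreshold;
    }

    /** Returns true if the ranges leave any part of [MIN_HASH, MAX_HASH] uncovered. */
    static boolean hasHole(List<HashRange> ranges) {
        List<HashRange> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparing((HashRange r) -> r.start));
        BigInteger nextExpected = MIN_HASH;
        for (HashRange range : sorted) {
            if (range.start.compareTo(nextExpected) > 0) {
                return true; // gap before this range
            }
            BigInteger afterEnd = range.end.add(BigInteger.ONE);
            if (afterEnd.compareTo(nextExpected) > 0) {
                nextExpected = afterEnd;
            }
        }
        return nextExpected.compareTo(MAX_HASH) <= 0; // gap at the tail
    }

    /** Runs one scan; returns true when a shard synchronization should be triggered. */
    boolean scan(List<HashRange> ranges) {
        consecutiveHoleScans = hasHole(ranges) ? consecutiveHoleScans + 1 : 0;
        if (consecutiveHoleScans >= confidenceThreshold) {
            consecutiveHoleScans = 0;
            return true;
        }
        return false;
    }
}
```

With a threshold of 3 (the default listed above), a hole must be seen in three consecutive scans before this model asks for a synchronization; a single clean scan resets the count.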

Synchronization in KCL 1.x, Starting with KCL 1.14 and Beyond

Starting with the latest supported versions of KCL 1.x (KCL 1.14) and beyond, the library now supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application.

  • During the application's bootstrapping, if the lease table is empty, KCL uses the ListShards API's filtering option (the optional ShardFilter request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the ShardFilter parameter. The ShardFilter parameter enables you to filter the response of the ListShards API. The only required property of the ShardFilter parameter is Type. KCL uses the Type filter property and the following valid values to identify and return a snapshot of open shards that might require new leases:

    • AT_TRIM_HORIZON - the response includes all the shards that were open at TRIM_HORIZON.

    • AT_LATEST - the response includes only the currently open shards of the data stream.

    • AT_TIMESTAMP - the response includes all shards whose start timestamp is less than or equal to the given timestamp and end timestamp is greater than or equal to the given timestamp or still open.

    ShardFilter is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at KinesisClientLibConfiguration#initialPositionInStreamExtended.

    For more information about ShardFilter, see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.

  • KCL 1.14 uses the ChildShards return parameter of the GetRecords API to perform lease/shard synchronization that happens at SHARD_END for closed shards, allowing a KCL worker to only create leases for the child shards of the shard it finished processing. For more information, see GetRecords and ChildShard.

  • With the above changes, the behavior of KCL is moving from the model of all workers learning about all existing shards to the model of workers learning only about the child shards of the shards that each worker owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs additional periodic shard/lease scans to identify any potential holes in the lease table (in other words, to learn about all new shards), to ensure that the complete hash range of the data stream is being processed, and to create leases for missing shards if required. PeriodicShardSyncManager is the component that is responsible for running periodic lease/shard scans.

    When KinesisClientLibConfiguration#shardSyncStrategyType is set to ShardSyncStrategyType.SHARD_END, the leasesRecoveryAuditorInconsistencyConfidenceThreshold setting determines the number of consecutive scans containing holes in the lease table after which a shard synchronization is enforced. When KinesisClientLibConfiguration#shardSyncStrategyType is set to ShardSyncStrategyType.PERIODIC, leasesRecoveryAuditorInconsistencyConfidenceThreshold is ignored.

    For more information about PeriodicShardSyncManager in KCL 1.14, see https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999.

    In KCL 1.14, a new configuration option is available to configure PeriodicShardSyncManager in KinesisClientLibConfiguration:

    Name | Default value | Description
    leasesRecoveryAuditorInconsistencyConfidenceThreshold | 3 | Confidence threshold for the periodic auditor job to determine if leases for a data stream in the lease table are inconsistent. If the auditor finds the same set of inconsistencies for a data stream this many times consecutively, then it triggers a shard synchronization.

    New CloudWatch metrics are also now emitted to monitor the health of the PeriodicShardSyncManager. For more information, see PeriodicShardSyncManager.

  • KCL 1.14 now also supports deferred lease cleanup. Leases are deleted asynchronously by LeaseCleanupManager upon reaching SHARD_END, when a shard has either expired past the data stream’s retention period or been closed as the result of a resharding operation.

    New configuration options are available to configure LeaseCleanupManager.

    Name | Default value | Description
    leaseCleanupIntervalMillis | 1 minute | Interval at which to run the lease cleanup thread.
    completedLeaseCleanupIntervalMillis | 5 minutes | Interval at which to check if a lease is completed or not.
    garbageLeaseCleanupIntervalMillis | 30 minutes | Interval at which to check if a lease is garbage (that is, trimmed past the data stream's retention period) or not.

  • KCL 1.14 also includes an optimization to KinesisShardSyncer so that it creates leases for only one layer of shards.
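
The AT_LATEST and AT_TIMESTAMP rules described for the ShardFilter Type property can be illustrated with a toy model. This sketch assumes each shard carries an explicit open time and an optional close time, which is a simplification introduced here for illustration. TimedShard and ShardSnapshot are hypothetical names, and no Kinesis API is called.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Toy shard with an open time and a close time (null while the shard is still open). */
final class TimedShard {
    final String shardId;
    final Instant openedAt;
    final Instant closedAt;

    TimedShard(String shardId, Instant openedAt, Instant closedAt) {
        this.shardId = shardId;
        this.openedAt = openedAt;
        this.closedAt = closedAt;
    }
}

final class ShardSnapshot {
    private ShardSnapshot() {}

    /** AT_LATEST: only the shards that are currently open. */
    static List<String> atLatest(List<TimedShard> shards) {
        List<String> result = new ArrayList<>();
        for (TimedShard shard : shards) {
            if (shard.closedAt == null) {
                result.add(shard.shardId);
            }
        }
        return result;
    }

    /** AT_TIMESTAMP: start <= timestamp, and end >= timestamp or the shard is still open. */
    static List<String> atTimestamp(List<TimedShard> shards, Instant timestamp) {
        List<String> result = new ArrayList<>();
        for (TimedShard shard : shards) {
            boolean startedByThen = !shard.openedAt.isAfter(timestamp);
            boolean notEndedBefore = shard.closedAt == null || !shard.closedAt.isBefore(timestamp);
            if (startedByThen && notEndedBefore) {
                result.add(shard.shardId);
            }
        }
        return result;
    }
}
```

In this model, a parent shard that was split appears in an AT_TIMESTAMP snapshot taken before the split, while an AT_LATEST snapshot contains only its child shards.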

Processing Multiple Data Streams with the same KCL 2.x for Java Consumer Application

This section describes the following changes in KCL 2.x for Java that enable you to create KCL consumer applications that can process more than one data stream at the same time.

Important

Multistream processing is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java and beyond.

Multistream processing is NOT supported for any other languages in which KCL 2.x can be implemented.

Multistream processing is NOT supported in any versions of KCL 1.x.

  • MultiStreamTracker interface

    To build a consumer application that can process multiple streams at the same time, you must implement a new interface called MultiStreamTracker. This interface includes the streamConfigList method that returns the list of data streams and their configurations to be processed by the KCL consumer application. Note that the data streams that are being processed can be changed during the consumer application runtime. The KCL calls streamConfigList periodically to learn about changes in the data streams to process.

    The streamConfigList method populates the StreamConfig list.

    package software.amazon.kinesis.common;

    import lombok.Data;
    import lombok.experimental.Accessors;

    @Data
    @Accessors(fluent = true)
    public class StreamConfig {
        private final StreamIdentifier streamIdentifier;
        private final InitialPositionInStreamExtended initialPositionInStreamExtended;
        private String consumerArn;
    }

    Note that the StreamIdentifier and InitialPositionInStreamExtended are required fields, while consumerArn is optional. You must provide the consumerArn only if you are using KCL 2.x to implement an enhanced fan-out consumer application.

    For more information about StreamIdentifier, see https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L29. You can create a multistream instance for the StreamIdentifier from the serialized stream identifier. The serialized stream identifier should be of the following format: account-id:StreamName:streamCreationTimestamp.

    /**
     * @param streamIdentifierSer
     * @return StreamIdentifier
     */
    public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) {
        if (PATTERN.matcher(streamIdentifierSer).matches()) {
            final String[] split = streamIdentifierSer.split(DELIMITER);
            return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2]));
        } else {
            throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer);
        }
    }

    MultiStreamTracker also includes a strategy for deleting leases of old streams in the lease table (formerStreamsLeasesDeletionStrategy). Note that the strategy CANNOT be changed during the consumer application runtime. For more information, see https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilder is an application-wide class that you can use to specify all of the KCL 2.x configuration settings to be used when building your KCL consumer application. The ConfigsBuilder class now has support for the MultiStreamTracker interface. You can initialize ConfigsBuilder either with the name of the one data stream to consume records from:

    /**
     * Constructor to initialize ConfigsBuilder with StreamName
     * @param streamName
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.right(streamName);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }

    Or you can initialize ConfigsBuilder with MultiStreamTracker if you want to implement a KCL consumer application that processes multiple streams at the same time.

    /**
     * Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }

  • With multistream support implemented for your KCL consumer application, each row of the application's lease table now contains the shard ID and the stream name of the multiple data streams that this application processes.

  • When multistream support for your KCL consumer application is implemented, the leaseKey takes the following structure: account-id:StreamName:streamCreationTimestamp:ShardId. For example, 111111111:multiStreamTest-1:12345:shardId-000000000336.

    Important

    When your existing KCL consumer application is configured to process only one data stream, the leaseKey (which is the hash key for the lease table) is the shard ID. If you reconfigure this existing KCL consumer application to process multiple data streams, it breaks your lease table, because with multistream support, the leaseKey structure must be as follows: account-id:StreamName:streamCreationTimestamp:ShardId.
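
The serialized stream identifier format and the leaseKey warning above can be illustrated with a self-contained parser. This is a sketch only: SerializedStreamId is a hypothetical class, and its regular expressions are assumptions made here, because the PATTERN and DELIMITER constants used by the library's multiStreamInstance method are not shown in this section.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for account-id:StreamName:streamCreationTimestamp identifiers. */
final class SerializedStreamId {
    // Assumed shapes: numeric account ID, a name without colons, numeric creation timestamp.
    private static final Pattern STREAM_ID = Pattern.compile("([0-9]+):([^:]+):([0-9]+)");
    private static final Pattern MULTI_STREAM_LEASE_KEY =
            Pattern.compile("([0-9]+):([^:]+):([0-9]+):([^:]+)");

    final String accountId;
    final String streamName;
    final long streamCreationTimestamp;

    private SerializedStreamId(String accountId, String streamName, long streamCreationTimestamp) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
    }

    /** Parses the serialized form or throws IllegalArgumentException if it does not match. */
    static SerializedStreamId parse(String serialized) {
        Matcher matcher = STREAM_ID.matcher(serialized);
        if (!matcher.matches()) {
            throw new IllegalArgumentException(
                    "Unable to deserialize stream identifier from " + serialized);
        }
        return new SerializedStreamId(matcher.group(1), matcher.group(2),
                Long.parseLong(matcher.group(3)));
    }

    /** True if the key has the four-part multistream shape; a bare shard ID does not. */
    static boolean isMultiStreamLeaseKey(String leaseKey) {
        return MULTI_STREAM_LEASE_KEY.matcher(leaseKey).matches();
    }
}
```

A check such as isMultiStreamLeaseKey shows why an existing single-stream lease table does not carry over: its keys are bare shard IDs, which do not have the four-part structure that multistream processing expects.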

Using the Kinesis Client Library with the AWS Glue Schema Registry

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring that the data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry enables you to improve end-to-end data quality and data governance within your streaming applications. For more information, see AWS Glue Schema Registry. One of the ways to set up this integration is through the KCL in Java.

Important

Currently, Kinesis Data Streams and AWS Glue Schema Registry integration is only supported for the Kinesis data streams that use KCL 2.3 consumers implemented in Java. Multi-language support is not provided. KCL 1.0 consumers are not supported. KCL 2.x consumers prior to KCL 2.3 are not supported.

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry.