Troubleshooting Amazon Kinesis Streams Consumers
The following sections offer solutions to some common problems you may find while working with Amazon Kinesis Streams consumers.
- Some Streams Records are Skipped When Using the Kinesis Client Library
- Records Belonging to the Same Shard are Processed by Different Record Processors at the Same Time
- Consumer Application is Reading at a Slower Rate Than Expected
- GetRecords Returns Empty Records Array Even When There is Data in the Stream
- Shard Iterator Expires Unexpectedly
- Consumer Record Processing Falling Behind
Some Streams Records are Skipped When Using the Kinesis Client Library
The most common cause of skipped records is an unhandled exception thrown from processRecords. The Amazon Kinesis Client Library (KCL) relies on your processRecords code to handle any exceptions that arise from processing the data records. Any exception thrown from processRecords is absorbed by the KCL. To avoid infinite retries on a recurring failure, the KCL does not resend the batch of records processed at the time of the exception. The KCL then calls processRecords for the next batch of data records without restarting the record processor. As a result, consumer applications observe skipped records. To prevent skipped records, handle all exceptions within processRecords.
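One way to keep exceptions from escaping the record-processing code is sketched below. It is a minimal illustration, not the KCL API: `process_records`, `handle_record`, `max_attempts`, and the `dead_letters` list are hypothetical names, and the bounded-retry policy is an assumption you would adapt to your application.

```python
import logging

logger = logging.getLogger("consumer")


def process_records(records, handle_record, max_attempts=3, dead_letters=None):
    """Process a batch without letting any exception escape to the caller.

    handle_record is your per-record logic (hypothetical). Records that still
    fail after max_attempts are set aside in dead_letters instead of being
    silently lost.
    """
    dead_letters = [] if dead_letters is None else dead_letters
    for record in records:
        for attempt in range(1, max_attempts + 1):
            try:
                handle_record(record)
                break  # success: move on to the next record
            except Exception:
                logger.exception("attempt %d/%d failed", attempt, max_attempts)
        else:
            # Every attempt failed; keep the record for later inspection.
            dead_letters.append(record)
    return dead_letters
```

Because failures are caught per record, one bad record does not cause the rest of the batch to be dropped.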
Records Belonging to the Same Shard are Processed by Different Record Processors at the Same Time
For any running Amazon Kinesis Client Library (KCL) application, a shard only has one owner. However, multiple record processors may temporarily process the same shard. If a worker instance loses network connectivity, the KCL assumes, after the failover time expires, that the unreachable worker is no longer processing records, and it directs other worker instances to take over. For a brief period, new record processors and record processors from the unreachable worker may process data from the same shard.
You should set a failover time that is appropriate for your application. For low-latency applications, the 10-second default may represent the maximum time you want to wait. However, if you expect connectivity issues, such as when making calls across geographical areas where connectivity could be lost more frequently, this number may be too low.
Your application should anticipate and handle this scenario, especially because network connectivity is usually restored to the previously unreachable worker. If a record processor has its shards taken by another record processor, it must handle the following two cases to perform graceful shutdown:
- After the current call to processRecords is completed, the KCL invokes the shutdown method on the record processor with shutdown reason 'ZOMBIE'. Your record processors are expected to clean up any resources as appropriate and then exit.
- When you attempt to checkpoint from a 'zombie' worker, the KCL throws ShutdownException. After receiving this exception, your code is expected to exit the current method cleanly.
For more information, see Handling Duplicate Records.
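The two shutdown cases can be sketched as follows. This is a simplified stand-in rather than real KCL code: the `ShutdownException` class, the `RecordProcessor` shape, and the `checkpointer` object with a `checkpoint()` method are all hypothetical placeholders for whatever your KCL language binding provides. Checkpointing on a non-zombie shutdown is also an assumption about typical usage.

```python
class ShutdownException(Exception):
    """Stand-in for the exception raised when a zombie worker checkpoints."""


class RecordProcessor:
    def __init__(self):
        self.active = True
        self.processed = []

    def process_records(self, records, checkpointer):
        for record in records:
            self.processed.append(record)  # placeholder for real work
        try:
            checkpointer.checkpoint()
        except ShutdownException:
            # Another worker now owns the shard: stop and return cleanly
            # instead of letting the exception propagate.
            self.active = False

    def shutdown(self, checkpointer, reason):
        self.active = False  # release resources here
        if reason == "ZOMBIE":
            return  # the lease is gone, so do not attempt to checkpoint
        checkpointer.checkpoint()
```

Records handled during the overlap window may also be handled by the new owner, so downstream processing should tolerate duplicates.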
Consumer Application is Reading at a Slower Rate Than Expected
The most common reasons for read throughput being slower than expected are as follows:
- Multiple consumer applications have total reads exceeding the per-shard limits. For more information, see Amazon Kinesis Streams Limits. In this case, increase the number of shards in the Amazon Kinesis stream.
- The limit that specifies the maximum number of records returned per GetRecords call may have been configured with a low value. If you are using the KCL, you may have configured the worker with a low value for the maxRecords property. In general, we recommend using the system defaults for this property.
- The logic inside your processRecords call may be taking longer than expected for a number of possible reasons; the logic may be CPU intensive, I/O blocking, or bottlenecked on synchronization. To test whether this is the case, run empty record processors and compare the read throughput. For information about how to keep up with the incoming data, see Resharding, Scaling, and Parallel Processing.
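The empty-processor comparison in the last item can be approximated offline with a small timing harness. This is only a sketch: `records_per_second` and its arguments are hypothetical names, and it measures handler cost over batches you already hold in memory rather than a live stream.

```python
import time


def records_per_second(batches, handle_record):
    """Return how many records per second handle_record sustains."""
    start = time.perf_counter()
    count = 0
    for batch in batches:
        for record in batch:
            handle_record(record)
            count += 1
    elapsed = time.perf_counter() - start
    return count / elapsed if elapsed > 0 else float("inf")
```

Run it once with a no-op handler and once with your real handler on the same batches. A large gap between the two rates suggests that the processing logic, not the read path, is the bottleneck.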
If you have only one consumer application, it is always possible to read at least two times faster than the put rate. That is because each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). Each open shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second. Note that each read (GetRecords call) gets a batch of records. The size of the data returned by GetRecords varies depending on the utilization of the shard. The maximum size of data that GetRecords can return is 10 MB. If a call returns that limit, subsequent calls made within the next 5 seconds throw ProvisionedThroughputExceededException.
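The per-shard limits quoted in this section translate into a rough shard-count estimate. The sketch below is illustrative arithmetic only: `min_shards` is a hypothetical helper, it assumes every consumer application reads all of the data written, and it does not model the limit of 5 read transactions per second.

```python
import math

WRITE_MB_PER_SHARD = 1.0        # MB per second, including partition keys
WRITE_RECORDS_PER_SHARD = 1000  # records per second
READ_MB_PER_SHARD = 2.0         # MB per second, shared by all consumers


def min_shards(write_mb_per_s, write_records_per_s, consumer_apps):
    """Smallest shard count that satisfies the write and read limits."""
    # Assumption: each consumer application reads everything that is written.
    read_mb_per_s = write_mb_per_s * consumer_apps
    return max(
        1,
        math.ceil(write_mb_per_s / WRITE_MB_PER_SHARD),
        math.ceil(write_records_per_s / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb_per_s / READ_MB_PER_SHARD),
    )
```

For example, at 4 MB and 2,000 records written per second, one consumer application needs 4 shards (write-bound), while three consumer applications need 6 shards because reads become the limiting factor.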
GetRecords Returns Empty Records Array Even When There is Data in the Stream
Consuming, or getting, records is a pull model. Developers are expected to call GetRecords in a continuous loop with no back-offs. Every call to GetRecords also returns a ShardIterator value, which must be used in the next iteration of the loop.
The GetRecords operation does not block. Instead, it returns immediately, with either relevant data records or with an empty Records element. An empty Records element is returned under two conditions:
- There is no more data currently in the shard.
- There is no data near the part of the shard pointed to by the ShardIterator.
The latter condition is subtle, but is a necessary design tradeoff to avoid unbounded seek time (latency) when retrieving records. Thus, the stream-consuming application should loop and call GetRecords, handling empty records as a matter of course.
In a production scenario, the only time the continuous loop should be exited is when the NextShardIterator value is NULL. When the NextShardIterator value is NULL, it means that the current shard has been closed and the ShardIterator value would otherwise point past the last record. If the consuming application never calls SplitShard or MergeShards, the shard remains open and the calls to GetRecords never return a NextShardIterator value that is NULL.
If you use the Amazon Kinesis Client Library (KCL), the above consumption pattern is abstracted for you. This includes automatic handling of a set of shards that dynamically change. With the KCL, the developer only supplies the logic to process incoming records. This is possible because the library makes continuous calls to GetRecords for you.
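If you are not using the KCL, the loop described in this section might look like the following minimal sketch. It assumes `client` is a boto3 Kinesis client (for example, `boto3.client("kinesis")`). `read_shard`, `handle_records`, and `pause_s` are hypothetical names, and the short pause between calls is an added assumption intended to stay under the limit of 5 read transactions per second per shard.

```python
import time


def read_shard(client, stream_name, shard_id, handle_records, pause_s=0.2):
    """Read one shard until it is closed, tolerating empty responses."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",  # start from the oldest record
    )["ShardIterator"]

    while iterator is not None:
        response = client.get_records(ShardIterator=iterator, Limit=1000)
        if response["Records"]:
            handle_records(response["Records"])
        # An empty Records list is normal; keep looping with the next iterator.
        # A missing or null NextShardIterator means the shard has been closed.
        iterator = response.get("NextShardIterator")
        if pause_s:
            time.sleep(pause_s)
```

For an open shard this loop never terminates, which matches the behavior described above; in practice you would run it in its own thread or process per shard.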
Shard Iterator Expires Unexpectedly
A new shard iterator is returned by every GetRecords request (as NextShardIterator), which you then use in the next GetRecords request (as ShardIterator). Typically, this shard iterator does not expire before you use it. However, you may find that shard iterators expire because you have not called GetRecords for more than 5 minutes, or because you have restarted your consumer application.
If the shard iterator expires immediately, before you can use it, this might indicate that the DynamoDB table used by Amazon Kinesis does not have enough capacity to store the lease data. This situation is more likely to happen if you have a large number of shards. To solve this problem, increase the write capacity assigned to the shard table. For more information, see Tracking Amazon Kinesis Streams Application State.
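When an iterator expires because of a long gap between calls, one common recovery is to request a fresh iterator positioned after the last record you processed. The sketch below assumes `client` is a boto3 Kinesis client. `get_records_with_refresh` is a hypothetical helper, and falling back to TRIM_HORIZON when no sequence number is known is an assumption that may not suit every application.

```python
def get_records_with_refresh(client, stream_name, shard_id, iterator,
                             last_sequence_number):
    """Call GetRecords, replacing the iterator once if it has expired."""
    try:
        return client.get_records(ShardIterator=iterator)
    except client.exceptions.ExpiredIteratorException:
        if last_sequence_number is None:
            # Nothing processed yet: restart from the oldest record.
            position = {"ShardIteratorType": "TRIM_HORIZON"}
        else:
            # Resume just after the last record that was processed.
            position = {
                "ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
                "StartingSequenceNumber": last_sequence_number,
            }
        fresh = client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id, **position
        )["ShardIterator"]
        return client.get_records(ShardIterator=fresh)
```

This only helps if your application tracks the sequence number of the last processed record itself; the KCL does this for you through its checkpoints.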
Consumer Record Processing Falling Behind
For most use cases, consumer applications are reading the latest data from the stream. In certain circumstances, consumer reads may fall behind, which may not be desired. After you identify how far behind your consumers are reading, look at the most common reasons why consumers fall behind.
Start with the GetRecords.IteratorAgeMilliseconds metric, which tracks the read position across all shards and consumers in the stream. Note that if an iterator's age passes 50% of the retention period (by default 24 hours, configurable up to 7 days), there is a risk of data loss due to record expiration. A quick stopgap solution is to increase the retention period. This stops the loss of important data while you troubleshoot the issue further. For more information, see Monitoring the Amazon Kinesis Streams Service with Amazon CloudWatch.
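The 50% check on iterator age can be automated along these lines. This is a sketch under stated assumptions: `cloudwatch` is a boto3 CloudWatch client (for example, `boto3.client("cloudwatch")`), the metric is read from the AWS/Kinesis namespace with a StreamName dimension, and `max_iterator_age_ms` and `at_risk_of_expiry` are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone


def max_iterator_age_ms(cloudwatch, stream_name, minutes=5):
    """Return the highest iterator age reported over the last few minutes."""
    end = datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/Kinesis",
        MetricName="GetRecords.IteratorAgeMilliseconds",
        Dimensions=[{"Name": "StreamName", "Value": stream_name}],
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        Period=60,
        Statistics=["Maximum"],
    )
    # No datapoints means nothing was read in the window; report 0.
    return max((p["Maximum"] for p in response["Datapoints"]), default=0.0)


def at_risk_of_expiry(iterator_age_ms, retention_hours=24, threshold=0.5):
    """True when the iterator age exceeds the given share of the retention period."""
    return iterator_age_ms > threshold * retention_hours * 3_600_000
```

With the default 24-hour retention period, the threshold works out to 43,200,000 milliseconds (12 hours).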
Next, identify how far behind your consumer application is reading from each shard using a custom CloudWatch metric emitted by the Amazon Kinesis Client Library (KCL), MillisBehindLatest. For more information, see Monitoring the Amazon Kinesis Client Library with Amazon CloudWatch.
Here are the most common reasons consumers can fall behind:
- Sudden large increases to GetRecords.IteratorAgeMilliseconds or MillisBehindLatest usually indicate a transient problem, such as API operation failures to a downstream application. You should investigate these sudden increases if either of the metrics consistently displays this behavior.
- A gradual increase to these metrics indicates that a consumer is not keeping up with the stream because it is not processing records fast enough. The most common root causes for this behavior are insufficient physical resources or record processing logic that has not scaled with an increase in stream throughput. You can verify this behavior by looking at the other custom CloudWatch metrics that the KCL emits for record processing, such as processRecords.Time.
  - If you see an increase in the processRecords.Time metric that correlates with increased throughput, you should analyze your record processing logic to identify why it is not scaling with the increased throughput.
  - If you see an increase in the processRecords.Time values that is not correlated with increased throughput, check to see if you are making any blocking calls in the critical path, which are often the cause of slowdowns in record processing. An alternative approach is to increase your parallelism by increasing the number of shards. Finally, confirm you have an adequate amount of physical resources (memory, CPU utilization, etc.) on the underlying processing nodes during peak demand.