Amazon Kinesis
Developer Guide (API Version 2013-12-02)

Developing Record Consumer Applications

This section covers how to develop an Amazon Kinesis application using the Amazon Kinesis Client Library. The application uses the IRecordProcessor interface to process data received from an Amazon Kinesis stream. Although you can use the Amazon Kinesis service API to get data from an Amazon Kinesis stream, we recommend using the design patterns and code for Amazon Kinesis applications provided here.

For illustration purposes, this topic describes the Amazon Kinesis application sample included with the SDK. (The sample code is also available on GitHub.) The sample application is instrumented using Apache Commons Logging. You can change the logging configuration in the static configure() method defined in the file SampleAmazonKinesisApplication.java. For more information about how to use Apache Commons Logging with Log4j and AWS Java applications, go to Logging with Log4j in the AWS Java Developer Guide.

The IRecordProcessor Interface and the Amazon Kinesis Client Library

The Amazon Kinesis Client Library acts as an intermediary between your client application, which processes Amazon Kinesis stream data, and the Amazon Kinesis service itself. The Amazon Kinesis Client Library uses the IRecordProcessor interface to communicate with your application. Your application implements this interface, and the Amazon Kinesis Client Library calls into your application code using the methods in this interface. For example, the Amazon Kinesis Client Library uses the Amazon Kinesis service API to get data from an Amazon Kinesis stream and then passes this data to the record processors for your application using the processRecords() method of IRecordProcessor.

Client Applications, Workers, and Streams

Each Amazon Kinesis application has a unique name and operates on one specific stream. The application name must be unique because it is used to create an Amazon DynamoDB table that maintains state for the application. Note that your account will be charged for the costs associated with this Amazon DynamoDB table in addition to the costs associated with Amazon Kinesis itself.

At startup, the application calls into the Amazon Kinesis Client Library to instantiate a worker. This call provides the Amazon Kinesis Client Library with configuration information for the application, such as the stream name and AWS credentials. This call also passes a reference to an IRecordProcessorFactory implementation. The Amazon Kinesis Client Library uses this factory to create new record processor instances as needed to process data from the stream. The Amazon Kinesis Client Library communicates with these instances using the IRecordProcessor interface.

The Amazon Kinesis Client Library manages many low-level details for you. However, if you are monitoring KCL activity, you may notice that the KCL makes multiple calls before returning data. This behavior is by design and does not indicate a problem with the KCL or your data. For additional information, see Using GetRecords().

Each record processor instance processes exactly one shard from the stream. Because streams comprise multiple shards, the worker instantiates multiple record processors to process the stream as a whole. To scale out your capacity to handle a stream that has a large data volume, you could create multiple instances of your application. These could run on a single computer or on multiple computers. We recommend that you run your application instances across a set of Amazon EC2 instances that are part of an Auto Scaling group. This enables you to automatically instantiate additional instances if the processing demands of the stream increase.

Implementation of an IRecordProcessor Application

To develop an Amazon Kinesis application, you implement the following three components:

  • The IRecordProcessor interface

  • A factory for the class that implements the IRecordProcessor interface

  • Code that initializes the application and creates the worker

IRecordProcessor Interface

The IRecordProcessor interface exposes the following methods. Your client application must implement all of these methods.

public void initialize(String shardId) 
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

The sample provides implementations that you can use as a starting point for your own application. The sample implements these methods in the class SampleRecordProcessor in the file SampleRecordProcessor.java.
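As a starting point, a minimal skeleton of such an implementation might look like the following. This is a hedged sketch, not the SDK sample itself; the class name MinimalRecordProcessor is illustrative, and the import paths follow the 1.x client library and may differ between versions.

```java
import java.util.List;

// Import paths as in the 1.x Amazon Kinesis Client Library; verify
// against the version you are using.
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class MinimalRecordProcessor implements IRecordProcessor {
    private String shardId;

    @Override
    public void initialize(String shardId) {
        // Remember which shard this processor owns.
        this.shardId = shardId;
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            // Application-specific work goes here, for example transforming
            // record.getData() and storing the result.
        }
        try {
            // Checkpoint only after the whole batch has been processed.
            checkpointer.checkpoint();
        } catch (Exception e) {
            // Log and retry according to your application's policy.
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            try {
                // The shard is finished (split, merge, or stream deletion):
                // record a final checkpoint.
                checkpointer.checkpoint();
            } catch (Exception e) {
                // Log and handle.
            }
        }
    }
}
```

The sections that follow discuss each of the three methods in turn.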

initialize

  public void initialize(String shardId) 

The Amazon Kinesis Client Library calls the initialize() method when the record processor is instantiated, passing the ID of a specific shard from the stream. This record processor processes only this shard, and in general, this shard is processed only by this record processor. However, when processing of the shard is transitioning from one worker to another, there can be brief periods during which data records from the shard are processed by multiple workers associated with the same application. Such transitions occur during shard split and merge operations, and when a worker appears to have failed and its shard must be transferred to a different worker. Therefore, your application should account for the possibility that any given data record might be processed more than once. Note also that Amazon Kinesis has "at least once" semantics: every data record from a shard is processed at least once by a worker in your Amazon Kinesis application.

processRecords

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 

The Amazon Kinesis Client Library calls the processRecords() method with a list of data records from the shard specified by initialize(shardId). The record processor processes the data in these records according to the semantics of the application. For example, the worker might perform a transformation on the data and then store the result in an Amazon S3 bucket.

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the Amazon S3 bucket in which to store the data based on the value of the partition key. The Record class exposes the following methods that provide access to the record's data, sequence number, and partition key.

record.getData()
record.getSequenceNumber() 
record.getPartitionKey()

In the sample, the private method processRecordsWithRetries() has code that shows how a worker could access the record's data, sequence number, and partition key.
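For illustration, the following self-contained sketch shows one way to decode a record's payload: record.getData() returns a java.nio.ByteBuffer, and the decodeRecordData helper name here is illustrative, not part of the Amazon Kinesis Client Library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordDataDecoder {
    // Decode a record's payload as UTF-8 text. Decoding a read-only
    // duplicate leaves the original buffer's position untouched, so the
    // caller can still read the raw bytes afterward.
    public static String decodeRecordData(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello,42".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeRecordData(payload));   // prints "hello,42"
    }
}
```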

The Amazon Kinesis Client Library also passes an IRecordProcessorCheckpointer interface to processRecords(). The record processor calls the checkpoint() method on this interface to inform the Amazon Kinesis Client Library of how far it has progressed in processing the records in the shard. In the event that the worker fails, the Amazon Kinesis Client Library uses this information to restart the processing of the shard at the last known processed record. In the case of a split or merge operation, the Amazon Kinesis Client Library won't start processing the new shard(s) until the processors for the original shards have called checkpoint() to signal that all processing on the original shards is complete.

The record processor should call checkpoint() only after it completes processing all the records in the list of records it was passed. The Amazon Kinesis Client Library assumes that the call to checkpoint() means that all records up to the last record that it passed to the record processor have been processed. However, this does not mean that the record processor must call checkpoint() on each call to processRecords(). It could, for example, call checkpoint() on every third call to processRecords(). What it must not do is call checkpoint() in the midst of processing a list of records.

In the sample, the private method checkpoint() shows how to call the IRecordProcessorCheckpointer.checkpoint() method using appropriate exception handling and retry logic.
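The same pattern can be sketched generically with standard Java: a bounded retry loop with a fixed backoff, where the action stands in for a call such as checkpointer.checkpoint(). The helper name runWithRetries and the parameter values are illustrative assumptions, not part of the Amazon Kinesis Client Library API.

```java
import java.util.concurrent.Callable;

public class CheckpointRetry {
    // Run an action (e.g. a checkpoint call) up to maxAttempts times,
    // sleeping backoffMillis between failed attempts. Returns true on
    // success, false if every attempt threw an exception.
    public static boolean runWithRetries(Callable<Void> action, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.call();
                return true;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    return false;
                }
                Thread.sleep(backoffMillis);
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated flaky checkpoint: fails twice, then succeeds.
        final int[] calls = {0};
        boolean ok = runWithRetries(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (++calls[0] < 3) {
                    throw new RuntimeException("simulated throttling");
                }
                return null;
            }
        }, 5, 1L);
        System.out.println(ok);   // prints "true": succeeded on the third attempt
    }
}
```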

The Amazon Kinesis Client Library relies on processRecords() to handle any exceptions that arise from processing the data records. If an exception is thrown out of processRecords(), the Amazon Kinesis Client Library skips over the data records that were passed to processRecords() prior to the exception; that is, these records are not re-sent to the record processor that threw the exception or to any other record processor in the application.

shutdown

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

The Amazon Kinesis Client Library calls the shutdown() method under the following circumstances:

  • This record processor will not receive any further records from the shard. This could be because of a shard split or shard merge operation that involved this shard, or because the stream itself is being deleted and all data for this shard of the stream has been processed. In these cases, the ShutdownReason parameter of the shutdown() method has a value of TERMINATE.

  • The worker appears to be unresponsive to the Amazon Kinesis Client Library. In this case, the ShutdownReason parameter has a value of ZOMBIE.

The Amazon Kinesis Client Library also passes an IRecordProcessorCheckpointer interface to shutdown(). If the ShutdownReason parameter is TERMINATE, the record processor should finish processing any data records and then call the checkpoint() method on this interface.

Implementation of Class Factory for IRecordProcessor

You also need to implement a factory for the class that implements the IRecordProcessor interface methods. When your application instantiates the worker, it passes a reference to this factory.

The sample implements the factory class SampleRecordProcessorFactory in the file SampleRecordProcessorFactory.java.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
    /**
     * Constructor.
     */
    public SampleRecordProcessorFactory() {
        super();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public IRecordProcessor createProcessor() {
        return new SampleRecordProcessor();
    }
}

Initializing the IRecordProcessor Application

The initialization code for the sample application is available in the file SampleAmazonKinesisApplication.java.

The sample initialization code provides default values for the application name, the stream name that the application should process, and the HTTPS endpoint for the Amazon Kinesis service. You can override any of these with your own values, which you would specify in a Java properties file. To use the properties file, specify it on the command line when you invoke the sample application.

Application Name

The application name — specified in the code and passed to the Amazon Kinesis Client Library — is significant. All workers associated with this application name are assumed to be working together on the same stream. If you run an additional instance of the same application code, but with a different application name, the Amazon Kinesis Client Library treats the second instance as an entirely separate application also operating on the same stream.

The Amazon Kinesis Client Library creates a DynamoDB table with this name and uses it to maintain state information—such as checkpoints—for the application. Each application has its own DynamoDB table. Because the application name is used to name the table, you should pick an application name that doesn't conflict with any existing DynamoDB tables in the same account and region. For more information about how the Amazon Kinesis Client Library uses Amazon DynamoDB see Application State Is Managed in Amazon DynamoDB.

Worker ID

The sample initialization code creates an ID for the worker using the name of the local computer and appending a globally unique identifier.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

This approach supports the scenario where you have multiple instances of the application running on a single computer.

Credentials

If you are running the application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through the Instance Metadata Service (IMDS). This is the most secure way to manage credentials for an Amazon Kinesis application running on an Amazon EC2 instance. The sample application first attempts to retrieve IAM credentials from the IMDS.

credentialsProvider = new InstanceProfileCredentialsProvider();

If the sample application cannot obtain credentials from the IMDS, it attempts to retrieve credentials from a credentials property file.

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

The sample consolidates the configuration data for the worker in a KinesisClientLibConfiguration object. This object and a reference to the class factory for IRecordProcessor are passed in the call that instantiates the worker.
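Putting the pieces together, the wiring might look like the following hedged sketch. The application and stream names are placeholders, SampleRecordProcessorFactory is the factory class shown earlier, and the import paths follow the 1.x client library and may differ between versions.

```java
import java.net.InetAddress;
import java.util.UUID;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class MinimalWorkerMain {
    public static void main(String[] args) throws Exception {
        // Worker ID: local host name plus a globally unique suffix.
        String workerId = InetAddress.getLocalHost().getCanonicalHostName()
                + ":" + UUID.randomUUID();
        AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider();

        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "SampleApp",     // application name (also names the DynamoDB table)
                "mystream",      // stream to process
                credentialsProvider,
                workerId);

        IRecordProcessorFactory factory = new SampleRecordProcessorFactory();
        Worker worker = new Worker(factory, config);
        worker.run();            // blocks; processes the stream until shutdown
    }
}
```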

Application State Is Managed in Amazon DynamoDB

The current version of the library code for the Amazon Kinesis application uses a unique Amazon DynamoDB table for each application to keep track of the application's state. The library uses the name of the application to formulate the name of the table for the application. Therefore, you should ensure that two applications do not share the same name. You can view the table using the Amazon DynamoDB console while the application is running.

Note that your account is charged for the costs associated with the DynamoDB table in addition to the costs associated with Amazon Kinesis itself.

If the application receives provisioned-throughput exceptions, you should increase the provisioned throughput for the table. The library creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. Factors that could contribute to needing more provisioned throughput are frequent checkpointing and operating on a stream that is composed of many shards. For information about provisioned throughput in DynamoDB, see Provisioned Throughput in Amazon DynamoDB and Working with Tables in the Amazon DynamoDB Developer Guide.
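As one option, you could raise the table's provisioned throughput with the AWS SDK for Java DynamoDB client. In this hedged sketch, the table name and capacity values are placeholders that you would replace with your application's own.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class RaiseStateTableThroughput {
    public static void main(String[] args) {
        AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient();
        // The table has the same name as the application; capacity values
        // here are placeholders.
        dynamoDB.updateTable(new UpdateTableRequest()
                .withTableName("SampleApp")
                .withProvisionedThroughput(new ProvisionedThroughput()
                        .withReadCapacityUnits(50L)
                        .withWriteCapacityUnits(50L)));
    }
}
```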

Each row in the DynamoDB table represents a shard that is being processed by your application. The hash key for the table is the shard ID.

When the Amazon Kinesis application starts up, if the Amazon DynamoDB table for the application does not exist, one of the workers creates the table and calls the describeStream() method to populate it with a row for each shard in the stream.

In addition to the shard ID, each row also includes:

  • The most recent checkpoint sequence number for the shard. This value is unique across all shards in the stream.

  • The worker ID of the worker that is processing the shard. Remember that a single worker ID may be associated with multiple record processors and therefore multiple shards. As a result, the same worker ID may appear in multiple rows in the table.

  • A "heartbeat" count which the record processor for the shard periodically increments—say, every 30 seconds—to indicate that the processor is still actively processing records from the shard. The workers associated with the instances of the application periodically scan these values to ensure that all shards continue to be processed. If the heartbeat count does not increase within a configurable timeout period, other workers will take over processing of that shard.

Resharding

Resharding is a process by which you can increase or decrease the number of shards in your stream to adapt to changes in the rate of data flowing through the stream. Resharding is typically performed by an administrative application that monitors metrics that measure how much data is being handled by each of the shards that constitute the stream. Although the Amazon Kinesis Client Library itself doesn't initiate resharding operations, it is designed to adapt to changes in the number of shards that result from resharding.

As mentioned earlier, the Amazon Kinesis Client Library tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of resharding, the Amazon Kinesis Client Library discovers the new shards and populates new rows in the table. The workers automatically discover the new shards and create processors to handle the data from them. The Amazon Kinesis Client Library is designed in such a way that it also distributes the shards in the stream across all the available workers and record processors.

If resharding increases the number of shards in the stream, the corresponding increase in the number of record processors increases the load on the Amazon EC2 instances that host them. If the instances are part of an Auto Scaling group, and the load increases sufficiently, the Auto Scaling group adds more instances to handle the increased load. You should configure your Amazon EC2 instances to launch your application at startup, so that additional workers and record processors become active on the new instances immediately.

The Amazon Kinesis Client Library ensures that any data that existed in shards prior to the resharding is processed first by record processors and only then is data from the new shards sent to record processors. This way, the order in which data records were added to the stream for a particular partition key is preserved when the records are processed.

For more detailed information about resharding, see Resharding a Stream.

Failover and Recovery

Failure can occur at one or more levels when you use an Amazon Kinesis application to process data from an Amazon Kinesis stream.

  • A record processor instance could fail

  • A worker could fail or the instance of the Amazon Kinesis application that instantiated the worker could fail

  • An Amazon EC2 instance, which is hosting an instance (or instances) of the application, could fail

Record Processor

The worker invokes IRecordProcessor methods, such as processRecords(), using Java ExecutorService tasks. If a task fails, the worker retains control of the shard that the record processor was processing and starts a new record processor task to process that shard.

Worker or Application

If a worker—or an instance of the Amazon Kinesis application itself—fails, you should detect and handle the situation. For example, if the Worker.run() method throws an exception, you should catch and handle it.

If the application itself fails, you should detect this and restart it. When the application starts up, it instantiates a new worker, which in turn instantiates new record processors that are automatically assigned shards to process. These could be the same shards that these record processors were processing before the failure, or shards that are new to these processors.

If the worker or application fails, but is not detected, and there are other instances of the application running on other Amazon EC2 instances, the workers on these instances will create additional record processors to process the shards that are no longer being processed by the failed worker. However, that will increase the load on these other Amazon EC2 instances. The scenario described here assumes that although the worker or application has failed, the hosting Amazon EC2 instance is still running and is therefore not restarted by an Auto Scaling group.

Amazon EC2 Instance

We recommend that you run the Amazon EC2 instances for your application in an Auto Scaling group. That way, if one of the Amazon EC2 instances hosting your application fails, the Auto Scaling group automatically launches a new instance to replace it. You should configure the Amazon EC2 instances that you use for your application so that they launch the application at startup.

Additional Considerations

Here are some additional considerations that you should incorporate into the design of your Amazon Kinesis application.

Start-Up Coordination between the Data Producer and IRecordProcessor Application

The Amazon Kinesis Client Library defaults to a configuration where it begins reading records from the tip of the stream, that is, the most recently added record. This means that if a data-producing application adds records to the stream before any receiving record processors are running, these records would not be read by the record processors after they start up. To change the behavior of the record processors to always read data from the beginning of the stream, set the following value in the properties file for your application.

initialPositionInStream = TRIM_HORIZON

Important

The Amazon Kinesis service keeps records for no more than 24 hours. Even with this configuration setting, if a record processor were to start more than 24 hours after a data-producing application began adding records to the stream, some of the records would not be available to the record processor.

Note that there might be some scenarios in which it is okay for record processors not to receive the first few records in the stream. For example, you might run some initial records through the stream to test that the stream is working end-to-end as expected. After doing this initial verification, you would then start your workers and begin to put production data into the stream.
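If you configure the worker in code rather than through a properties file, the equivalent behavior can be requested on the configuration object. This is a hedged fragment; kinesisClientLibConfiguration is the KinesisClientLibConfiguration object described earlier, and the import path for InitialPositionInStream follows the 1.x client library.

```java
// Start reading from the oldest available record instead of the tip
// of the stream.
kinesisClientLibConfiguration = kinesisClientLibConfiguration
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
```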

Application Shutdown

When your application has completed its intended task, you should shut it down by terminating the Amazon EC2 instances on which it is running. You could terminate the instances using the AWS Management Console, the AWS Command Line Interface or some other method.

After you have shut down the application, delete the Amazon DynamoDB table that the Amazon Kinesis Client Library used to track the application's state.
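One way to delete the table is with the AWS SDK for Java DynamoDB client. In this hedged sketch, "SampleApp" stands in for your application name, which is also the table name.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;

public class DeleteStateTable {
    public static void main(String[] args) {
        AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient();
        // Remove the application's state table once processing is complete.
        dynamoDB.deleteTable(new DeleteTableRequest().withTableName("SampleApp"));
    }
}
```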

Read Throttling

The throughput of a stream is provisioned at the shard level. Each shard supports a read throughput of up to 2 MB/sec, at up to 5 read transactions per second. If an application (or a group of applications operating on the same stream) attempts to get data from a shard at a faster rate, the service throttles the corresponding get operations. In a Java application, this throttling typically appears as an exception. In an Amazon Kinesis application, a record processor that reads data faster than the 2 MB/sec limit, such as in the case of a failover, will be throttled. Because the Amazon Kinesis Client Library manages the interactions between the application and the Amazon Kinesis service, throttling exceptions occur in the Amazon Kinesis Client Library code rather than in your application code. However, because the Amazon Kinesis Client Library logs these exceptions, you can see whether they occur by checking the logs. If you find that your application is throttled consistently, you should consider increasing the number of shards for the stream.