Developing Producers Using the Amazon Kinesis Data Streams API with the AWS SDK for Java

You can develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java. If you are new to Kinesis Data Streams, start by becoming familiar with the concepts and terminology presented in What Is Amazon Kinesis Data Streams? and Getting Started with Amazon Kinesis Data Streams.

These examples discuss the Kinesis Data Streams API and use the AWS SDK for Java to add (put) data to a stream. However, for most use cases, you should prefer the Amazon Kinesis Producer Library (KPL). For more information, see Developing Producers Using the Amazon Kinesis Producer Library.

The Java example code in this chapter demonstrates how to perform basic Kinesis Data Streams API operations, and is divided up logically by operation type. These examples do not represent production-ready code, in that they do not check for all possible exceptions, or account for all possible security or performance considerations. Also, you can call the Kinesis Data Streams API using other programming languages. For more information about all available AWS SDKs, see Start Developing with Amazon Web Services.

Each task has prerequisites; for example, you cannot add data to a stream until you have created the stream, which in turn requires you to create a client. For more information, see Creating and Managing Streams.

Adding Data to a Stream

Once a stream is created, you can add data to it in the form of records. A record is a data structure that contains the data to be processed in the form of a data blob. After you store the data in the record, Kinesis Data Streams does not inspect, interpret, or change the data in any way. Each record also has an associated sequence number and partition key.

Two operations in the Kinesis Data Streams API add data to a stream: PutRecords and PutRecord. The PutRecords operation sends multiple records to your stream per HTTP request, and the singular PutRecord operation sends records to your stream one at a time (a separate HTTP request is required for each record). You should prefer PutRecords for most applications because it achieves higher throughput per data producer. For more information about each of these operations, see the separate subsections below.

Always keep in mind that, as your source application is adding data to the stream using the Kinesis Data Streams API, there are most likely one or more consumer applications that are simultaneously processing data off the stream. For information about how consumers get data using the Kinesis Data Streams API, see Getting Data from a Stream.

Adding Multiple Records with PutRecords

The PutRecords operation sends multiple records to Kinesis Data Streams in a single request. By using PutRecords, producers can achieve higher throughput when sending data to their Kinesis data stream. Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. As with the single PutRecord operation described below, PutRecords uses sequence numbers and partition keys. However, the PutRecord parameter SequenceNumberForOrdering is not included in a PutRecords call. The PutRecords operation attempts to process all records in the natural order of the request.
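The request limits above imply that a producer holding more than 500 records, or more than 5 MB of data, has to split its records across several PutRecords requests. The following is a minimal sketch of such a splitter that uses only the Java standard library. The Entry class and the batch method are hypothetical names introduced for illustration and are not part of the AWS SDK. The sketch assumes that 1 MB means 1024 * 1024 bytes and that a record's size is its data length plus the UTF-8 length of its partition key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class PutRecordsBatcher {
    // Limits quoted in the text above.
    static final int MAX_RECORDS_PER_REQUEST = 500;
    static final long MAX_BYTES_PER_RECORD = 1024L * 1024L;        // 1 MB (assumed to be 1024 * 1024 bytes)
    static final long MAX_BYTES_PER_REQUEST = 5L * 1024L * 1024L;  // 5 MB, including partition keys

    /** Hypothetical stand-in for a request entry: a partition key plus a data blob. */
    static final class Entry {
        final String partitionKey;
        final byte[] data;

        Entry(String partitionKey, byte[] data) {
            this.partitionKey = partitionKey;
            this.data = data;
        }

        // Assumption: size is the data length plus the UTF-8 length of the partition key.
        long sizeInBytes() {
            return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    /** Splits entries, in order, into batches that each respect the count and size limits. */
    static List<List<Entry>> batch(List<Entry> entries) {
        List<List<Entry>> batches = new ArrayList<>();
        List<Entry> current = new ArrayList<>();
        long currentBytes = 0;
        for (Entry entry : entries) {
            long size = entry.sizeInBytes();
            if (size > MAX_BYTES_PER_RECORD) {
                throw new IllegalArgumentException("Record exceeds the per-record size limit");
            }
            boolean full = current.size() == MAX_RECORDS_PER_REQUEST
                    || currentBytes + size > MAX_BYTES_PER_REQUEST;
            if (full) {
                // Close the current batch and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(entry);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list could then be converted to PutRecordsRequestEntry objects and sent in its own PutRecords request.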

Each data record has a unique sequence number. The sequence number is assigned by Kinesis Data Streams after you call client.putRecords to add the data records to the stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between PutRecords requests, the larger the sequence numbers become.

Note

Sequence numbers cannot be used as indexes to sets of data within the same stream. To logically separate sets of data, use partition keys or create a separate stream for each data set.

A PutRecords request can include records with different partition keys. The scope of the request is a stream; each request may include any combination of partition keys and records up to the request limits. Requests made with many different partition keys to streams with many different shards are generally faster than requests with a small number of partition keys to a small number of shards. The number of partition keys should be much larger than the number of shards to reduce latency and maximize throughput.

PutRecords Example

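The following code creates 100 data records with sequential partition keys and puts them in the stream whose name is held in the streamName variable.

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Build the client. regionName, credentialsProvider, and config are defined elsewhere.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
clientBuilder.setRegion(regionName);
clientBuilder.setCredentials(credentialsProvider);
clientBuilder.setClientConfiguration(config);
AmazonKinesis kinesisClient = clientBuilder.build();

PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);

// Create 100 entries, each with its own partition key.
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

// Send all entries in a single request.
putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
System.out.println("Put Result" + putRecordsResult);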

The PutRecords response includes an array of response Records. Each record in the response array directly correlates with a record in the request array using natural ordering, from the top to the bottom of the request and response. The response Records array always includes the same number of records as the request array.

Handling Failures When Using PutRecords

By default, failure of individual records within a request does not stop the processing of subsequent records in a PutRecords request. This means that a response Records array includes both successfully and unsuccessfully processed records. You must detect unsuccessfully processed records and include them in a subsequent call.

Successful records include SequenceNumber and ShardId values, and unsuccessful records include ErrorCode and ErrorMessage values. The ErrorCode parameter reflects the type of error and can be one of the following values: ProvisionedThroughputExceededException or InternalFailure. ErrorMessage provides more detailed information about the ProvisionedThroughputExceededException exception, including the account ID, stream name, and shard ID of the record that was throttled. The example below has three records in a PutRecords request. The second record fails and is reflected in the response.

Example PutRecords Request Syntax
{
    "Records": [
        {
            "Data": "XzxkYXRhPl8w",
            "PartitionKey": "partitionKey1"
        },
        {
            "Data": "AbceddeRFfg12asd",
            "PartitionKey": "partitionKey1"
        },
        {
            "Data": "KFpcd98*7nd1",
            "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}

Example PutRecords Response Syntax
{
    "FailedRecordCount": 1,
    "Records": [
        {
            "SequenceNumber": "21269319989900637946712965403778482371",
            "ShardId": "shardId-000000000001"
        },
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
        },
        {
            "SequenceNumber": "21269319989999637946712965403778482985",
            "ShardId": "shardId-000000000002"
        }
    ]
}

Records that were unsuccessfully processed can be included in subsequent PutRecords requests. First, check the FailedRecordCount parameter in the putRecordsResult to confirm whether the request contains failed records. If it does, add each request entry whose corresponding result entry has a non-null ErrorCode to a subsequent request. For an example of this type of handler, refer to the following code.

Example PutRecords failure handler
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);

List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

// Resend only the failed entries until every record has been accepted.
while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    // Result entry i corresponds to request entry i.
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
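
The handler above retries immediately and without an upper bound. A production producer would probably limit the number of attempts and wait between them. The following is a minimal, SDK-independent sketch of that idea. The names failedEntries and sendWithRetries, and the sender function that stands in for a putRecords call, are hypothetical and introduced only for illustration; the exponential backoff policy is an assumption, not a documented recommendation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PutRecordsRetrySketch {

    /** Returns the request entries whose response at the same index carries an error code. */
    static <T> List<T> failedEntries(List<T> requestEntries, List<String> errorCodes) {
        if (requestEntries.size() != errorCodes.size()) {
            throw new IllegalArgumentException("Response must have one entry per request entry");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requestEntries.size(); i++) {
            if (errorCodes.get(i) != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }

    /**
     * Sends the entries, resending only those that failed, for at most maxAttempts attempts.
     * The sender is a hypothetical stand-in for a putRecords call: it takes a batch and
     * returns one error code per entry, with null meaning success.
     * Returns the entries that still failed after the last attempt.
     */
    static <T> List<T> sendWithRetries(List<T> entries,
                                       Function<List<T>, List<String>> sender,
                                       int maxAttempts,
                                       long baseBackoffMillis) {
        List<T> pending = new ArrayList<>(entries);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            pending = failedEntries(pending, sender.apply(pending));
            if (!pending.isEmpty() && attempt < maxAttempts) {
                try {
                    // Exponential backoff: base, 2 * base, 4 * base, ...
                    Thread.sleep(baseBackoffMillis << (attempt - 1));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return pending;
    }
}
```

With the AWS SDK, the sender could wrap a putRecords call and map each PutRecordsResultEntry to its getErrorCode value. Entries that remain in the returned list would need separate handling, such as logging or writing to a fallback store.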

Adding a Single Record with PutRecord

Each call to PutRecord operates on a single record. Prefer the PutRecords operation described in Adding Multiple Records with PutRecords unless your application specifically needs to send a single record per request, or there is some other reason that PutRecords can't be used.

Each data record has a unique sequence number. The sequence number is assigned by Kinesis Data Streams after you call client.putRecord to add the data record to the stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between PutRecord requests, the larger the sequence numbers become.

When puts occur in quick succession, the returned sequence numbers are not guaranteed to increase because the put operations appear essentially as simultaneous to Kinesis Data Streams. To guarantee strictly increasing sequence numbers for the same partition key, use the SequenceNumberForOrdering parameter, as shown in the PutRecord Example code sample.

Whether or not you use SequenceNumberForOrdering, the records that a consumer receives from Kinesis Data Streams through a GetRecords call are strictly ordered by sequence number.
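
Sequence numbers are returned as strings of decimal digits, and the values in the sample PutRecords response in this topic are 38 digits long, which is too large for a Java long. If an application needs to compare two sequence numbers, one option is to parse them as BigInteger values rather than compare the strings directly. The following is a minimal sketch; the class and method names are hypothetical.

```java
import java.math.BigInteger;

final class SequenceNumberOrder {

    /**
     * Compares two sequence numbers numerically.
     * Returns a negative value, zero, or a positive value if the first is
     * less than, equal to, or greater than the second.
     */
    static int compare(String first, String second) {
        return new BigInteger(first).compareTo(new BigInteger(second));
    }

    public static void main(String[] args) {
        // Sequence numbers taken from the sample PutRecords response in this topic.
        String earlier = "21269319989900637946712965403778482371";
        String later = "21269319989999637946712965403778482985";
        System.out.println(compare(earlier, later) < 0);
    }
}
```

A plain String comparison is not a safe substitute, because strings of different lengths do not sort numerically (for example, "9" sorts after "10").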

Note

Sequence numbers cannot be used as indexes to sets of data within the same stream. To logically separate sets of data, use partition keys or create a separate stream for each data set.

A partition key is used to group data within the stream. A data record is assigned to a shard within the stream based on its partition key. Specifically, Kinesis Data Streams uses the partition key as input to a hash function that maps the partition key (and associated data) to a specific shard.

As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream. However, if the number of partition keys exceeds the number of shards, some shards necessarily contain records with different partition keys. From a design standpoint, to ensure that all your shards are well utilized, the number of shards (specified by the setShardCount method of CreateStreamRequest) should be substantially less than the number of unique partition keys, and the amount of data flowing to a single partition key should be substantially less than the capacity of the shard.
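
To make the mapping concrete, the following sketch estimates which shard a partition key would land on. It rests on two assumptions that this topic does not state: that the hash function is MD5, producing a 128-bit integer, and that the shards divide the 128-bit hash space into equal, contiguous ranges. The class and method names are hypothetical, and the actual hash key ranges of a stream may differ, for example after resharding.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PartitionKeyHashSketch {
    // Size of the assumed hash space: 2^128.
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** Hashes a partition key to a non-negative 128-bit integer (assumption: MD5). */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 treats the digest as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Returns a zero-based shard index, assuming equal-sized hash key ranges. */
    static int shardIndexFor(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        // Records with the same partition key always map to the same index.
        for (int j = 0; j < 10; j++) {
            String key = String.format("partitionKey-%d", j / 5);
            System.out.println(key + " -> shard index " + shardIndexFor(key, 2));
        }
    }
}
```

Because the index depends only on the partition key, a key that carries a disproportionate share of the traffic concentrates that load on one shard, which is why the guidance above recommends many more partition keys than shards.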

PutRecord Example

The following code creates ten data records, distributed across two partition keys, and puts them in the stream whose name is held in the myStreamName variable.

// No preceding record exists before the first put, so start with null.
String sequenceNumberOfPreviousRecord = null;
for (int j = 0; j < 10; j++) {
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(myStreamName);
    putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", j).getBytes()));
    // Integer division maps records 0-4 to partitionKey-0 and records 5-9 to partitionKey-1.
    putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j / 5));
    putRecordRequest.setSequenceNumberForOrdering(sequenceNumberOfPreviousRecord);
    PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
    // Remember this record's sequence number for the next put.
    sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}

The preceding code sample uses setSequenceNumberForOrdering to guarantee strictly increasing ordering within each partition key. To use this parameter effectively, set the SequenceNumberForOrdering of the current record (record n) to the sequence number of the preceding record (record n-1). To get the sequence number of a record that has been added to the stream, call getSequenceNumber on the result of putRecord.

The SequenceNumberForOrdering parameter ensures strictly increasing sequence numbers for the same partition key. SequenceNumberForOrdering does not provide ordering of records across multiple partition keys.

Interacting with Data Using the AWS Glue Schema Registry

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring that the data produced is continuously validated against a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry enables you to improve end-to-end data quality and data governance within your streaming applications. For more information, see AWS Glue Schema Registry. One of the ways to set up this integration is through the PutRecords and PutRecord Kinesis Data Streams APIs available in the AWS SDK for Java.

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the PutRecords and PutRecord Kinesis Data Streams APIs, see the "Interacting with Data Using the Kinesis Data Streams APIs" section in Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry.