Kinesis examples using SDK for Java 2.x - AWS SDK for Java

Kinesis examples using SDK for Java 2.x

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Kinesis.

Actions are code excerpts that show you how to call individual Kinesis functions.

Scenarios are code examples that show you how to accomplish a specific task by calling multiple Kinesis functions.

Each example includes a link to GitHub, where you can find instructions on how to set up and run the code in context.

Actions

The following code example shows how to create a Kinesis stream.

SDK for Java 2.x

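import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;

// Creates a stream with a single shard.
public static void createStream(KinesisClient kinesisClient, String streamName) {
    try {
        CreateStreamRequest streamReq = CreateStreamRequest.builder()
            .streamName(streamName)
            .shardCount(1)
            .build();

        kinesisClient.createStream(streamReq);
    } catch (KinesisException e) {
        System.err.println(e.getMessage());
        System.exit(1);
    }
}
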
  • Find instructions and more code on GitHub.

  • For API details, see CreateStream in AWS SDK for Java 2.x API Reference.
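
A newly created stream starts in the CREATING state and accepts reads and writes only once it is ACTIVE, so callers usually poll the status before using it. The following is a minimal sketch of such a polling loop, not part of the SDK: the class name StreamStatusPoller, the method waitUntilActive, and its parameters are hypothetical. The status comes from a Supplier so that the sketch runs without AWS credentials; in real code the supplier would presumably wrap a describeStream call and read streamDescription().streamStatus().

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not an SDK class.
final class StreamStatusPoller {

    /**
     * Polls statusSource until it reports "ACTIVE" or maxAttempts checks
     * have been made. Returns true only if "ACTIVE" was observed.
     */
    static boolean waitUntilActive(Supplier<String> statusSource, int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("ACTIVE".equals(statusSource.get())) {
                return true;
            }
            // Sleep between checks, but not after the final one.
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated status sequence standing in for repeated describeStream calls.
        Iterator<String> statuses = List.of("CREATING", "CREATING", "ACTIVE").iterator();
        boolean active = waitUntilActive(statuses::next, 5, Duration.ofMillis(10));
        System.out.println("Stream active: " + active); // prints "Stream active: true"
    }
}
```

A bounded attempt count keeps the caller from waiting forever if the stream never becomes active; the delay and the limit are values to tune for your own workload.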

The following code example shows how to delete a Kinesis stream.

SDK for Java 2.x

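import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;

// Deletes the named stream.
public static void deleteStream(KinesisClient kinesisClient, String streamName) {
    try {
        DeleteStreamRequest delStream = DeleteStreamRequest.builder()
            .streamName(streamName)
            .build();

        kinesisClient.deleteStream(delStream);
    } catch (KinesisException e) {
        System.err.println(e.getMessage());
        System.exit(1);
    }
}
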
  • Find instructions and more code on GitHub.

  • For API details, see DeleteStream in AWS SDK for Java 2.x API Reference.

The following code example shows how to get data in batches from a Kinesis stream.

SDK for Java 2.x

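import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;

public static void getStockTrades(KinesisClient kinesisClient, String streamName) {
    String shardIterator;
    String lastShardId = null;

    // Retrieve the shards from the stream, one page at a time.
    DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
        .streamName(streamName)
        .build();

    List<Shard> shards = new ArrayList<>();
    DescribeStreamResponse streamRes;
    do {
        streamRes = kinesisClient.describeStream(describeStreamRequest);
        shards.addAll(streamRes.streamDescription().shards());

        if (shards.size() > 0) {
            lastShardId = shards.get(shards.size() - 1).shardId();
        }

        // Start the next page after the last shard seen so far. Without this,
        // every call returns the same first page and the loop never ends.
        describeStreamRequest = describeStreamRequest.toBuilder()
            .exclusiveStartShardId(lastShardId)
            .build();
    } while (streamRes.streamDescription().hasMoreShards());

    // Get an iterator positioned at the oldest record in the last shard.
    GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
        .streamName(streamName)
        .shardIteratorType("TRIM_HORIZON")
        .shardId(lastShardId)
        .build();

    GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
    shardIterator = shardIteratorResult.shardIterator();

    // Read one batch of records from the shard.
    List<Record> records;

    // Create a new GetRecordsRequest with the existing shardIterator.
    // Set the maximum number of records to return to 1000.
    GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .limit(1000)
        .build();

    GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

    // Put the result into the record list. The result may be empty.
    records = result.records();

    // Print each record's sequence number and payload.
    for (Record record : records) {
        SdkBytes byteBuffer = record.data();
        System.out.println(String.format("Seq No: %s - %s", record.sequenceNumber(), byteBuffer.asUtf8String()));
    }
}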
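
The example above reads a single batch. Each GetRecords response also carries a next shard iterator, and a consumer that wants more than one batch passes that iterator to its next call. The following sketch shows only the shape of that loop against an in-memory stand-in, so it runs without AWS. Everything in it (ShardReadLoopSketch, FakeShard, Batch, readAll) is hypothetical and not an SDK type. The fake shard returns a null iterator once its data is exhausted, which models a closed shard; an open shard in a real stream keeps returning iterators, so a real consumer would typically pause between calls and decide for itself when to stop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the "follow the next iterator" loop.
final class ShardReadLoopSketch {

    /** Stand-in for one GetRecords response: a batch plus the next iterator. */
    static final class Batch {
        final List<String> records;
        final String nextIterator;

        Batch(List<String> records, String nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    /** Stand-in for a shard; iterators are simply stringified offsets. */
    static final class FakeShard {
        private final List<String> data;

        FakeShard(List<String> data) {
            this.data = data;
        }

        Batch getRecords(String iterator, int limit) {
            int start = Integer.parseInt(iterator);
            int end = Math.min(start + limit, data.size());
            // A null next iterator means there is nothing more to read.
            String next = end < data.size() ? String.valueOf(end) : null;
            return new Batch(data.subList(start, end), next);
        }
    }

    /** Reads batches of at most limit records until the iterator runs out. */
    static List<String> readAll(FakeShard shard, int limit) {
        List<String> all = new ArrayList<>();
        String iterator = "0";
        while (iterator != null) {
            Batch batch = shard.getRecords(iterator, limit);
            all.addAll(batch.records);
            iterator = batch.nextIterator;
        }
        return all;
    }

    public static void main(String[] args) {
        FakeShard shard = new FakeShard(List.of("t1", "t2", "t3", "t4", "t5"));
        System.out.println(readAll(shard, 2)); // prints "[t1, t2, t3, t4, t5]"
    }
}
```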

The following code example shows how to put data into a Kinesis stream.

SDK for Java 2.x

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

// StockTrade and StockTradeGenerator are helper classes that are not shown in this excerpt.
public static void setStockData(KinesisClient kinesisClient, String streamName) {
    try {
        // Repeatedly send stock trades with a 100-millisecond wait in between.
        StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();

        // Put 50 records for this example.
        int index = 50;
        for (int x = 0; x < index; x++) {
            StockTrade trade = stockTradeGenerator.getRandomTrade();
            sendStockTrade(trade, kinesisClient, streamName);
            Thread.sleep(100);
        }
    } catch (KinesisException | InterruptedException e) {
        System.err.println(e.getMessage());
        System.exit(1);
    }
    System.out.println("Done");
}

private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();

    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        System.out.println("Could not get JSON bytes for stock trade");
        return;
    }

    System.out.println("Putting trade: " + trade);

    // The ticker symbol is used as the partition key.
    PutRecordRequest request = PutRecordRequest.builder()
        .partitionKey(trade.getTickerSymbol())
        .streamName(streamName)
        .data(SdkBytes.fromByteArray(bytes))
        .build();

    try {
        kinesisClient.putRecord(request);
    } catch (KinesisException e) {
        // Report the failure instead of silently discarding the message.
        System.err.println(e.getMessage());
    }
}

// Exits if the stream is not in the ACTIVE state.
private static void validateStream(KinesisClient kinesisClient, String streamName) {
    try {
        DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
            .streamName(streamName)
            .build();

        DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

        if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
            System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
            System.exit(1);
        }
    } catch (KinesisException e) {
        System.err.println("Error found while describing the stream " + streamName);
        System.err.println(e);
        System.exit(1);
    }
}

  • Find instructions and more code on GitHub.

  • For API details, see PutRecord in AWS SDK for Java 2.x API Reference.
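
The example above uses the ticker symbol as the partition key. Kinesis maps each partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that value, so all trades for one ticker land on the same shard. The following sketch illustrates the idea under two stated assumptions: the key is hashed as UTF-8 bytes, and the shards split the hash space into equal ranges (which may not hold after a stream has been resharded). The names PartitionKeySketch, hashKey, and shardIndex are hypothetical; the service performs this mapping for you, so this code is for intuition only.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of partition-key hashing; not an SDK class.
final class PartitionKeySketch {

    /** Returns the MD5 digest of the key as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /**
     * Assumption: shardCount shards divide [0, 2^128) into equal ranges.
     * Returns the zero-based index of the range containing the key's hash.
     */
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger hashSpace = BigInteger.ONE.shiftLeft(128);
        return hashKey(partitionKey)
            .multiply(BigInteger.valueOf(shardCount))
            .divide(hashSpace)
            .intValueExact();
    }

    public static void main(String[] args) {
        for (String ticker : new String[] {"AMZN", "MSFT", "AAPL"}) {
            System.out.println(ticker + " -> shard " + shardIndex(ticker, 4));
        }
    }
}
```

Because the mapping depends only on the key, a stream with a few heavily traded tickers can concentrate traffic on a few shards; a key with more distinct values spreads load more evenly at the cost of per-ticker ordering.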