使用 Java 2.x SDK 的 Kinesis 示例 - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Java 2.x SDK 的 Kinesis 示例

下列程式碼範例說明如何使用 Kinesis 來執行動作和實作常見案例。 AWS SDK for Java 2.x

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會告訴您如何呼叫個別服務函數,但您可以在其相關情境和跨服務範例中查看內容中的動作。

Scenarios (案例) 是向您展示如何呼叫相同服務中的多個函數來完成特定任務的程式碼範例。

每個範例都包含一個連結 GitHub,您可以在其中找到如何在內容中設定和執行程式碼的指示。

動作

下列程式碼範例會示範如何使用CreateStream

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在AWS 設定和執行程式碼範例儲存庫

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class CreateDataStream { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream (for example, StockTradeStream). """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); createStream(kinesisClient, streamName); System.out.println("Done"); kinesisClient.close(); } public static void createStream(KinesisClient kinesisClient, String streamName) { try { CreateStreamRequest streamReq = CreateStreamRequest.builder() .streamName(streamName) .shardCount(1) .build(); kinesisClient.createStream(streamReq); } catch (KinesisException e) { System.err.println(e.getMessage()); System.exit(1); } } }
  • 如需詳API細資訊,請參閱AWS SDK for Java 2.x API參考CreateStream中的。

下列程式碼範例會示範如何使用DeleteStream

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在AWS 設定和執行程式碼範例儲存庫

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class DeleteDataStream { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); deleteStream(kinesisClient, streamName); kinesisClient.close(); System.out.println("Done"); } public static void deleteStream(KinesisClient kinesisClient, String streamName) { try { DeleteStreamRequest delStream = DeleteStreamRequest.builder() .streamName(streamName) .build(); kinesisClient.deleteStream(delStream); } catch (KinesisException e) { System.err.println(e.getMessage()); System.exit(1); } } }
  • 如需詳API細資訊,請參閱AWS SDK for Java 2.x API參考DeleteStream中的。

下列程式碼範例會示範如何使用GetRecords

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在AWS 設定和執行程式碼範例儲存庫

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import java.util.ArrayList; import java.util.List; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class GetRecords { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to read from (for example, StockTradeStream). """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); getStockTrades(kinesisClient, streamName); kinesisClient.close(); } public static void getStockTrades(KinesisClient kinesisClient, String streamName) { String shardIterator; String lastShardId = null; DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); List<Shard> shards = new ArrayList<>(); DescribeStreamResponse streamRes; do { streamRes = kinesisClient.describeStream(describeStreamRequest); shards.addAll(streamRes.streamDescription().shards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).shardId(); } } while (streamRes.streamDescription().hasMoreShards()); GetShardIteratorRequest itReq = GetShardIteratorRequest.builder() .streamName(streamName) .shardIteratorType("TRIM_HORIZON") .shardId(lastShardId) .build(); GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq); shardIterator = shardIteratorResult.shardIterator(); // Continuously read data records from shard. List<Record> records; // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest recordsRequest = GetRecordsRequest.builder() .shardIterator(shardIterator) .limit(1000) .build(); GetRecordsResponse result = kinesisClient.getRecords(recordsRequest); // Put result into record list. Result may be empty. records = result.records(); // Print records for (Record record : records) { SdkBytes byteBuffer = record.data(); System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), new String(byteBuffer.asByteArray())); } } }
  • 如需詳API細資訊,請參閱AWS SDK for Java 2.x API參考GetRecords中的。

下列程式碼範例會示範如何使用PutRecord

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在AWS 設定和執行程式碼範例儲存庫

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }
  • 如需詳API細資訊,請參閱AWS SDK for Java 2.x API參考PutRecord中的。

無伺服器範例

下列程式碼範例示範如何實作 Lambda 函數,該函數會接收從 Kinesis 串流接收記錄而觸發的事件。此函數會擷取 Kinesis 承載、從 Base64 解碼,並記錄記錄內容。

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 搭配 Lambda 來使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }

下列程式碼範例顯示如何針對接收來自 Kinesis 串流之事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

透過使用 Java 的 Lambda 報告 Kinesis 批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }