本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon Kinesis Data API Streams 开发制作人 AWS SDK for Java
你可以使用 Amazon Kinesis Data API Streams 和 Java 版开发制作 AWS SDK器。如果是首次使用 Kinesis Data Streams,请先熟悉 什么是 Amazon Kinesis Data Streams? 和 使用 AWS CLI 来执行 Amazon Kinesis Data Streams 操作 中介绍的概念和术语。
这些示例讨论了 Kinesis Dat a Streams,并使用AWS SDK适用于
本章中的 Java 示例代码演示了如何执行 Kinesis Data API Streams 的基本操作,并按操作类型进行了逻辑划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,你可以使用其他编程语言调用 Kinesis Data API Streams。有关所有可用内容的更多信息 AWS SDKs,请参阅开始使用 Amazon Web Services 进行开发
每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 创建和管理 Kinesis 数据流。
向直播中添加数据
在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。
Kinesis Data Streams 中有两种不同的操作可以向API流中添加数据 PutRecords
,PutRecord
和。该PutRecords
操作会HTTP根据请求向您的直播发送多条记录,而单一PutRecord
操作一次向您的直播发送一条记录(每条记录都需要单独的HTTP请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords
,因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。
请记住,当您的源应用程序使用 Kinesis Data Streams 向API流中添加数据时,很可能有一个或多个使用者应用程序同时处理流外的数据。有关消费者如何使用 Kinesis Data API Streams 获取数据的信息,从直播中获取数据请参阅。
重要
使用添加多个 pecords PutRecords
PutRecords
操作可在一个请求中向 Kinesis Data Streams 发送多条记录。向 Kinesis 数据流发送数据时,创建器可以使用 PutRecords
实现更高的吞吐量。每个 PutRecords
请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB,整个请求的上限为 5 MB,包括分区键。与下面描述的单个 PutRecord
操作一样,PutRecords
将使用序列号和分区键。但是,PutRecord
参数 SequenceNumberForOrdering
未包含在 PutRecords
调用中。PutRecords
操作将尝试按请求的自然顺序处理所有记录。
每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecords
向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecords
请求之间的时间段越长,序列号变得越大。
注意
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
PutRecords
请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。
PutRecords示例
以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream
的流中。
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
PutRecords
响应包含响应 Records
的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records
数组包含的记录数量始终与请求数组相同。
使用时处理故障 PutRecords
默认情况下,请求内的单个记录的失败不会中止对 PutRecords
请求中后续记录的处理。这意味着,响应 Records
数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。
成功的记录包括 SequenceNumber
和 ShardID
值,而不成功的记录包含 ErrorCode
和 ErrorMessage
值。ErrorCode
参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededException
或 InternalFailure
。ErrorMessage
提供有关 ProvisionedThroughputExceededException
异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords
请求中有三个记录。第二个记录失败并反映在响应中。
例 PutRecords 请求语法
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
例 PutRecords 响应语法
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
处理不成功的请求可包含在后续 PutRecords
请求中。首先,查看 putRecordsResult
中的 FailedRecordCount
参数以确认请求中是否存在失败的记录。如果存在,则应将具有 putRecordsEntry
(不是 ErrorCode
)的每个 null
添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。
例 PutRecords 失败处理程序
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
使用添加单条记录 PutRecord
对 PutRecord
的每次调用对一个记录起作用。应首选 PutRecords
中描述的 使用添加多个 pecords PutRecords 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords
。
每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecord
向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecord
请求之间的时间段越长,序列号变得越大。
在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering
参数,如 PutRecord示例 代码示例中所示。
无论您是否使用 SequenceNumberForOrdering
,Kinesis Data Streams 通过 GetRecords
调用接收的记录都将按序列号严格进行排序。
注意
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 使用分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。
作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 setShardCount
的 CreateStreamRequest
方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。
PutRecord示例
以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName
的流中。
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
上一个代码示例使用 setSequenceNumberForOrdering
来确保每个分区键内的顺序严格递增。要有效使用此参数,请将当前记录(记录 n)的 SequenceNumberForOrdering
设置为前一条记录(记录 n-1)的序列号。要获取已添加到流的记录的序列号,请对 putRecord
的结果调用 getSequenceNumber
。
SequenceNumberForOrdering
参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering
不提供跨多个分区键的记录排序。
使用 AWS Glue 架构注册表与数据交互
您可以将 Kinesis 数据流与 AWS Glue 架构注册表集成。 AWS Glue
Schema Registry 允许您集中发现、控制和演变架构,同时确保生成的数据由注册架构持续验证。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 AWS Glue Schema Registr end-to-end y 允许您改善流媒体应用程序中的数据质量和数据治理。有关更多信息,请参阅 AWS Glue Schema Registry。设置这种集成的方法之一是通过 Java 中 AWS
提供的PutRecords
和 PutRecord
Kinesis Data APIs Streams。SDK
有关如何使用 PutRecords PutRecord 和 Kinesis Data Streams 设置 Kinesis Data Streams 与架构注册表集成的详细说明,请参阅 “用例:将 Amazon Kinesis APIs 数据流与 Glue 架构注册表集成” 中的 “使用 Kinesis 数据流与数据交互” 部分。APIs AWS