实施消费者去聚合 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

实施消费者去聚合

从 1.4.0 版开始,KCL支持自动取消聚合KPL用户记录。更新后,使用先前版本编写的消费者应用程序代码无需任何修改KCL即可编译KCL。但是,如果在生产者端使用KPL聚合,则有一个涉及检查点的微妙之处:聚合记录中的所有子记录都具有相同的序列号,因此,如果需要区分子记录,则必须将其他数据与检查点一起存储。此额外数据称作子序列号

从以前的版本迁移 KCL

您无需更改现有调用来同时执行检查点操作与聚合。仍确保您能够成功检索 Kinesis Data Streams 中存储的所有记录。KCL现在提供了两个新的检查点操作来支持特定的用例,如下所述。

如果您的现有代码是为KCL之前的KPL支持而编写的,并且您的检查点操作是在没有参数的情况下调用的,则等同于对批处理中最后一个KPL用户记录的序列号进行检查点操作。如果调用带序列号字符串的检查点操作,则它等同于对批处理的指定序列号以及隐式子序列号 0(零)进行检查点操作。

在没有任何参数checkpoint()的情况下调用新的KCL检查点操作在语义上等同于对批处理中最后一次Record调用的序列号以及隐式子序列号 0(零)进行检查点操作。

调用新的KCL检查点操作checkpoint(Record record)在语义上等同于对给定序列号和隐式Record子序列号 0(零)进行检查点操作。如果 Record 调用实际为 UserRecord,则对 UserRecord 序列号和子序列号进行检查点操作。

调用新的KCL检查点操作会checkpoint(String sequenceNumber, long subSequenceNumber)显式检查给定的序列号和给定的子序列号。

在上述任何一种情况下,将检查点存储在 Amazon DynamoDB 检查点表中后,即使应用程序崩溃KCL并重新启动,也可以正确地恢复检索记录。如果在序列中包含多条记录,则检索会从具有已进行检查点操作的最新序列号的记录中的下个子序列号记录开始。如果最新检查点包括上一条序列号记录的最新子序列号,则检索会从具有下个序列号的记录开始。

以下部分将讨论需要避免跳过和重复记录的消费端的序列和子序列检查点操作的详细信息。如果在停止并重新启动消费端的记录处理时跳过(或重复)记录并不重要,则可运行您的现有代码而无需进行修改。

使用KCL扩展模块进行KPL解聚合

KPL去聚合可能涉及子序列检查点。为了便于使用子序列检查点,已将一个UserRecord类添加到:KCL

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

现在使用的是此类而不是 Record。这不会破坏现有代码,因为它是 Record 的子类。UserRecord 类同时表示实际子记录和标准非聚合记录。非聚合记录可被视为刚好具有一条子记录的聚合记录。

此外,将两个新的操作添加到 IRecordProcessorCheckpointer

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

要开始使用子序列号检查点操作,您可执行以下转换。更改以下形式代码:

checkpointer.checkpoint(record.getSequenceNumber());

新的形式代码:

checkpointer.checkpoint(record);

建议您使用 checkpoint(Record record) 形式来进行子序列检查点操作。不过,如果您已以字符串形式存储 sequenceNumbers 以用于检查点操作,则您现在也应存储 subSequenceNumber,如以下示例所示:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Record到的转换UserRecord总是成功的,因为实现总是使用UserRecord。除非需要对序列号进行算术运算,否则建议不要采用此方法。

在处理KPL用户记录时,会将子序列号作为每行的额外字段KCL写入 Amazon DynamoDB。的先前版本KCL用于AFTER_SEQUENCE_NUMBER在恢复检查点时获取记录。当前KPL支持KCL者AT_SEQUENCE_NUMBER改用。在检索带已进行检查点操作的序列号的记录时,会检查已进行检查点操作的子序列号,而且会根据需要删除子记录(如果上一条子记录为已进行检查点操作的记录,则可能删除所有子记录)。同样,非聚合记录可被视为具有单条子记录的聚合记录,因此相同的算法同时适用于聚合和非聚合记录。

GetRecords直接使用

您也可以选择不使用KCL而是GetRecords直接调用该API操作来检索 Kinesis Data Streams 记录。要将这些检索到的记录解压缩到原始KPL用户记录中,请在中UserRecord.java调用以下静态操作之一:

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

第一个操作使用 0 的默认值 startingHashKey(零)和 2^128 -1 的默认值 endingHashKey

这些操作中的每一个都将给定的 Kinesis Data Streams 记录列表解聚为用户记录列表。KPL任何显式哈希键或分区键超出startingHashKey(含)和(含)范围的KPL用户记录都将endingHashKey从返回的记录列表中丢弃。