實作消費者去彙總 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

實作消費者去彙總

從 1.4.0 版開始,KCL支援KPL使用者記錄的自動解彙總。使用舊版的編寫的消費者應用程式程式碼KCL會在您更新KCL. 但是,如果在生產者端使用KPL聚合,則有一個涉及檢查點的微妙之處:聚合記錄中的所有子記錄都具有相同的序列號,因此如果您需要區分子記錄,則必須將其他數據與檢查點一起存儲。這些額外的資料稱為子序號

從舊版的移轉 KCL

您無須更改任何現有的呼叫即可搭配彙整執行檢查點作業。保證您仍能成功擷取存放於 Kinesis Data Streams 的所有記錄。KCL現在提供兩個新的檢查點作業來支援特定使用案例,如下所述。

如果您的現有程式碼是針對KCL先前KPL支援而撰寫的,且呼叫檢查點作業時不帶引數,則相當於檢查指向批次中最後一個KPL使用者記錄的序號。如果使用序號字串呼叫檢查點操作,則等同於對批次的指定序號及隱含的子序號 0 (零) 執行檢查點作業。

checkpoint()不使用任何引數呼叫新的KCL檢查點作業在語意上等同於檢查點批次中最後一個呼Record叫的序列號,以及隱含的子序列號 0 (零)。

調用新的KCL檢查點操作checkpoint(Record record)在語義上等同於檢查點給定Record的序列號以及隱式子序列號 0(零)。若 Record 呼叫實際為 UserRecord,則會對 UserRecord 序號和子序號執行檢查點作業。

調用新的KCL檢查點操作checkpoint(String sequenceNumber, long subSequenceNumber)明確檢查點與給定的子序列號一起給定的序列號。

在上述任何情況下,在檢查點存放在 Amazon DynamoDB 檢查點表格後,即使應用程式損毀並重新啟動,仍KCL可正確恢復擷取記錄。如果序列中包含多筆記錄,則會從序號最近執行過檢查點作業的記錄中為下一個子序號的記錄開始擷取。若最近的檢查點包括前一序號記錄的最新子序號,將從下一個序號的記錄開始擷取。

下一節將討論需要避免略過記錄和重複記錄的消費者對序號與子序號執行檢查點作業的詳細資訊。若停止並重新啟動消費者的記錄處理會略過 (或重複) 記錄無關緊要,您就可以執行現有的程式碼而無須修改。

使用KCL擴充功能解KPL除彙總

KPL去聚合可以涉及子序列檢查點。為了方便使用子序列檢查點,一個UserRecord類已被添加到:KCL

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

現已使用此類別代替 Record。這不會破壞現有的程式碼,因為其為 Record 的子類別。UserRecord 類別同時代表實際的子記錄和未彙整的標準記錄。未彙整的記錄可想像成恰有一筆子記錄的已彙整記錄。

此外,IRecordProcessorCheckpointer 也增加了兩項新的操作:

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

若要開始使用子序號檢查點作業,您可進行以下轉換。更改以下形式的程式碼:

checkpointer.checkpoint(record.getSequenceNumber());

新形式的程式碼:

checkpointer.checkpoint(record);

建議您使用 checkpoint(Record record) 形式執行子序列檢查點作業。不過,若您已將 sequenceNumbers 存放在字串中用於檢查點作業,則現在亦應存放 subSequenceNumber,如以下範例所示:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Record到的轉換UserRecord總是成功,因為實現總是使用UserRecord。除非需要對序號進行算術運算,否則這種方式並不建議。

在處理KPL使用者記錄時,會將子序號KCL寫入 Amazon DynamoDB,做為每個資料列的額外欄位。繼續檢查點時KCL用AFTER_SEQUENCE_NUMBER來擷取記錄的舊版本。目前KCL的KPL支持使用AT_SEQUENCE_NUMBER代替。在擷取已對序號執行過檢查點作業的記錄時,會檢查已執行檢查點作業的子序號,且將視需要刪除子記錄 (若已對最後一筆子記錄執行檢查點作業,則可能全部刪除)。同樣地,未彙整的記錄可想像成只有一筆子記錄的已彙整記錄,所以同一套演算法對已彙整和未彙整的記錄都適用。

GetRecords直接使用

您也可以選擇不使用,KCL而是GetRecords直接呼叫API作業來擷取 Kinesis Data Streams 記錄。若要將這些擷取的記錄解壓縮至您的原始KPL使用者記錄,請在中呼叫下列其中UserRecord.java一個靜態作業:

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

第一項操作使用 0 的預設值 startingHashKey (零) 以及 2^128 -1 的預設值 endingHashKey

這些作業都會將指定的 Kinesis Data Streams 記錄清單解彙總到使用者記錄清單中KPL。KPL任何明確雜湊索引鍵或分割區索引鍵落在 startingHashKey (含) 和 (含) 範圍之外的endingHashKey使用者記錄,都會從傳回的記錄清單中捨棄。