コンシューマーの集約解除 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

コンシューマーの集約解除

リリース 1.4.0 から、KCL は KPL ユーザーレコードの自動集計解除をサポートしています。以前のバージョンの KCL で書かれたコンシューマーアプリケーションのコードは、KCL を更新した後、コードを何も修正せずにコンパイルできます。ただし、プロデューサー側で KPL の集約を使用している場合、チェックポイントが多少関係してきます。集約されたレコード内のすべてのサブレコードは同じシーケンス番号を持っているため、サブレコード間の区別が必要な場合、チェックポイントを使用して追加のデータを保存する必要があります。この追加データは、サブシーケンス番号と呼ばれます。

以前のバージョンの KCL からの移行

集約とともにチェックポイントを作成する既存の呼び出しを変更する必要はありません。Kinesis Data Streams に保存されているすべてのレコードを正しく取得できることが保証されています。以下で説明する特定のユースケースをサポートするために、現在 KCL には、2 つの新しいチェックポイントオペレーションが用意されています。

既存のコードが KPL サポート以前の KCL 用に書かれていて、チェックポイントオペレーションが引数なしで呼び出される場合、そのコードの動作は、バッチ内にある最後の KPL ユーザーレコードのシーケンス番号に対するチェックポイントの作成と同等です。シーケンス番号文字列を使用してチェックポイントオペレーションを呼び出す場合は、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチの指定されたシーケンス番号に対するチェックポイントの作成と同等です。

新しい KCL チェックポイント操作の呼び出しcheckpoint()のシーケンス番号をチェックポイントすることと意味的に等価です。引数なしでRecord呼び出しと、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチ内の呼び出しです。

新しい KCL チェックポイント操作の呼び出しcheckpoint(Record record)は、指定したRecordのシーケンス番号と暗黙的なサブシーケンス番号 0 (ゼロ) を伴う。Record 呼び出しが実際には UserRecord である場合、UserRecord のシーケンス番号とサブシーケンス番号にチェックポイントが作成されます。

新しい KCL チェックポイント操作の呼び出しcheckpoint(String sequenceNumber, long subSequenceNumber)は、指定されたシーケンス番号とサブシーケンス番号に明示的にチェックポイントを作成します。

いずれの場合も、チェックポイントが Amazon DynamoDB チェックポイントテーブルに保存された後は、アプリケーションがクラッシュして再起動した場合、KCL はレコードの取得を正しく再開できます。さらにレコードがシーケンス内に含まれている場合は、最後にチェックポイントが作成されたシーケンス番号が付けられているレコード内の次のサブシーケンス番号のレコードから取得が開始されます。前のシーケンス番号のレコードにある最後のサブシーケンス番号が、最新のチェックポイントに含まれている場合、その次のシーケンス番号が付けられているレコードから取得が開始されます。

次のセクションでは、レコードのスキップや重複を避けるために必要な、コンシューマーのシーケンスとサブシーケンスのチェックポイントの詳細について説明します。コンシューマーのレコード処理を停止し再起動するときに、レコードのスキップや重複が重要でない場合は、変更せずに既存のコードを実行してかまいません。

KPL の集約解除のための KCL 拡張

すでに説明したように、KPL の集約解除ではサブシーケンスチェックポイントを使用できます。サブシーケンスチェックポイントを使いやすくするために、UserRecordクラスが KCL に追加されました。

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

このクラスは、現在 Record の代わりに使用されています。これは Record のサブクラスであるため、既存のコードは影響を受けません。UserRecord クラスは、実際のサブレコードと通常の集約されていないレコードの両方を表します。集約されていないレコードは、サブレコードを 1 つだけ含む集約されたレコードと考えることができます。

さらに、2 つの新しいオペレーションが IRecordProcessorCheckpointer に追加されています。

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

サブシーケンス番号チェックポイントの使用を開始するには、次の変更を行います。次のフォームコードを変更します。

checkpointer.checkpoint(record.getSequenceNumber());

新しいフォームコードは次のようになります。

checkpointer.checkpoint(record);

サブシーケンスチェックポイントでは、checkpoint(Record record) フォームを使用することをお勧めします。ただし、チェックポイントの作成で使用する文字列にすでに sequenceNumbers を保存している場合は、次の例に示すように、subSequenceNumber も保存する必要があります。

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

この実装では内部で Record を必ず使用するため、UserRecord から UserRecord へのキャストは必ず成功します。シーケンス番号の計算を実行する必要がない場合、この方法はお勧めしません。

KPL ユーザーレコードの処理中に、KCL は、サブシーケンス番号を Amazon DynamoDB に各行の追加フィールドとして書き込みます。以前の KCL の使用AFTER_SEQUENCE_NUMBERでは、チェックポイントを再開するときにレコードを取得できます。KPL サポートを使用して現在の KCL を使用します。AT_SEQUENCE_NUMBER代わりにです。チェックポイントが作成されたシーケンス番号のレコードを取得するとき、チェックポイントが作成されたサブシーケンス番号がチェックされ、サブレコードが必要に応じて削除されます (最後のサブレコードにチェックポイントが作成されている場合、すべてのサブレコードが削除されます)。ここでも、集約されていないレコードは、単一のサブレコードを含む集約されたレコードと考えることができ、集約されたレコードと集約されていないレコードの両方で同じアルゴリズムを使用できます。

GetRecords の直接的な使用

KCL を使用せずに API オペレーションを呼び出すこともできますGetRecordsを直接使用して、Kinesis Data Streams レコードを取得します。これらの取得したレコードを元の KPL ユーザーレコードに解凍するには、次の静的なオペレーションの 1 つを呼び出します。UserRecord.java:

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

最初のオペレーションでは、startingHashKey のデフォルト値 0 (ゼロ) と endingHashKey のデフォルト値 2^128 -1 を使用します。

これらの各オペレーションは、Kinesis Data Streams レコードのリストを KPL ユーザーレコードのリストに集約解除します。明示的なハッシュキーまたはパーティションキーの範囲外にある任意の KPL ユーザーレコードstartingHashKey(包括的) およびendingHashKey(包括的) は、返されたレコードのリストから破棄されます。