Amazon Kinesis Data Streams
開発者ガイド

コンシューマーの集約解除

KCL は、リリース 1.4.0 から KPL ユーザーレコードの自動集計解除をサポートしています。以前のバージョンの KCL で書かれたコンシューマーアプリケーションのコードは、KCL を更新した後、コードを何も修正せずにコンパイルできます。ただし、プロデューサー側で KPL の集約を使用している場合、チェックポイントが多少関係してきます。集約されたレコード内のすべてのサブレコードは同じシーケンス番号を持っているため、サブレコード間の区別が必要な場合、チェックポイントを使用して追加のデータを保存する必要があります。この追加データは、サブシーケンス番号と呼ばれます。

以前のバージョンの KCL からの移行

集約とともにチェックポイントを作成する既存の呼び出しを変更する必要はありません。Kinesis Data Streams に保存されているすべてのレコードを正しく取得できることが保証されています。以下で説明する特定のユースケースをサポートするために、現在 KCL には、2 つの新しいチェックポイントオペレーションが用意されています。

既存のコードが KPL サポート以前の KCL 用に書かれていて、チェックポイントオペレーションが引数なしで呼び出される場合、そのコードの動作は、バッチ内にある最後の KPL ユーザーレコードのシーケンス番号に対するチェックポイントの作成と同等です。シーケンス番号文字列を使用してチェックポイントオペレーションを呼び出す場合は、暗黙的なサブシーケンス番号 0(ゼロ)を伴う、バッチの指定されたシーケンス番号に対するチェックポイントの作成と同等です。

引数なしで新しい KCL チェックポイントオペレーション checkpoint() を呼び出すことは、暗黙的なサブシーケンス番号 0(ゼロ)を伴う、バッチ内の最後の Record 呼び出しのシーケンス番号に対するチェックポイントの作成と意味的に同等です。

新しい KCL チェックポイントオペレーション checkpoint(Record record) を呼び出すことは、暗黙的なサブシーケンス番号 0(ゼロ)を伴う、指定された Record のシーケンス番号に対するチェックポイントの作成と意味的に同等です。Record 呼び出しが実際には UserRecord である場合、UserRecord のシーケンス番号とサブシーケンス番号にチェックポイントが作成されます。

新しい KCL チェックポイントオペレーション checkpoint(String sequenceNumber, long subSequenceNumber) を呼び出すと、指定されたシーケンス番号とサブシーケンス番号に明示的にチェックポイントが作成されます。

これらのどの場合も、チェックポイントが Amazon DynamoDB チェックポイントテーブルに保存された後は、アプリケーションがクラッシュして再起動した場合、KCL により、レコードの取得が正常に再開されます。さらにレコードがシーケンス内に含まれている場合は、最後にチェックポイントが作成されたシーケンス番号が付けられているレコード内の次のサブシーケンス番号のレコードから取得が開始されます。前のシーケンス番号のレコードにある最後のサブシーケンス番号が、最新のチェックポイントに含まれている場合、その次のシーケンス番号が付けられているレコードから取得が開始されます。

次のセクションでは、レコードのスキップや重複を避けるために必要な、コンシューマーのシーケンスとサブシーケンスのチェックポイントの詳細について説明します。コンシューマーのレコード処理を停止し再起動するときに、レコードのスキップや重複が重要でない場合は、変更せずに既存のコードを実行してかまいません。

KPL の集約解除のための KCL の拡張

すでに説明したように、KPL の集約解除ではサブシーケンスチェックポイントを使用できます。サブシーケンスチェックポイントを使いやすくするために、UserRecord クラスが KCL に追加されています。

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

このクラスは、現在 Record の代わりに使用されています。これは Record のサブクラスであるため、既存のコードは影響を受けません。UserRecord クラスは、実際のサブレコードと通常の集約されていないレコードの両方を表します。集約されていないレコードは、サブレコードを 1 つだけ含む集約されたレコードと考えることができます。

さらに、2 つの新しいオペレーションが IRecordProcessorCheckpointer に追加されています。

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

サブシーケンス番号チェックポイントの使用を開始するには、次の変更を行います。次のフォームコードを変更します。

checkpointer.checkpoint(record.getSequenceNumber());

新しいフォームコードは次のようになります。

checkpointer.checkpoint(record);

サブシーケンスチェックポイントでは、checkpoint(Record record) フォームを使用することをお勧めします。ただし、チェックポイントの作成で使用する文字列にすでに sequenceNumbers を保存している場合は、次の例に示すように、subSequenceNumber も保存する必要があります。

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

この実装では内部で UserRecord を必ず使用するため、Record から UserRecord へのキャストは必ず成功します。シーケンス番号の計算を実行する必要がない場合、この方法はお勧めしません。

KPL ユーザーレコードの処理中に、KCL は、サブシーケンス番号を Amazon DynamoDB に各行の追加フィールドとして書き込みます。以前のバージョンの KCL では、チェックポイントを再開するときに AFTER_SEQUENCE_NUMBER を使用してレコードを取得していました。KPL サポートを含む現在の KCL では、代わりに AT_SEQUENCE_NUMBER を使用します。チェックポイントが作成されたシーケンス番号のレコードを取得するとき、チェックポイントが作成されたサブシーケンス番号がチェックされ、サブレコードが必要に応じて削除されます(最後のサブレコードにチェックポイントが作成されている場合、すべてのサブレコードが削除されます)。ここでも、集約されていないレコードは、単一のサブレコードを含む集約されたレコードと考えることができ、集約されたレコードと集約されていないレコードの両方で同じアルゴリズムを使用できます。

GetRecords の直接的な使用

KCL の使用を選択せずに、API オペレーション GetRecords を直接呼び出して Kinesis Data Streams レコードを取得することもできます。これらの取得したレコードを元の KPL ユーザーレコードに解凍するには、UserRecord.java にある次の静的なオペレーションの 1 つを呼び出します。

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

最初のオペレーションでは、startingHashKey についてデフォルト値 0(ゼロ)を使用し、endingHashKey についてデフォルト値 2^128 -1 を使用します。

これらの各オペレーションは、Kinesis Data Streams レコードの指定されたリストを KPL ユーザーレコードのリストに集約解除します。明示的なハッシュキーまたはパーティションキーが startingHashKeyendingHashKey の範囲(境界を含む)の外にある KPL ユーザーレコードは、返されるレコードのリストから破棄されます。