Datenproduzent – Disaggregation - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Datenproduzent – Disaggregation

Ab Version 1.4.0 unterstützt die KCL eine automatische Disaggregation von KPL-Benutzerdatensätzen. Code für Konsumentenanwendungen, der mit früheren KCL-Versionen geschrieben wurde, wird nach der Aktualisierung der KCL ohne Änderungen kompiliert. Wenn jedoch eine KPL-Aggregation auf Konsumentenseite verwendet wird, ist beim Checkpointing Folgendes zu beachten: Alle untergeordneten Datensätze innerhalb eines aggregierten Datensatzes haben dieselbe Sequenznummer. Somit müssen zusätzliche Daten mit dem Prüfpunkt gespeichert werden, wenn Sie zwischen untergeordneten Datensätze unterscheiden müssen. Diese zusätzlichen Daten werden als Teilsequenznummer bezeichnet.

Migrieren von früheren KCL-Versionen

Sie müssen Ihre vorhandenen Aufrufe zum Setzen eines Prüfpunkts in Verbindung mit einer Aggregation nicht ändern. Sie können nach wie vor alle Datensätze erfolgreich abrufen, die in Kinesis Data Streams gespeichert sind. Die KCL stellt nun zwei neue Prüfpunkt-Operationen zur Verfügung, um bestimmte, unten beschriebene Anwendungsfälle zu unterstützen.

Falls Ihr Code vor dem KPL-Support für die KCL geschrieben wurde und Ihre Prüfpunkt-Operation ohne Argumente aufgerufen wird, entspricht dies dem Checkpointing der Sequenznummer des letzten KPL-Benutzerdatensatzes im Stapel. Wird Ihre Prüfpunkt-Operation mit einer Sequenznummernzeichenfolge aufgerufen, entspricht dies dem Checkpointing der angegebenen Sequenznummer des Stapels zusammen mit der impliziten Teilsequenznummer 0 (null).

Das Aufrufen der neuen KCL-Prüfpunkt-Operation checkpoint() ohne Argumente entspricht semantisch dem Checkpointing der Sequenznummer des letzten Record-Aufrufs im Stapels zusammen mit der impliziten Teilsequenznummer 0 (null).

Das Aufrufen der neuen KCL-Prüfpunkt-Operation checkpoint(Record record) entspricht semantisch dem Checkpointing der angegebenen Sequenznummer von Record zusammen mit der impliziten Teilsequenznummer 0 (null). Handelt es sich beim Record-Aufruf um einen UserRecord, wird ein Checkpointing der Sequenznummer und Teilsequenznummer von UserRecord durchgeführt.

Das Aufrufen der neuen KCL-Prüfpunkt-Operation checkpoint(String sequenceNumber, long subSequenceNumber) führt explizit zu einem Checkpointing der angegebenen Sequenznummer zusammen mit der angegebenen Teilsequenznummer.

In all diesen Fällen kann, nachdem der Prüfpunkt in der Amazon-DynamoDB-Prüfpunkttabelle gespeichert wurde, die KCL das Abrufen der Datensätze fehlerfrei wieder aufnehmen, auch wenn die Anwendung abstürzt und neu gestartet wird. Sind mehr Datensätze in der Sequenz enthalten, beginnt das Abrufen mit dem Datensatz, dem die nächste Teilsequenznummer zugeordnet wurde, innerhalb des Datensatzes mit der Sequenznummer, für die zuletzt ein Prüfpunkt gesetzt wurde. Enthält der letzte Prüfpunkt die allerletzte Teilsequenznummer des vorherigen Sequenznummerndatensatzes, beginnt das Abrufen mit dem Datensatz, dem die nächst folgende Sequenznummer zugeordnet ist.

Im nächsten Abschnitt wird das Sequenz- und Teilsequenz-Checkpointing für Konsumenten erläutert, bei denen ein Überspringen und Duplizieren von Datensätzen vermieden werden muss. Wenn das Überspringen (oder Duplizieren) bei einem Stopp und Neustart der Datensatzverarbeitung Ihres Konsumenten keine Rolle spielt, können Sie Ihren vorhandenen Code ohne Änderungen ausführen.

KCL-Erweiterungen für die KPL-Disaggregation

Wie bereits erwähnt kann die KPL-Disaggregation ein Checkpointing der Teilsequenz umfassen. Zur Unterstützung des Checkpointings einer Teilsequenz wurde eine UserRecord-Klasse zur KCL hinzugefügt:

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Diese Klasse wird nun anstelle von Record verwendet. Sie führt nicht zu Fehlern im vorhandenen Code, da es sich um eine Subklasse von Record handelt. Die UserRecord-Klasse repräsentiert sowohl tatsächlich untergeordnete Datensätze als auch standardmäßige, nicht aggregierte Datensätze. Nicht-aggregierte Datensätze sind aggregierte Datensätze mit genau einem untergeordneten Datensatz.

Darüber hinaus wurden zwei neue Operationen zu IRecordProcessorCheckpointer hinzugefügt:

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Führen Sie die folgende Konvertierung durch, um mit dem Checkpointing einer Teilsequenznummer zu beginnen: Ändern Sie folgenden Formularcode:

checkpointer.checkpoint(record.getSequenceNumber());

Neue Formularcode:

checkpointer.checkpoint(record);

Wir empfehlen für das Checkpointing der Teilsequenz das checkpoint(Record record)-Formular. Wenn Sie jedoch bereits sequenceNumbers in Zeichenfolgen für das Checkpointing gespeichert haben, sollten Sie nun auch subSequenceNumber speichern, wie im folgenden Beispiel gezeigt:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Die Umwandlung von Record in UserRecord ist stets erfolgreich, da bei der Implementierung stets UserRecord im Hintergrund verwendet wird. Wenn Sie keine arithmetischen Operationen für die Sequenznummern durchführen müssen, ist dieser Ansatz nicht zu empfehlen.

Während der Verarbeitung der KPL-Benutzerdatensätze schreibt die KCL die Teilsequenznummer für jede Zeile als zusätzliches Feld in die Amazon DynamoDB. Frühere Versionen der KCL nutzten AFTER_SEQUENCE_NUMBER zum Abrufen von Datensätzen bei der Wiederaufnahme von Prüfpunkten. Die aktuelle KCL mit KPL-Support verwendet stattdessen AT_SEQUENCE_NUMBER. Wenn der Datensatz bei der Sequenznummer abgerufen wird, bei der ein Prüfpunkt gesetzt wurde, wird die Teilsequenznummer, für die ein Checkpointing durchgeführt wurde, geprüft und untergeordnete Datensätze gegebenenfalls ausgelassen (möglicherweise alle, wenn beim letzten Datensatz ein Prüfpunkt gesetzt wurde). Nochmals: Nicht aggregierte Datensätze können als aggregierte Datensätze mit einem einzelnen untergeordneten Datensatz betrachtet werden, sodass derselbe Algorithmus sowohl für aggregierte als auch für nicht aggregierte Datensätze funktioniert.

Direktes Verwenden von GetRecords

Sie können sich auch gegen die Verwendung der KCL entscheiden und stattdessen direkt die API-Operation GetRecords aufrufen, um Datensätze aus Kinesis Data Streams abzurufen. Zum Entpacken dieser abgerufenen Datensätze in Ihre ursprünglichen KPL-Benutzerdatensätze rufen Sie eine der folgenden statischen Operationen in UserRecord.java auf:

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

Die erste Operation verwendet den Standardwert 0 (null) für startingHashKey und den Standardwert 2^128 -1 für endingHashKey.

Jede dieser Operationen führt eine Disaggregation der vorhandenen Liste der Datensätze aus Kinesis Data Streams in eine Liste mit KPL-Benutzerdatensätzen durch. Alle KPL-Benutzerdatensätze, deren expliziter Hash- oder Partitionsschlüssel außerhalb des Bereichs von startingHashKey (einschließlich) liegt, sowie endingHashKey (einschließlich) werden aus der zurückgegebenen Datensatzliste entfernt.