Verwenden der Kinesis Client Library - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden der Kinesis Client Library

Eine der Methoden zur Entwicklung benutzerdefinierter Verbraucheranwendungen, die Daten aus KDS-Datenströmen verarbeiten können, ist die Verwendung der Kinesis Client Library (KCL).

Anmerkung

Sowohl für KCL 1.x als auch für KCL 2.x wird empfohlen, je nach Nutzungsszenario ein Upgrade auf die neueste KCL-1.x-Version oder KCL-2.x-Version durchzuführen. Sowohl KCL 1.x als auch KCL 2.x werden regelmäßig mit neueren Versionen aktualisiert, die die neuesten Abhängigkeits- und Sicherheitspatches, Bugfixes und abwärtskompatible neue Features enthalten. Weitere Informationen finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /releases.

Was ist die Kinesis Client Library?

KCL unterstützt Sie bei der Nutzung und Verarbeitung von Daten aus einem Kinesis-Datenstrom, indem sie sich um viele der komplexen Aufgaben kümmert, die mit verteilter Datenverarbeitung verbunden sind. Dazu gehören der Lastenausgleich über mehrere Anwendungs-Instances hinweg, die Reaktion auf Ausfälle von Verbraucheranwendungs-Instances, die Überprüfung verarbeiteter Datensätze und die Reaktion auf Resharding. Die KCL kümmert sich um all diese Unteraufgaben, sodass Sie sich darauf konzentrieren können, Ihre benutzerdefinierte Logik für die Verarbeitung von Datensätzen zu schreiben.

Die KCL unterscheidet sich von den Kinesis-Data-Streams-APIs, die in den AWS -SDKs verfügbar sind. Die Kinesis-Data-Streams-APIs helfen Ihnen bei der Verwaltung vieler Aspekte von Kinesis Data Streams, einschließlich Stream-Erstellung, Resharding sowie Hinzufügen und Abrufen von Datensätzen. Die KCL bietet eine Abstraktionsebene für all diese Unteraufgaben, sodass Sie sich auf die benutzerdefinierte Datenverarbeitungslogik Ihrer Verbraucheranwendung konzentrieren können. Informationen zur Kinesis-Data-Streams-API finden Sie in der Amazon-Kinesis-API-Referenz.

Wichtig

Die KCL ist eine Java-Bibliothek. Die Support für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle namens bereitgestellt. MultiLangDaemon Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie beispielsweise die KCL für Python installieren und Ihre Consumer-Anwendung vollständig in Python schreiben, müssen Sie trotzdem Java auf Ihrem System installiert haben, da MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. an die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie unter MultiLangDaemon KCL-Projekt. MultiLangDaemon GitHub

Die KCL dient als Vermittler zwischen Ihrer Datensatzverarbeitungslogik und Kinesis Data Streams. Die KCL führt die folgenden Aufgaben aus:

  • Stellt eine Verbindung mit dem Datenstrom her

  • Listet die Shards innerhalb des Datenstroms auf

  • Nutzt Leases, um Shard-Verbindungen mit seinen Workern zu koordinieren

  • Instanziiert einen Datensatzverarbeiter für jeden Shard, der verwaltet wird

  • Ruft Datensätze aus dem Datenstrom ab

  • Überträgt per Push Datensätze an den entsprechenden Datensatzverarbeiter

  • Verwendet Checkpoints für verarbeitete Datensätze

  • Gleicht zwischen Shard-Worker-Zuordnungen (Leases) aus, wenn sich die Anzahl der Worker-Instances ändert oder wenn der Datenstrom erneut geteilt wird (Shards werden aufgeteilt oder zusammengeführt)

Verfügbare KCL-Versionen

Derzeit können Sie eine der folgenden unterstützten Versionen von KCL verwenden, um Ihre benutzerdefinierten Anwendungen für Privatanwender zu erstellen:

Sie können entweder KCL 1.x oder KCL 2.x verwenden, um Verbraucheranwendungen zu erstellen, die einen gemeinsamen Durchsatz verwenden. Weitere Informationen finden Sie unter Entwickeln benutzerdefinierter Verbraucher mit gemeinsam genutztem Durchsatz unter Verwendung von KCL.

Um Verbraucheranwendungen zu erstellen, die einen dedizierten Durchsatz verwenden (Enhanced Fan-Out-Consumer), können Sie nur KCL 2.x verwenden. Weitere Informationen finden Sie unter Entwickeln benutzerdefinierter Verbraucher mit dediziertem Durchsatz (Erweitertes Rundsenden).

Informationen zu den Unterschieden zwischen KCL 1.x und KCL 2.x sowie Anweisungen zur Migration von KCL 1.x zu KCL 2.x finden Sie unter Migrieren von Verbrauchern von KCL 1.x zu KCL 2.x.

KCL-Konzepte

  • KCL-Konsumentenanwendung – eine Anwendung, die unter Verwendung von KCL kundenspezifisch entwickelt wurde und zum Lesen und Verarbeiten von Datensätzen aus Datenströmen bestimmt ist.

  • Konsumentenanwendungs-Instance – KCL-Konsumentenanwendungen werden in der Regel verteilt, wobei eine oder mehrere Anwendungs-Instances gleichzeitig ausgeführt werden, um Fehler zu koordinieren und einen dynamischen Lastenausgleich bei der Verarbeitung von Datensätzen vorzunehmen.

  • Worker – eine übergeordnete Klasse, die eine KCL-Konsumentenanwendungs-Instance verwendet, um mit der Datenverarbeitung zu beginnen.

    Wichtig

    Jede KCL-Konsumentenanwendungs-Instance hat einen Worker.

    Der Worker initialisiert und überwacht verschiedene Aufgaben, darunter das Synchronisieren von Shard- und Leasing-Informationen, das Verfolgen von Shard-Zuweisungen und das Verarbeiten von Daten aus den Shards. Ein Worker stellt KCL die Konfigurationsinformationen für die Consumer-Anwendung zur Verfügung, z. B. den Namen des Datenstreams, dessen Datensätze diese KCL-Consumer-Anwendung verarbeiten wird, und die AWS Anmeldeinformationen, die für den Zugriff auf diesen Datenstrom erforderlich sind. Der Worker startet außerdem die spezifische Instance der KCL-Konsumentenanwendung, um Datensätze aus dem Datenstrom an die Datensatzprozessoren zu übermitteln.

    Wichtig

    In KCL 1.x heißt diese Klasse Worker. Weitere Informationen (dies sind die Java-KCL-Repositorys) finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/Kinesis/ClientLibrary/lib/Worker/Worker.java. In KCL 2.x heißt diese Klasse Scheduler. Der Zweck von Scheduler in KCL 2.x ist identisch mit dem Zweck von Worker in KCL 1.x. Weitere Informationen zur Scheduler-Klasse in KCL 2.x amazon-kinesis-clientfinden Sie unter amazon-kinesis-client https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • Lease – Daten, die die Bindung zwischen einem Worker und einem Shard definieren. Verteilte KCL-Konsumentenanwendungen nutzen Leases, um die Verarbeitung von Datensätzen auf eine ganze Flotte von Workern zu verteilen. Zu einem bestimmten Zeitpunkt ist jeder Shard von Datensätzen durch einen Lease, der durch die leaseKey-Variable identifiziert wird, an einen bestimmten Worker gebunden.

    Standardmäßig kann ein Mitarbeiter einen oder mehrere Leasingverträge (abhängig vom Wert der Variablen Worker) gleichzeitig abschließen. maxLeasesFor

    Wichtig

    Jeder Worker versucht, alle verfügbaren Leases für alle verfügbaren Shards in einem Datenstrom zu halten. Aber es kann nur jeweils ein Worker jeden Lease zu einem bestimmten Zeitpunkt erfolgreich halten.

    Wenn Sie beispielsweise eine Konsumentenanwendungs-Instance A mit Worker A haben, die einen Datenstrom mit 4 Shards verarbeitet, kann Worker A Leases für die Shards 1, 2, 3 und 4 gleichzeitig halten. Wenn Sie jedoch zwei Konsumentenanwendungs-Instances haben: A und B mit Worker A und Worker B und diese Instances einen Datenstrom mit 4 Shards verarbeiten, können Worker A und Worker B nicht gleichzeitig den Lease für Shard 1 halten. Ein Worker hält den Lease für einen bestimmten Shard, bis er bereit ist, die Verarbeitung der Datensätze dieses Shards zu beenden, oder bis er ausfällt. Wenn ein Worker den Lease beendet, nimmt ein anderer Worker den Lease auf und hält ihn.

    Weitere Informationen (dies sind die Java-KCL-Repositorys) finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java für KCL 1.x und https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java für KCL 2.x. amazon-kinesis-client amazon-kinesis-client

  • Leasetabelle – Eine einzigartige Amazon-DynamoDB-Tabelle, die verwendet wird, um die Shards in einem KDS-Datenstrom zu verfolgen, die von den Workern der KCL-Konsumentenanwendung geleast und verarbeitet werden. Die Leasetabelle muss (innerhalb eines Workers und für alle Worker) mit den neuesten Shard-Informationen aus dem Datenstrom synchronisiert bleiben, während die KCL-Konsumentenanwendung ausgeführt wird. Weitere Informationen finden Sie unter Mithilfe einer Leasetabelle können Sie die von der KCL-Konsumentenanwendung verarbeiteten Shards verfolgen.

  • Datensatzprozessor – Die Logik, die definiert, wie Ihre KCL-Konsumentenanwendung die Daten verarbeitet, die sie aus den Datenströmen erhält. Zur Laufzeit instanziiert eine KCL-Konsumentenanwendungs-Instance einen Worker, und dieser Worker instanziiert einen Datensatzprozessor für jeden Shard, für den er einen Lease hat.

Mithilfe einer Leasetabelle können Sie die von der KCL-Konsumentenanwendung verarbeiteten Shards verfolgen

Was ist eine Leasetabelle

Für jede Anwendung mit Amazon Kinesis Data Streams verwendet KCL eine einzigartige Leasetabelle (gespeichert in einer Amazon-DynamoDB-Tabelle), die verwendet wird, um die Shards in einem KDS-Datenstrom zu verfolgen, die von den Workern der KCL-Konsumentenanwendung geleast und verarbeitet werden.

Wichtig

KCL verwendet den Namen der Konsumentenanwendung, um den Namen der Leasetabelle zu erstellen, die diese Konsumentenanwendung verwendet. Daher muss der Name jeder Konsumentenanwendung eindeutig sein.

Sie können die Leasetabelle mit der Amazon-DynamoDB-Konsole anzeigen, während die Konsumentenanwendung ausgeführt wird.

Wenn die Leasetabelle für Ihre KCL-Konsumentenanwendung beim Start der Anwendung nicht vorhanden ist, erstellt einer der Worker die Leasetabelle für diese Anwendung.

Wichtig

Ihr Konto wird neben den Kosten für Kinesis Data Streams mit den Kosten belastet, die für die DynamoDB-Tabelle anfallen.

Jede Zeile in der Leasetabelle stellt einen Shard dar, der von den Workern Ihrer Konsumentenanwendung verarbeitet wird. Wenn Ihre KCL-Konsumentenanwendung nur einen Datenstrom verarbeitet, dann ist leaseKey, der Hash-Schlüssel für die Leasetabelle, die Shard-ID. Wenn Sie Verarbeitung mehrerer Datenströme mit derselben Konsumentenanwendung KCL 2.x für Java sind, dann sieht die Struktur des leaseKey wie folgt aus: account-id:StreamName:streamCreationTimestamp:ShardId. z. B. 111111111:multiStreamTest-1:12345:shardId-000000000336.

Neben der Shard-ID enthält jede Zeile noch folgende Daten:

  • checkpoint: Die letzte Prüfpunkt-Sequenznummer des Shards. Dieser Wert ist für alle Shards im Datenstrom eindeutig.

  • checkpointSubSequenceNummer: Wenn Sie die Aggregationsfunktion der Kinesis Producer Library verwenden, ist dies eine Erweiterung des Checkpoints, mit der einzelne Benutzerdatensätze innerhalb des Kinesis-Datensatzes verfolgt werden.

  • leaseCounter: Wird für ein Lease Versioning verwendet, damit Auftragnehmer erkennen, wenn ihr Lease von einem anderen Auftragnehmer übernommen wurde.

  • leaseKey: Eine eindeutige Kennung für einen Lease. Jeder Lease gilt für einen bestimmten Shard im Datenstrom und wird immer nur von einem Worker gehalten.

  • leaseOwner: Der Auftragnehmer, dem der Lease gehört.

  • ownerSwitchesSinceCheckpoint: Wie oft hat dieser Leasingvertrag seit der letzten Erstellung eines Checkpoints die Mitarbeiter gewechselt?

  • parentShardId: Wird verwendet, um sicherzustellen, dass der übergeordnete Shard vollständig verarbeitet ist, bevor die Verarbeitung der untergeordneten Shards beginnt. So wird sichergestellt, dass Datensätze in der gleichen Reihenfolge verarbeitet werden, in der sie in den Stream eingespeist wurden.

  • hashrange: Wird von PeriodicShardSyncManager verwendet, um regelmäßige Synchronisierungen durchzuführen, um fehlende Shards in der Leasetabelle zu finden und bei Bedarf Leases für sie zu erstellen.

    Anmerkung

    Diese Daten sind in der Leasetabelle für jeden Shard enthalten, der mit KCL 1.14 und KCL 2.3 beginnt. Weitere Hinweise zu PeriodicShardSyncManager und die periodische Synchronisierung zwischen Leases und Shards finden Sie unter Wie wird eine Leasetabelle mit den Shards in einem KDS-Datenstrom synchronisiert.

  • childshards: Wird von LeaseCleanupManager verwendet, um den Verarbeitungsstatus des untergeordneten Shards zu überprüfen und zu entscheiden, ob der übergeordnete Shard aus der Leasetabelle gelöscht werden kann.

    Anmerkung

    Diese Daten sind in der Leasetabelle für jeden Shard enthalten, der mit KCL 1.14 und KCL 2.3 beginnt.

  • shardID: Die ID des Shards.

    Anmerkung

    Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeitung mehrerer Datenströme mit derselben Konsumentenanwendung KCL 2.x für Java sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und darüber hinaus.

  • Streamname Die Kennung des Datenstroms im folgenden Format: account-id:StreamName:streamCreationTimestamp.

    Anmerkung

    Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeitung mehrerer Datenströme mit derselben Konsumentenanwendung KCL 2.x für Java sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und darüber hinaus.

Durchsatz

Wenn Ihre Amazon Kinesis Data Streams-Anwendung Ausnahmen für den bereitgestellten Durchsatz erhält, sollten Sie den bereitgestellten Durchsatz für die DynamoDB-Tabelle erhöhen. Die KCL erstellt die Tabelle mit einem bereitgestellten Durchsatz von 10 Lese- und 10 Schreibvorgängen pro Sekunde, dies reicht aber möglichweise für Ihre Anwendung nicht aus. Beispiel: Wenn Ihre Amazon Kinesis Data Streams-Anwendung häufig Prüfpunkte setzt oder einen Stream verarbeitet, der aus vielen Shards besteht, müssen Sie den Durchsatz möglicherweise erhöhen.

Weitere Informationen zum bereitgestellten Durchsatz in DynamoDB finden Sie unter Lese-/Schreibkapazitätsmodus und Arbeiten mit Tabellen und Daten im Amazon-DynamoDB-Entwicklerhandbuch.

Wie wird eine Leasetabelle mit den Shards in einem KDS-Datenstrom synchronisiert

Worker in KCL-Konsumentenanwendungen verwenden Leases, um Shards aus einem bestimmten Datenstrom zu verarbeiten. Die Informationen darüber, welcher Worker zu einem bestimmten Zeitpunkt welchen Shard least, werden in einer Leasetabelle gespeichert. Die Leasetabelle muss mit den neuesten Shard-Informationen aus dem Datenstrom synchronisiert bleiben, während die KCL-Konsumentenanwendung ausgeführt wird. KCL synchronisiert die Leasetabelle mit den Shard-Informationen, die während des Bootstrapings der Konsumentenanwendung (entweder wenn die Konsumentenanwendung initialisiert oder neu gestartet wird) vom Dienst Kinesis Data Streams erfasst werden und auch immer dann, wenn ein Shard, der gerade verarbeitet wird, ein Ende erreicht (Resharding), erreicht. Mit anderen Worten, die Worker oder eine KCL-Konsumentenanwendung werden mit dem Datenstrom synchronisiert, den sie beim ersten Bootstrap der Konsumentenanwendung verarbeiten, und immer dann wenn die Konsumentenanwendung auf ein Reshard-Ereignis des Datenstroms trifft.

Synchronisation in KCL 1.0–1.13 und KCL 2.0–2.2

In KCL 1.0–1.13 und KCL 2.0–2.2 synchronisiert KCL beim Bootstraping der Konsumentenanwendung und auch bei jedem Reshard-Ereignis des Datenstroms die Leasetabelle mit den Shard-Informationen, die vom Dienst Kinesis Data Streams abgerufen wurden, indem die ListShards oder die DescribeStream-Discovery-APIs aufgerufen werden. In allen oben aufgeführten KCL-Versionen führt jeder Worker einer KCL-Konsumentenanwendung die folgenden Schritte durch, um den Lease-/Shard-Synchronisierungsprozess während des Bootstrappings der Konsumentenanwendung und bei jedem Stream-Reshard-Ereignis durchzuführen:

  • Ruft alle Shards für Daten des Streams ab, der gerade verarbeitet wird

  • Ruft alle Shard-Leases aus der Leasetabelle ab

  • Filtert jeden offenen Shard heraus, für den es in der Leasetabelle keinen Lease gibt

  • Iteriert über alle gefundenen offenen Shards und für jeden offenen Shard ohne offenes übergeordnetes Element:

    • Durchläuft den Hierarchiebaum über den Pfad der Vorgänger, um festzustellen, ob der Shard ein Nachfolger ist. Ein Shard wird als untergeordnetes Element betrachtet, wenn gerade ein Vorgänger-Shard verarbeitet wird (der Leaseeintrag für den Vorgänger-Shard ist in der Leasetabelle vorhanden) oder wenn ein Vorgänger-Shard verarbeitet werden soll (wenn die Anfangsposition beispielsweise TRIM_HORIZON oder AT_TIMESTAMP ist)

    • Handelt es sich bei dem offenen Shard im Kontext um einen Nachfolger-Shard, überprüft KCL den Shard anhand seiner Ausgangsposition und erstellt, falls erforderlich, Leaseverträge für die übergeordneten Shard

Synchronisation in KCL 2.x, ab KCL 2.3 und höher

Beginnend mit den neuesten unterstützten Versionen von KCL 2.x (KCL 2.3) und höher unterstützt die Bibliothek nun die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API-Aufrufe, die von KCL-Konsumentenanwendungen an den Service Kinesis Data Streams getätigt werden, erheblich und optimieren das Lease-Management in Ihrer KCL-Konsumentenanwendung.

  • Wenn beim Bootstraping der Anwendung die Leasetabelle leer ist, verwendet KCL die Filteroption der ListShard-API (den optionalen Anforderungsparameter von ShardFilter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den ShardFilter-Parameter angegebenen Zeitpunkt geöffnet waren. Mit dem ShardFilter-Parameter können Sie die Antwort der ListShards-API herausfiltern. Die einzige erforderliche Eigenschaft des ShardFilter-Parameters ist Type. KCL verwendet die Type-Filtereigenschaft und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:

    • AT_TRIM_HORIZON – Die Antwort umfasst alle Shards, die am TRIM_HORIZON geöffnet waren.

    • AT_LATEST – Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms.

    • AT_TIMESTAMP – Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.

    ShardFilter wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unter RetrievalConfig#initialPositionInStreamExtended angegeben sind.

    Mehr über ShardFilter erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Anstatt dass alle Worker die Leasing-/Shard-Synchronisierung durchführen, um die Leasetabelle mit den neuesten Shards im Datenstrom auf dem neuesten Stand zu halten, führt ein einziger gewählter Worker-Leader die Lease/Shard-Synchronisierung durch.

  • KCL 2.3 verwendet den ChildShards-Rückgabeparameter der GetRecords- und der SubscribeToShard-APIs, um die Leasing-/Shard-Synchronisierung durchzuführen, die bei SHARD_END für geschlossene Shards stattfindet, sodass ein KCL-Worker nur Leases für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung er abgeschlossen hat. Bei gemeinsam genutzten Konsumentenanwendungen verwendet diese Optimierung der Lease/Shard-Synchronisierung den ChildShards-Parameter der GetRecords-API. Bei Konsumentenanwendungen mit dediziertem Durchsatz (Enhanced Fan-out) wird für diese Optimierung der Lease/Shard-Synchronisierung der ChildShards-Parameter der SubscribeToShard-API verwendet. Weitere Informationen finden Sie unter GetRecordsSubscribeToShards, und. ChildShard

  • Mit den oben genannten Änderungen ändert sich das Verhalten von KCL von dem Modell, dass alle Worker über alle vorhandenen Shards lernen, hin zu dem Modell, dass Worker nur noch über die untergeordneten Shards der Shards lernen, die jeder Worker besitzt. Daher führt KCL jetzt zusätzlich zu der Synchronisation, die beim Bootstraping von Konsumentenanwendungen und bei Reshard-Ereignissen stattfindet, auch zusätzliche regelmäßige Shard-/Lease-Scans durch, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen. PeriodicShardSyncManager ist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.

    Weitere Informationen zu PeriodicShardSyncManager KCL 2.3 finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/leases/ amazon-kinesis-client .java #L201 -L213. LeaseManagementConfig

    In KCL 2.3 stehen neue Konfigurationsoptionen, um PeriodicShardSyncManager in LeaseManagementConfig zu konfigurieren:

    Name Standardwert Beschreibung
    leasesRecoveryAuditorExecutionFrequencyMillis

    120 000 (2 Minuten)

    Häufigkeit (in Millionen), mit der der Auditor in der Leasetabelle nach teilweisen Leases sucht. Wenn der Auditor eine Lücke in den Leases für einen Stream feststellt, würde er die Shard-Synchronisierung auf der Grundlage von leasesRecoveryAuditorInconsistencyConfidenceThreshold auslösen.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.

    CloudWatch Außerdem werden jetzt neue Messwerte ausgegeben, um den Zustand von zu überwachen. PeriodicShardSyncManager Weitere Informationen finden Sie unter PeriodicShardSyncManager.

  • Einschließlich einer Optimierung auf HierarchicalShardSyncer, um nur Leases für eine Shard-Ebene zu erstellen.

Synchronisation in KCL 1.x, ab KCL 1.14 und höher

Beginnend mit den neuesten unterstützten Versionen von KCL 1.x (KCL 1.14) und höher unterstützt die Bibliothek nun die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API-Aufrufe, die von KCL-Konsumentenanwendungen an den Service Kinesis Data Streams getätigt werden, erheblich und optimieren das Lease-Management in Ihrer KCL-Konsumentenanwendung.

  • Wenn beim Bootstraping der Anwendung die Leasetabelle leer ist, verwendet KCL die Filteroption der ListShard-API (den optionalen Anforderungsparameter von ShardFilter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den ShardFilter-Parameter angegebenen Zeitpunkt geöffnet waren. Mit dem ShardFilter-Parameter können Sie die Antwort der ListShards-API herausfiltern. Die einzige erforderliche Eigenschaft des ShardFilter-Parameters ist Type. KCL verwendet die Type-Filtereigenschaft und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:

    • AT_TRIM_HORIZON – Die Antwort umfasst alle Shards, die am TRIM_HORIZON geöffnet waren.

    • AT_LATEST – Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms.

    • AT_TIMESTAMP – Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.

    ShardFilter wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unter KinesisClientLibConfiguration#initialPositionInStreamExtended angegeben sind.

    Mehr über ShardFilter erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Anstatt dass alle Worker die Leasing-/Shard-Synchronisierung durchführen, um die Leasetabelle mit den neuesten Shards im Datenstrom auf dem neuesten Stand zu halten, führt ein einziger gewählter Worker-Leader die Lease/Shard-Synchronisierung durch.

  • KCL 1.14 verwendet den ChildShards-Rückgabeparameter der GetRecords- und der SubscribeToShard-APIs, um die Leasing-/Shard-Synchronisierung durchzuführen, die bei SHARD_END für geschlossene Shards stattfindet, sodass ein KCL-Worker nur Leases für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung er abgeschlossen hat. Weitere Informationen finden Sie unter GetRecordsund ChildShard.

  • Mit den oben genannten Änderungen ändert sich das Verhalten von KCL von dem Modell, dass alle Worker über alle vorhandenen Shards lernen, hin zu dem Modell, dass Worker nur noch über die untergeordneten Shards der Shards lernen, die jeder Worker besitzt. Daher führt KCL jetzt zusätzlich zu der Synchronisation, die beim Bootstraping von Konsumentenanwendungen und bei Reshard-Ereignissen stattfindet, auch zusätzliche regelmäßige Shard-/Lease-Scans durch, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen. PeriodicShardSyncManager ist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.

    Wenn KinesisClientLibConfiguration#shardSyncStrategyType auf ShardSyncStrategyType.SHARD_END gesetzt ist, wird PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold verwendet, um den Schwellenwert für die Anzahl der aufeinanderfolgenden Scans mit Lücken in der Leasetabelle zu bestimmen, nach dem eine Shard-Synchronisierung erzwungen werden soll. Wenn KinesisClientLibConfiguration#shardSyncStrategyType auf ShardSyncStrategyType.PERIODIC gesetzt ist, wird leasesRecoveryAuditorInconsistencyConfidenceThreshold ignoriert.

    Weitere Informationen zu KCL 1.14 finden Sie unter https://github.com/awslabs/ PeriodicShardSyncManager amazon-kinesis-client KinesisClientLibConfiguration /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987 -L999.

    In KCL 1.14 ist eine neue Konfigurationsoption verfügbar, um PeriodicShardSyncManager in LeaseManagementConfig zu konfigurieren:

    Name Standardwert Beschreibung
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.

    CloudWatch Außerdem werden jetzt neue Messwerte ausgegeben, um den Zustand von zu überwachen. PeriodicShardSyncManager Weitere Informationen finden Sie unter PeriodicShardSyncManager.

  • KCL 1.14 unterstützt jetzt auch die verzögerte Lease-Bereinigung. Leases werden asynchron von LeaseCleanupManager bei Erreichen von SHARD_END gelöscht, wenn ein Shard entweder die Aufbewahrungsfrist des Datenstroms überschritten hat oder wenn er aufgrund eines Resharding-Vorgangs geschlossen wurde.

    Es stehen neue Konfigurationsoptionen zur Verfügung, um LeaseCleanupManager zu konfigurieren.

    Name Standardwert Beschreibung
    leaseCleanupIntervalMillis

    1 Minute

    Intervall, in dem der Lease-Cleanup-Thread ausgeführt werden soll.

    completedLeaseCleanupIntervalMillis 5 Minuten

    Intervall, in dem überprüft werden soll, ob ein Lease abgeschlossen ist oder nicht.

    garbageLeaseCleanupIntervalMillis 30 Minuten

    Intervall, in dem geprüft werden soll, ob ein Lease Datenmüll ist (d. h. über die Aufbewahrungsfrist des Datenstroms hinaus abgeschnitten wurde) oder nicht.

  • Einschließlich einer Optimierung auf KinesisShardSyncer, um nur Leases für eine Shard-Ebene zu erstellen.

Verarbeitung mehrerer Datenströme mit derselben Konsumentenanwendung KCL 2.x für Java

In diesem Abschnitt werden die folgenden Änderungen in KCL 2.x für Java beschrieben, die es Ihnen ermöglichen, KCL-Consumer-Anwendungen zu erstellen, die mehr als einen Datenstrom gleichzeitig verarbeiten können.

Wichtig

Die Multistream-Verarbeitung wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und darüber hinaus.

Die Multistream-Verarbeitung wird NICHT für andere Sprachen unterstützt, in denen KCL 2.x implementiert werden kann.

Die Multistream-Verarbeitung wird in keiner Version von KCL 1.x unterstützt.

  • MultistreamTracker Schnittstelle

    Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker. Diese Schnittstelle enthält die streamConfigList-Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL-Konsumentenanwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Konsumentenanwendung geändert werden können. streamConfigList wird regelmäßig von der KCL aufgerufen, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren.

    Die streamConfigList Methode füllt die StreamConfigListe auf.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Beachten Sie, dass es sich bei den Feldern StreamIdentifier und InitialPositionInStreamExtended um Pflichtfelder handelt, während consumerArn optional ist. Sie müssen den consumerArn nur angeben, wenn Sie KCL 2.x verwenden, um eine erweiterte Fan-Out-Konsumentenanwendung zu implementieren.

    Weitere Informationen dazu finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /blob/v2.5.8/ StreamIdentifier /src/main/java/software/amazon/kinesis/common/ amazon-kinesis-client .java #L129. StreamIdentifier Um eine zu erstellen, empfehlen wir, dass Sie eine Multistream-Instanz aus der und der, die in Version 2.5.0 und höher verfügbar ist, erstellen. StreamIdentifier streamArn streamCreationEpoch Erstellen Sie in KCL v2.3 und v2.4, die dies nicht unterstützenstreamArm, eine Multistream-Instanz mithilfe des folgenden Formats. account-id:StreamName:streamCreationTimestamp Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt.

    MultistreamTracker beinhaltet auch eine Strategie zum Löschen von Leases alter Streams in der Leasetabelle (formerStreamsLeasesDeletionStrategy). Beachten Sie, dass die Strategie während der Laufzeit der Konsumentenanwendung NICHT geändert werden kann. Weitere Informationen finden Sie unter https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/software/amazon/kinesis/processor/ .java amazon-kinesis-client FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderist eine anwendungsweite Klasse, mit der Sie alle KCL 2.x-Konfigurationseinstellungen angeben können, die beim Erstellen Ihrer KCL-Consumer-Anwendung verwendet werden sollen. ConfigsBuilderDie Klasse unterstützt jetzt die Schnittstelle. MultistreamTracker Sie können ConfigsBuilder entweder mit dem Namen des einen Datenstroms initialisieren, aus dem Datensätze abgerufen werden sollen:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    Oder Sie können ConfigsBuilder mit initialisieren, MultiStreamTracker wenn Sie eine KCL-Consumer-Anwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Da die Multistream-Unterstützung für Ihre KCL-Konsumentenanwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.

  • Wenn die Multistream-Unterstützung für Ihre KCL-Konsumentenanwendung implementiert ist, hat der leaseKey die folgende Struktur: account-id:StreamName:streamCreationTimestamp:ShardId. z. B. 111111111:multiStreamTest-1:12345:shardId-000000000336.

    Wichtig

    Wenn Ihre vorhandene KCL-Konsumentenanwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey (der Hash-Schlüssel für die Leasetabelle) die Shard-ID. Wenn Sie diese vorhandene KCL-Konsumentenanwendung für die Verarbeitung mehrerer Datenströme neu konfigurieren, wird Ihre Leasetabelle beschädigt, da bei Multistream-Unterstützung die leaseKey-Struktur wie folgt aussehen muss: account-id:StreamName:StreamCreationTimestamp:ShardId.

Verwenden der Kinesis-Clientbibliothek mit der AWS Glue-Schemaregistrierung

Sie können Ihre Kinesis-Datenströme in die AWS Glue-Schemaregistrierung integrieren. Mit dem AWS Glue Schema Registry können Sie Schemata zentral erkennen, steuern und weiterentwickeln und gleichzeitig sicherstellen, dass die erstellten Daten kontinuierlich durch ein registriertes Schema validiert werden. Ein Schema definiert die Struktur und das Format eines Datensatzes. Ein Schema ist eine versionierte Spezifikation für zuverlässige Datenveröffentlichung, -nutzung oder -speicherung. Mit der AWS Glue Schema Registry können Sie die end-to-end Datenqualität und Datenverwaltung in Ihren Streaming-Anwendungen verbessern. Weitere Informationen finden Sie unter AWS Glue Schema Registry. Eine Möglichkeit, diese Integration einzurichten, ist die KCL in Java.

Wichtig

Derzeit wird die Integration von Kinesis Data Streams und der AWS Glue-Schemaregistrierung nur für Kinesis-Datenstreams unterstützt, die in Java implementierte KCL 2.3-Consumer verwenden. Mehrsprachige Unterstützung wird nicht bereitgestellt. KCL-1.0-Konsumenten werden nicht unterstützt. KCL-2.x-Konsumenten vor KCL 2.3 werden nicht unterstützt.

Detaillierte Anweisungen zur Einrichtung der Integration von Kinesis Data Streams mit Schema Registry mithilfe der KCL finden Sie im Abschnitt „Interaktion mit Daten mithilfe der KPL/KCL-Bibliotheken“ unter Anwendungsfall: Integration von Amazon Kinesis Data Streams mit der Glue Schema Registry. AWS