Wählen Sie Ihre Cookie-Einstellungen aus

Wir verwenden essentielle Cookies und ähnliche Tools, die für die Bereitstellung unserer Website und Services erforderlich sind. Wir verwenden Performance-Cookies, um anonyme Statistiken zu sammeln, damit wir verstehen können, wie Kunden unsere Website nutzen, und Verbesserungen vornehmen können. Essentielle Cookies können nicht deaktiviert werden, aber Sie können auf „Anpassen“ oder „Ablehnen“ klicken, um Performance-Cookies abzulehnen.

Wenn Sie damit einverstanden sind, verwenden AWS und zugelassene Drittanbieter auch Cookies, um nützliche Features der Website bereitzustellen, Ihre Präferenzen zu speichern und relevante Inhalte, einschließlich relevanter Werbung, anzuzeigen. Um alle nicht notwendigen Cookies zu akzeptieren oder abzulehnen, klicken Sie auf „Akzeptieren“ oder „Ablehnen“. Um detailliertere Entscheidungen zu treffen, klicken Sie auf „Anpassen“.

Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js

Fokusmodus
Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js - Amazon Kinesis Data Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Anmerkung

Die Versionen 1.x und 2.x der Kinesis Client Library (KCL) sind veraltet. Wir empfehlen die Migration auf die KCL-Version 3.x, die eine verbesserte Leistung und neue Funktionen bietet. Die aktuelle KCL-Dokumentation und den Migrationsleitfaden finden Sie unter. Kinesis Client Library verwenden

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Node.js behandelt.

Bei der KCL handelt es sich um eine Java-Bibliothek. Die Unterstützung für andere Sprachen als Java erfolgt über eine mehrsprachige Schnittstelle namens. MultiLangDaemon Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für Node.js installieren und Ihre App für Privatanwender vollständig in Node.js schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, und zwar aufgrund der. MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der MultiLangDaemon KCL-Projektseite.

Um die Node.js KCL von herunterzuladen GitHub, gehen Sie zur Kinesis Client Library (Node.js).

Downloads von Beispiel-Code

Es gibt zwei Code-Beispiele für die KCL in Node.js:

  • basic-sample

    Wird in den folgenden Abschnitten verwendet, um die Grundlagen zum Erstellen einer KCL-Konsumentenanwendung in Node.js zu zeigen.

  • click-stream-sample

    Etwas komplexere und verwendet ein reales Szenario, nachdem Sie sich mit dem grundlegenden Beispiel-Code vertraut gemacht haben. Dieses Beispiel wird hier nicht behandelt. Es gibt jedoch eine Readme-Datei mit weiteren Informationen.

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in Node.js implementieren:

Implementieren Sie den Record Processor

Der einfachste Konsument, der die KCL für Node.js verwenden kann, muss die Funktion recordProcessor implementieren. Diese enthält wiederum die Funktionen initialize, processRecords, und shutdown. Das Beispiel zeigt eine Implementierung, die Sie als Ausgangspunkt verwenden können (siehe sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialize

Die KCL ruft die Funktion initialize auf, wenn der Datensatzverarbeiter gestartet wird. Dieser Datensatzverarbeiter verarbeitet nur die Shard-ID, die als initializeInput.shardId übergeben wird. In der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz mindestens einmal hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern.

initialize: function(initializeInput, completeCallback)
processRecords

Die KCL ruft diese Funktion mit einer Eingabe auf, die eine Liste von Datensätzen aus der für die Funktion initialize angegebenen Shard enthält. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

processRecords: function(processRecordsInput, completeCallback)

Zusätzlich zu den Daten enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel, die der Auftragnehmer beim Verarbeiten der Daten verwenden kann. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Das record-Anmeldeverzeichnis stellt die folgenden Schlüssel-Wert-Paare für den Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bereit:

record.data record.sequenceNumber record.partitionKey

Beachten Sie, dass die Daten Base64-kodiert sind.

Im einfachen Beispiel weist die Funktion processRecords Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt die Nachverfolgung durch ein checkpointer-Objekt, das als processRecordsInput.checkpointer übergeben wird. Der Datensatzverarbeiter ruft die Funktion checkpointer.checkpoint auf, um die KCL über die Fortschritte zu informieren, die er beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, wenn Sie die Verarbeitung der Shard erneut starten, damit sie den Vorgang ab dem letzten bekannten Datensatz fortsetzt.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards checkpoint aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie die Sequenznummer nicht an die checkpoint-Funktion übergeben, nimmt die KCL an, dass der Aufruf von checkpoint bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode checkpoint erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen checkpoint nicht bei jedem Aufruf von processRecords aufrufen. Ein Verarbeiter könnte beispielsweise checkpoint bei jedem dritten Aufruf oder beim Eintritt eines Ereignisses außerhalb Ihres Datensatzverarbeiters aufrufen, beispielsweise eines von Ihnen implementierten Verifizierung-/Validierungs-Service.

Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für checkpoint angeben. In diesem Fall nimmt die KCL an, dass alle Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Die einfache Beispielanwendung zeigt den einfachsten möglichen Aufruf der Funktion checkpointer.checkpoint. Sie können weitere Checkpoint-Logik hinzufügen, die Sie an diesem Punkt in der Funktion für Ihren Konsumenten benötigen.

shutdown

Die KCL ruft die Funktion shutdown entweder auf, wenn die Verarbeitung beendet wird (shutdownInput.reason ist TERMINATE) oder wenn der Auftragnehmer nicht mehr reagiert (shutdownInput.reason ist ZOMBIE).

shutdown: function(shutdownInput, completeCallback)

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, entweder weil die Shard geteilt oder zusammengeführt wurde oder weil der Stream gelöscht wurde.

Die KCL übergibt auch ein shutdownInput.checkpointer-Objekt an shutdown. Wenn der Grund für das Herunterfahren TERMINATE ist, sollten Sie sicherstellen, dass der Datensatzverarbeiter die Verarbeitung aller Datensätze fertiggestellt hat, und dann die Funktion checkpoint in seiner Schnittstelle aufrufen.

Ändern Sie die Konfigurationseigenschaften

Das Beispiel zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe sample.properties im einfachen Beispiel).

Anwendungsname

Die KCL erfordert eine Anwendung, die unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:

  • Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.

  • Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden.

Richten Sie Anmeldeinformationen ein

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Sie können die Eigenschaft AWSCredentialsProvider verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die sample.properties-Datei muss Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Anmeldeinformationsanbieter-Standardkette bereitstellen. Wenn Sie Ihren Consumer auf einer EC2 Amazon-Instance ausführen, empfehlen wir Ihnen, die Instance mit einer IAM-Rolle zu konfigurieren. AWS Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Methode zur Verwaltung von Anmeldeinformationen für eine Verbraucheranwendung, die auf einer EC2 Instance ausgeführt wird.

Im folgenden Beispiel wird eine KCL konfiguriert, um einen Kinesis-Datenstrom namens kclnodejssample mittels des Datensatzverarbeiters zu verarbeiten, der in sample_kcl_app.js bereitgestellt wird:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON
DatenschutzNutzungsbedingungen für die WebsiteCookie-Einstellungen
© 2025, Amazon Web Services, Inc. oder Tochtergesellschaften. Alle Rechte vorbehalten.