Entwickeln eines Kinesis Client Library-Verbrauchers in .NET - Amazon Kinesis Data Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Entwickeln eines Kinesis Client Library-Verbrauchers in .NET

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird .NET behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle namens bereitgestelltMultiLangDaemon. Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für .NET installieren und Ihre Konsumenten-App vollständig in .NET schreiben, benötigen Sie aufgrund der trotzdem Java auf Ihrem System MultiLangDaemon. Darüber hinaus MultiLangDaemon verfügt über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der es eine Verbindung herstellt. Weitere Informationen über die MultiLangDaemon auf finden GitHubSie auf der KCL- MultiLangDaemon Projektseite.

Um die .NET KCL von herunterzuladen GitHub, gehen Sie zur Kinesis Client Library (.NET). Um Beispielcode für eine .NET-KCL-Konsumentenanwendung herunterzuladen, gehen Sie auf die Seite KCL für .NET-Beispielkonsumentenprojekt auf GitHub.

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in .NET implementieren:

Implementieren der I-RecordProcessor Klassenmethoden

Der Konsument muss die folgenden Methoden für IRecordProcessor implementieren. Der Konsument im Beispiel stellt Implementierungen bereit, die Sie als Ausgangspunkt verwenden können (siehe die SampleRecordProcessor-Klasse in SampleConsumer/AmazonKinesisSampleConsumer.cs).

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Initialisieren

Die KCL ruft diese Methode auf, wenn der Datensatzverarbeiter instanziiert wird, und übergibt eine spezifische Shard-ID an den input-Parameter (input.ShardId). Dieser Datensatzverarbeiter verarbeitet nur diese Shard und in der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz mindestens einmal hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter Resharding, Skalierung und Parallelverarbeitung.

public void Initialize(InitializationInput input)
ProcessRecords

Die KCL ruft diese Methode auf und übergibt eine Liste der Datensätze an den input-Parameter (input.Records) aus der Shard, die von der Methode Initialize angegeben wird. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

public void ProcessRecords(ProcessRecordsInput input)

Zusätzlich zu den Daten selbst enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel. Der Auftragnehmer kann diese Werte beim Verarbeiten der Daten verwenden. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Die Klasse Record stellt die folgenden Methoden bereit, die Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bieten:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

Im Beispiel weist die Methode ProcessRecordsWithRetries Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt diese Nachverfolgung für Sie, indem ein Checkpointer-Objekt an ProcessRecords (input.Checkpointer) übergeben wird. Der Datensatzverarbeiter ruft die Methode Checkpointer.Checkpoint auf, um die KCL über die Fortschritte zu informieren, die sie beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, um die Verarbeitung der Shard mit dem letzten bekannten Datensatz neu zu starten.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards Checkpointer.Checkpoint aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie keinen Parameter übergeben, nimmt die KCL an, dass der Aufruf von Checkpointer.Checkpoint bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode Checkpointer.Checkpoint erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen Checkpointer.Checkpoint nicht bei jedem Aufruf von ProcessRecords aufrufen. Ein Prozessor könnte beispielsweise Checkpointer.Checkpoint bei jedem dritten oder vierten Aufruf aufrufen. Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für Checkpointer.Checkpoint angeben. In diesem Fall nimmt die KCL an, dass die Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Im Beispiel zeigt die private Methode Checkpoint(Checkpointer checkpointer), wie die Checkpointer.Checkpoint-Methode mithilfe der entsprechenden Ausnahmebehandlung und Wiederholungslogik aufgerufen wird.

Die KCL für .NET verarbeitet Ausnahmen anders als andere KCL-Sprachbibliotheken, da sie keine Ausnahmen verarbeitet, die aus der Verarbeitung der Datensätze entstanden sind. Alle nicht abgefangenen Ausnahmen vom Benutzer-Code bringen das Programm zum Absturz.

Herunterfahren

Die KPL ruft die Methode Shutdown entweder auf, wenn die Verarbeitung beendet wird (Grund für das Herunterfahren ist TERMINATE) oder wenn der Auftragnehmer nicht mehr reagiert (der input.Reason-Wert für das Herunterfahren ist ZOMBIE).

public void Shutdown(ShutdownInput input)

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, weil die Shard geteilt oder zusammengeführt wurde oder der Stream gelöscht wurde.

Die KCL übergibt auch ein Checkpointer-Objekt an shutdown. Wenn der Grund für das Herunterfahren TERMINATE ist, sollte der Datensatzverarbeiter alle Datensätze fertigstellen und dann die Methode checkpoint in seiner Schnittstelle aufrufen.

Ändern der Konfigurationseigenschaften

Der Beispielkonsument zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe SampleConsumer/kcl.properties).

Anwendungsname

Die KCL erfordert eine Anwendung, die unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:

  • Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.

  • Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter Mithilfe einer Leasetabelle können Sie die von der KCL-Konsumentenanwendung verarbeiteten Shards verfolgen.

Einrichten von Anmeldeinformationen

Sie müssen Ihre AWS-Anmeldeinformationen für einen der Anmeldeinformationsanbieter in der Anmeldeinformationsanbieter-Standardkette bereitstellen. Sie können die Eigenschaft AWSCredentialsProvider verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die sample.properties muss Ihre Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Anmeldeinformationsanbieter-Standardkette bereitstellen. Wenn Sie Ihre Konsumentenanwendung auf einer EC2-Instance ausführen, empfehlen wir, die Instance mit einer IAM-Rolle zu konfigurieren. AWS-Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für einen Konsumenten zu verwalten, der auf einer EC2-Instance ausgeführt wird.

Die Eigenschaftendatei des Beispiels konfiguriert KCL, um einen Kinesis-Datenstrom namens „words“ mittels des Datensatzverarbeiters zu verarbeiten, der in AmazonKinesisSampleConsumer.cs bereitgestellt wird.