Wählen Sie Ihre Cookie-Einstellungen aus

Wir verwenden essentielle Cookies und ähnliche Tools, die für die Bereitstellung unserer Website und Services erforderlich sind. Wir verwenden Performance-Cookies, um anonyme Statistiken zu sammeln, damit wir verstehen können, wie Kunden unsere Website nutzen, und Verbesserungen vornehmen können. Essentielle Cookies können nicht deaktiviert werden, aber Sie können auf „Anpassen“ oder „Ablehnen“ klicken, um Performance-Cookies abzulehnen.

Wenn Sie damit einverstanden sind, verwenden AWS und zugelassene Drittanbieter auch Cookies, um nützliche Features der Website bereitzustellen, Ihre Präferenzen zu speichern und relevante Inhalte, einschließlich relevanter Werbung, anzuzeigen. Um alle nicht notwendigen Cookies zu akzeptieren oder abzulehnen, klicken Sie auf „Akzeptieren“ oder „Ablehnen“. Um detailliertere Entscheidungen zu treffen, klicken Sie auf „Anpassen“.

Entwickeln Sie Verbraucher mit KCL in Java

Fokusmodus
Entwickeln Sie Verbraucher mit KCL in Java - Amazon Kinesis Data Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Voraussetzungen

Bevor Sie mit der Verwendung von KCL 3.x beginnen, stellen Sie sicher, dass Sie über Folgendes verfügen:

  • Java Development Kit (JDK) 8 oder höher

  • AWS SDK for Java 2.x

  • Maven oder Gradle für das Abhängigkeitsmanagement

KCL sammelt Metriken zur CPU-Auslastung, wie z. B. die CPU-Auslastung von dem Rechenhost, auf dem die Worker arbeiten, um die Last gleichmäßig zu verteilen und eine gleichmäßige Ressourcenauslastung für alle Mitarbeiter zu erreichen. Damit KCL die CPU-Nutzungsmetriken von Workern erfassen kann, müssen Sie die folgenden Voraussetzungen erfüllen:

Amazon Elastic Compute Cloud(Amazon EC2)

  • Ihr Betriebssystem muss Linux OS sein.

  • Sie müssen es IMDSv2in Ihrer EC2 Instanz aktivieren.

Amazon Elastic Container Service (Amazon ECS) auf Amazon EC2

Amazon ECS auf AWS Fargate

  • Sie müssen den Fargate-Task-Metadaten-Endpunkt Version 4 aktivieren. Wenn Sie die Fargate-Plattformversion 1.4.0 oder höher verwenden, ist dies standardmäßig aktiviert.

  • Fargate-Plattformversion 1.4.0 oder höher.

Amazon Elastic Kubernetes Service (Amazon EKS) auf Amazon EC2

  • Ihr Betriebssystem muss Linux OS sein.

Amazon EKS auf AWS Fargate

  • Fargate-Plattform 1.3.0 oder höher.

Wichtig

Wenn KCL keine Kennzahlen zur CPU-Auslastung von Mitarbeitern sammeln kann, greift KCL auf den Durchsatz pro Mitarbeiter zurück, um Leasingverträge zuzuweisen und die Auslastung auf die Mitarbeiter in der Flotte zu verteilen. Weitere Informationen finden Sie unter Wie KCL Mitarbeitern Leasingverträge zuweist und die Arbeitslast verteilt.

Installieren und fügen Sie Abhängigkeiten hinzu

Wenn Sie Maven verwenden, fügen Sie Ihrer pom.xml Datei die folgende Abhängigkeit hinzu. Stellen Sie sicher, dass Sie 3.x.x durch die neueste KCL-Version ersetzt haben.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Wenn Sie Gradle verwenden, fügen Sie Ihrer Datei Folgendes hinzu. build.gradle Stellen Sie sicher, dass Sie 3.x.x durch die neueste KCL-Version ersetzt haben.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Sie können im Maven Central Repository nach der neuesten Version der KCL suchen.

Implementieren Sie den Verbraucher

Eine KCL-Verbraucheranwendung besteht aus den folgenden Schlüsselkomponenten:

RecordProcessor

RecordProcessor ist die Kernkomponente, in der sich Ihre Geschäftslogik für die Verarbeitung von Kinesis-Datenstream-Datensätzen befindet. Es definiert, wie Ihre Anwendung die Daten verarbeitet, die sie vom Kinesis-Stream empfängt.

Wichtigste Aufgaben:

  • Initialisieren Sie die Verarbeitung für einen Shard

  • Batches von Datensätzen aus dem Kinesis-Stream verarbeiten

  • Die Verarbeitung für einen Shard herunterfahren (z. B. wenn der Shard geteilt oder zusammengeführt wird oder der Lease an einen anderen Host übergeben wird)

  • Verwalte Checkpoints, um den Fortschritt zu verfolgen

Im Folgenden wird ein Implementierungsbeispiel gezeigt:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

Im Folgenden finden Sie eine detaillierte Erläuterung der einzelnen in diesem Beispiel verwendeten Methoden:

initialisieren (InitializationInputInitializationInput)

  • Zweck: Richten Sie alle erforderlichen Ressourcen oder den Status für die Verarbeitung von Datensätzen ein.

  • Wann es aufgerufen wird: Einmal, wenn KCL diesem Datensatzprozessor einen Shard zuweist.

  • Die wichtigsten Punkte:

    • initializationInput.shardId(): Die ID des Shards, den dieser Prozessor verarbeiten wird.

    • initializationInput.extendedSequenceNumber(): Die Sequenznummer, von der aus die Verarbeitung gestartet werden soll.

processRecords () ProcessRecordsInput processRecordsInput

  • Zweck: Verarbeitet die eingehenden Datensätze und überprüft optional den Fortschritt.

  • Wann es aufgerufen wird: Wiederholt, solange der Datensatzprozessor den Leasingvertrag für den Shard hält.

  • Die wichtigsten Punkte:

    • processRecordsInput.records(): Liste der zu verarbeitenden Datensätze.

    • processRecordsInput.checkpointer(): Wird verwendet, um den Fortschritt zu überprüfen.

    • Stellen Sie sicher, dass Sie alle Ausnahmen während der Verarbeitung behandelt haben, um zu verhindern, dass KCL fehlschlägt.

    • Diese Methode sollte idempotent sein, da derselbe Datensatz in einigen Szenarien mehrfach verarbeitet werden kann, z. B. bei Daten, die vor einem unerwarteten Absturz oder Neustart des Workers nicht überprüft wurden.

    • Leeren Sie vor dem Checkpoint stets alle gepufferten Daten, um die Datenkonsistenz sicherzustellen.

LeaseLost () LeaseLostInput leaseLostInput

  • Zweck: Bereinigen Sie alle Ressourcen, die für die Verarbeitung dieses Shards spezifisch sind.

  • Wann es aufgerufen wird: Wenn ein anderer Scheduler den Lease für diesen Shard übernimmt.

  • Die wichtigsten Punkte:

    • Checkpointing ist bei dieser Methode nicht erlaubt.

ShardEnded () ShardEndedInput shardEndedInput

  • Zweck: Beenden Sie die Verarbeitung für diesen Shard und diesen Checkpoint.

  • Wann es aufgerufen wird: Wenn der Shard geteilt oder zusammengeführt wird, was bedeutet, dass alle Daten für diesen Shard verarbeitet wurden.

  • Die wichtigsten Punkte:

    • shardEndedInput.checkpointer(): Wird verwendet, um das letzte Checkpointing durchzuführen.

    • Checkpointing ist bei dieser Methode zwingend erforderlich, um die Verarbeitung abzuschließen.

    • Wenn die Daten und der Checkpoint hier nicht geleert werden, kann dies zu Datenverlust oder doppelter Verarbeitung führen, wenn der Shard erneut geöffnet wird.

ShutdownRequestedInput shutdownRequestedInputshutdownRequested ()

  • Zweck: Checkpoint und Bereinigen von Ressourcen, wenn KCL heruntergefahren wird.

  • Wann es aufgerufen wird: Wenn KCL heruntergefahren wird (z. B. wenn die Anwendung beendet wird).

  • Die wichtigsten Punkte:

    • shutdownRequestedInput.checkpointer(): Wird verwendet, um Checkpoints vor dem Herunterfahren durchzuführen.

    • Stellen Sie sicher, dass Sie Checkpointing in der Methode implementiert haben, damit der Fortschritt gespeichert wird, bevor die Anwendung beendet wird.

    • Wenn die Daten und der Checkpoint hier nicht geleert werden, kann dies zu Datenverlust oder zur erneuten Verarbeitung von Datensätzen führen, wenn die Anwendung neu gestartet wird.

Wichtig

KCL 3.x sorgt dafür, dass weniger Daten erneut verarbeitet werden, wenn der Mietvertrag von einem Mitarbeiter an einen anderen übergeben wird, indem ein Checkpoint ausgeführt wird, bevor der vorherige Mitarbeiter geschlossen wird. Wenn Sie die Checkpoint-Logik nicht in der shutdownRequested() Methode implementieren, werden Sie diesen Vorteil nicht sehen. Stellen Sie sicher, dass Sie in der Methode eine Checkpoint-Logik implementiert haben. shutdownRequested()

RecordProcessorFactory

RecordProcessorFactory ist verantwortlich für die Erstellung neuer RecordProcessor Instanzen. KCL verwendet diese Factory, um RecordProcessor für jeden Shard, den die Anwendung verarbeiten muss, eine neue zu erstellen.

Wichtigste Aufgaben:

  • Neue RecordProcessor Instanzen bei Bedarf erstellen

  • Stellen Sie sicher, dass jede ordnungsgemäß initialisiert RecordProcessor ist

Im Folgenden finden Sie ein Implementierungsbeispiel:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

In diesem Beispiel erstellt die Factory bei SampleRecordProcessor jedem Aufruf von shardRecordProcessor () ein neues. Sie können dies um jede erforderliche Initialisierungslogik erweitern.

Scheduler

Der Scheduler ist eine Komponente auf hoher Ebene, die alle Aktivitäten der KCL-Anwendung koordiniert. Es ist für die gesamte Orchestrierung der Datenverarbeitung verantwortlich.

Wichtigste Aufgaben:

  • Managen Sie den Lebenszyklus von RecordProcessors

  • Erledigen Sie das Leasingmanagement für Shards

  • Koordinieren Sie die Checkpoints

  • Verteilen Sie die Shard-Verarbeitungslast auf mehrere Worker Ihrer Anwendung

  • Bewältigen Sie Signale für ein ordnungsgemäßes Herunterfahren und Beenden von Anwendungen

Der Scheduler wird normalerweise in der Hauptanwendung erstellt und gestartet. Das Implementierungsbeispiel von Scheduler finden Sie im folgenden Abschnitt, Main Consumer Application.

Hauptanwendung für Privatanwender

Die Hauptanwendung für Verbraucher verbindet alle Komponenten miteinander. Sie ist verantwortlich für die Einrichtung des KCL-Consumer, die Erstellung der erforderlichen Clients, die Konfiguration des Schedulers und die Verwaltung des Lebenszyklus der Anwendung.

Wichtigste Aufgaben:

  • AWS Service-Clients einrichten (Kinesis, DynamoDB,) CloudWatch

  • Konfigurieren Sie die KCL-Anwendung

  • Erstellen und starten Sie den Scheduler

  • Behandelt das Herunterfahren der Anwendung

Im Folgenden finden Sie ein Implementierungsbeispiel:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCL erstellt standardmäßig einen Enhanced Fan-Out (EFO) -Consumer mit dediziertem Durchsatz. Weitere Informationen zu Enhanced Fan-Out finden Sie unter. Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz Wenn Sie weniger als 2 Verbraucher haben oder keine Verzögerungen bei der Leseverteilung unter 200 ms benötigen, müssen Sie im Scheduler-Objekt die folgende Konfiguration festlegen, um Verbraucher mit gemeinsamem Durchsatz zu verwenden:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

Der folgende Code ist ein Beispiel für die Erstellung eines Scheduler-Objekts, das Verbraucher mit gemeinsamem Durchsatz verwendet:

Importe:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

Kode:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/
DatenschutzNutzungsbedingungen für die WebsiteCookie-Einstellungen
© 2025, Amazon Web Services, Inc. oder Tochtergesellschaften. Alle Rechte vorbehalten.