Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Multistream-Verarbeitung mit KCL
In diesem Abschnitt werden die erforderlichen Änderungen in KCL beschrieben, die es Ihnen ermöglichen, KCL-Consumer-Anwendungen zu erstellen, die mehr als einen Datenstrom gleichzeitig verarbeiten können.
Wichtig
-
Die Multistream-Verarbeitung wird nur in KCL 2.3 oder höher unterstützt.
-
Die Multistream-Verarbeitung wird nicht für KCL-Benutzer unterstützt, die in anderen Sprachen als Java geschrieben sind und mit ausgeführt werden.
multilangdaemon
-
Die Multistream-Verarbeitung wird in keiner Version von KCL 1.x unterstützt.
-
MultistreamTracker Schnittstelle
-
Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker
. Diese Schnittstelle enthält die streamConfigList
-Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL-Konsumentenanwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Verbraucheranwendung geändert werden können.streamConfigList
wird regelmäßig von KCL aufgerufen, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren. -
Das
streamConfigList
füllt die Liste auf StreamConfig.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
Die Felder
StreamIdentifier
undInitialPositionInStreamExtended
sind Pflichtfelder, während sie optionalconsumerArn
sind. Sie müssen dasconsumerArn
nur angeben, wenn Sie KCL verwenden, um eine erweiterte Fan-Out-Anwendung für Privatanwender zu implementieren. -
Weitere Informationen
StreamIdentifier
dazu finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129.Um eine zu erstellen StreamIdentifier
, empfehlen wir, eine Multistream-Instanz aus demstreamArn
und dem zu erstellenstreamCreationEpoch
, das in KCL 2.5.0 oder höher verfügbar ist. Erstellen Sie in KCL v2.3 und v2.4, die dies nicht unterstützenstreamArm
, eine Multistream-Instanz mithilfe des folgenden Formats.account-id:StreamName:streamCreationTimestamp
Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt. -
MultistreamTracker beinhaltet auch eine Strategie zum Löschen von Leasingverträgen alter Streams in der Leasetabelle (). formerStreamsLeases DeletionStrategy Beachten Sie, dass die Strategie während der Laufzeit der Konsumentenanwendung NICHT geändert werden kann. Weitere Informationen finden Sie unter https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0
.java.
-
Oder Sie können ConfigsBuilder mit initialisieren, MultiStreamTracker
wenn Sie eine KCL-Consumer-Anwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Da die Multistream-Unterstützung für Ihre KCL-Consumer-Anwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.
-
Wenn Multistream-Unterstützung für Ihre KCL-Verbraucheranwendung implementiert ist, hat der LeaseKey die folgende Struktur:.
account-id:StreamName:streamCreationTimestamp:ShardId
Beispiel,111111111:multiStreamTest-1:12345:shardId-000000000336
.
Wichtig
Wenn Ihre bestehende KCL-Consumer-Anwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey
(der Partitionsschlüssel für die Leasetabelle) die Shard-ID. Wenn Sie eine bestehende KCL-Consumer-Anwendung so umkonfigurieren, dass sie mehrere Datenströme verarbeitet, wird Ihre Leasing-Tabelle beschädigt, da die leaseKey
Struktur wie folgt aussehen muss: account-id:StreamName:StreamCreationTimestamp:ShardId
um Multistream zu unterstützen.