Abonnieren von Amazon Kinesis Data Streams - AWS SDK for Java 2.x

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Abonnieren von Amazon Kinesis Data Streams

Die folgenden Beispiele zeigen Ihnen, wie Sie mit subscribeToShard dieser Methode Daten aus Amazon Kinesis Datenströmen abrufen und verarbeiten. Kinesis Data Streamsverwendet jetzt die erweiterte Fanout-Funktion und eine HTTP/2-Datenabruf-API mit niedriger Latenz, was es Entwicklern erleichtert, mehrere Hochleistungsanwendungen mit niedriger Latenz auf demselben Datenstream auszuführen. Kinesis

Einrichten

Erstellen Sie zunächst einen asynchronen Client und ein ObjektKinesis. SubscribeToShardRequest Diese Objekte werden in jedem der folgenden Beispiele zum Abonnieren von Kinesis-Ereignissen verwendet.

Importe

import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

Code

Region region = Region.US_EAST_1; KinesisAsyncClient client = KinesisAsyncClient.builder() .region(region) .build(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream") .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();

Verwenden Sie die Builder-Schnittstelle

Sie können die builder Methode verwenden, um die Erstellung von zu vereinfachen SubscribeToShardResponseHandler.

Mit dem Builder können Sie jeden Lebenszyklusrückruf mit einem Methodenaufruf versehen, anstatt die vollständige Schnittstelle zu implementieren.

Code

private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onComplete(() -> System.out.println("All records stream successfully")) // Must supply some type of subscriber .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

Um mehr Kontrolle über den Herausgeber zu erhalten, können Sie ihn mit der Methode publisherTransformer anzupassen.

Code

private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100)) .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

Das vollständige Beispiel finden Sie unter GitHub.

Verwenden Sie einen benutzerdefinierten Antworthandler

Implementieren Sie die SubscribeToShardResponseHandler Schnittstelle, um die vollständige Kontrolle über den Abonnenten und den Herausgeber zu erhalten.

In diesem Beispiel implementieren Sie die Methode onEventStream, die Vollzugriff auf den Herausgeber ermöglicht. Hier wird veranschaulicht, wie der Herausgeber für durch den Abonnenten zu druckende Ereignisdatensätze transformiert wird.

Code

private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() { @Override public void responseReceived(SubscribeToShardResponse response) { System.out.println("Receieved initial response"); } @Override public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { publisher // Filter to only SubscribeToShardEvents .filter(SubscribeToShardEvent.class) // Flat map into a publisher of just records .flatMapIterable(SubscribeToShardEvent::records) // Limit to 1000 total records .limit(1000) // Batch records into lists of 25 .buffer(25) // Print out each record batch .subscribe(batch -> System.out.println("Record Batch - " + batch)); } @Override public void complete() { System.out.println("All records stream successfully"); } @Override public void exceptionOccurred(Throwable throwable) { System.err.println("Error during stream - " + throwable.getMessage()); } }; return client.subscribeToShard(request, responseHandler); }

Das vollständige Beispiel finden Sie unter GitHub.

Verwenden Sie die Besucherschnittstelle

Sie können mithilfe eines Visitor-Objekts bestimmte Ereignisse abonnieren, an denen Sie interessiert sind.

Code

private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor .builder() .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e)) .build(); SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); }

Das vollständige Beispiel finden Sie unter GitHub.

Verwenden Sie einen benutzerdefinierten Abonnenten

Sie können auch Ihren eigenen benutzerdefinierten Abonnenten implementieren, um den Stream zu abonnieren.

Dieser Codeausschnitt zeigt einen Beispiel für einen Abonnenten.

Code

private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> { private Subscription subscription; private AtomicInteger eventCount = new AtomicInteger(0); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) { System.out.println("Received event " + shardSubscriptionEventStream); if (eventCount.incrementAndGet() >= 100) { // You can cancel the subscription at any time if you wish to stop receiving events. subscription.cancel(); } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred while stream - " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Finished streaming all events"); } }

Sie können den benutzerdefinierten Abonnenten an die subscribe Methode übergeben, wie im folgenden Codeausschnitt gezeigt.

Code

private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(MySubscriber::new) .build(); return client.subscribeToShard(request, responseHandler); }

Das vollständige Beispiel finden Sie unter. GitHub

Schreiben Sie Datensätze in einen Kinesis Datenstrom

Sie können das KinesisClientObjekt verwenden, um Datensätze in einen Kinesis Datenstrom zu schreiben, indem Sie die putRecords Methode verwenden. Um diese Methode erfolgreich aufzurufen, erstellen Sie ein PutRecordsRequestObjekt. Sie übergeben den Namen des Datenstroms an die streamName Methode. Darüber hinaus müssen Sie die Daten mit der Methode putRecords übergeben (wie im folgenden Codebeispiel gezeigt).

Importe

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;

Beachten Sie im folgenden Java-Codebeispiel, dass das StockTradeObjekt als Daten verwendet wird, um in den Kinesis Datenstrom zu schreiben. Bevor Sie dieses Beispiel ausführen, stellen Sie sicher, dass Sie den Datenstream erstellt haben.

Code

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }

Das vollständige Beispiel finden Sie unter GitHub.

Verwenden Sie eine Bibliothek eines Drittanbieters

Sie können andere Drittanbieter-Bibliotheken verwenden, anstatt eine benutzerdefinierten Abonnenten zu implementieren. In diesem Beispiel wird die Verwendung der RxJava Implementierung demonstriert. Sie können jedoch jede Bibliothek verwenden, die die Reactive Streams-Schnittstellen implementiert. Weitere Informationen zu dieser Bibliothek finden Sie auf der RxJava Wiki-Seite auf Github.

Um die Bibliothek zu verwenden, fügen Sie sie als Abhängigkeit hinzu. Bei Einsatz von Maven zeigt das Beispiel den zu verwendenden POM-Ausschnitt.

POM-Eintrag

<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>

Importe

import java.net.URI; import java.util.concurrent.CompletableFuture; import io.reactivex.Flowable; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;

In diesem Beispiel wird die onEventStream Lifecycle-Methode verwendet RxJava . Dadurch haben Sie Vollzugriff auf den Herausgeber, der zum Erstellen einer Rx Flowable verwendet werden kann.

Code

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onEventStream(p -> Flowable.fromPublisher(p) .ofType(SubscribeToShardEvent.class) .flatMapIterable(SubscribeToShardEvent::records) .limit(1000) .buffer(25) .subscribe(e -> System.out.println("Record batch = " + e))) .build();

Sie können bei dem Herausgeber Flowable auch die Methode publisherTransformer verwenden. Sie müssen den Flowable Herausgeber an eine anpassen SdkPublisher, wie im folgenden Beispiel gezeigt.

Code

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100))) .build();

Das vollständige Beispiel finden Sie unter GitHub.

Weitere Informationen