Schreiben in den Kinesis Data Streams mit der KPL - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schreiben in den Kinesis Data Streams mit der KPL

In den folgenden Abschnitten wird ein Beispiel-Code in Entwicklung gezeigt (vom reinen Produzenten bis hin zum komplexen asynchronen Code).

Reiner Produzentencode

Der folgende Code reicht für die Entwicklung eines einfachen Produzenten aus. Die Benutzerdatensätze der Kinesis Producer Library (KPL) werden im Hintergrund verarbeitet.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Synchrone Reaktion auf Ergebnisse

Im vorherigen Beispiel wird vom Code nicht geprüft, ob die KPL-Benutzerdatensätze erfolgreich verarbeitet wurden. Die KPL führt alle Wiederholungen durch, die notwendig sind, um Ausfälle zu kompensieren. Wenn Sie jedoch die Ergebnisse prüfen möchten, können Sie sich diese, wie im folgenden Beispiel gezeigt, mit den Future-Objekten ansehen, die von addUserRecord zurückgegeben werden (vorheriges Beispiel aus Kontextgründen erneut aufgeführt).

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Asynchrone Reaktion auf Ergebnisse

Im vorherigen Beispiel wird get() auf einem Future-Objekt aufgerufen, wodurch die Ausführung blockiert wird. Wenn Sie die Ausführung nicht blockieren möchten, können Sie einen asynchronen Callback nutzen, wie im folgenden Beispiel gezeigt:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }