Ecriture dans le flux de données Kinesis à l'aide de KPL - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Ecriture dans le flux de données Kinesis à l'aide de KPL

Les sections suivantes présentent un exemple de code qui va de l'application producteur « bare-bones » la plus simple possible au code entièrement asynchrone.

Code producteur Barebones

Le code suivant contient tous les éléments nécessaires pour écrire une application producteur convenable minimale. Les enregistrements utilisateur Kinesis Producer Library (KPL) sont traités en arrière-plan.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Réponse synchrone aux résultats

Dans l'exemple précédent, le code n'a pas vérifié si les enregistrements utilisateur KPL ont abouti. Le KPL effectue toutes les tentatives nécessaires pour tenir compte des défaillances. Si vous voulez vérifier les résultats, vous pouvez les examiner à l'aide des objets Future renvoyés par addUserRecord, comme dans l'exemple suivant (exemple précédent indiqué pour le contexte) :

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Réponse asynchrone aux résultats

L'exemple précédent appelle get() sur un objet Future, ce qui bloque l'exécution. Si vous ne voulez pas bloquer l'exécution, vous pouvez utiliser un rappel asynchrone, comme illustré dans l'exemple suivant :

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }