Gravando em seu Kinesis Data Stream usando o KPL - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Gravando em seu Kinesis Data Stream usando o KPL

As seções a seguir mostram o código de exemplo em uma progressão desde o produtor de "barebones" mais simples possível até o código totalmente assíncrono.

Código de produtor de barebones

Só é preciso o código a seguir para gravar um produtor de trabalho mínimo. Os registros do usuário da Kinesis Producer Library (KPL) são processados em segundo plano.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Responder aos resultados de forma síncrona

No exemplo anterior, o código não verificou se os registros do usuário da KPL foram bem-sucedidos. O KPL realiza todas as novas tentativas necessárias para contabilizar as falhas. No entanto, para verificar os resultados, você poderá examiná-los usando os objetos Future que são retornados de addUserRecord, como no exemplo a seguir (exemplo anterior mostrado para fins de contexto):

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Responder aos resultados de forma assíncrona

O exemplo anterior está chamando get() para um objeto Future, o que bloqueia a execução. Se não quiser bloquear a execução, você poderá usar um retorno de chamada assíncrono, como mostrado no exemplo a seguir:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }