Escritura en los flujos de datos de Kinesis mediante KPL - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Escritura en los flujos de datos de Kinesis mediante KPL

En las secciones siguientes, se muestra un código de muestra con una progresión que va desde el productor más básico y sencillo posible hasta un código completamente asíncrono.

Código de productor barebones

El siguiente código es todo lo que necesita para escribir un productor mínimamente funcional. Los registros de usuario de Kinesis Producer Library (KPL) se procesan en segundo plano.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Respuesta síncrona a los resultados

En el ejemplo anterior, el código no comprobó si el procesamiento de registros de usuario de KPL finalizó correctamente. KPL efectúa los reintentos necesarios en caso de error. Sin embargo, si desea comprobar los resultados, puede examinarlos con los objetos Future que devuelve addUserRecord, como en el siguiente ejemplo (el ejemplo anterior se incluye para aportar contexto):

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Respuesta asíncrona a los resultados

El ejemplo anterior llama a get() en un objeto Future que bloquea la ejecución. Si no desea bloquear la ejecución, puede utilizar una devolución de llamada asíncrona, tal y como se muestra en el ejemplo siguiente:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }