Desarrolle un consumidor de la biblioteca de clientes de Kinesis en Java - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrolle un consumidor de la biblioteca de clientes de Kinesis en Java

Puede usar la biblioteca de clientes de Kinesis (KCL) para crear aplicaciones que procesen datos de sus transmisiones de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Java. Para ver la referencia de Javadoc, consulte el tema AWS Javadoc de Class. AmazonKinesisClient

Para descargar Java KCL desde GitHub, vaya a la biblioteca de clientes de Kinesis (Java). Para localizar Java KCL en Apache Maven, vaya a la página de resultados de la KCL búsqueda. Para descargar un ejemplo de código para una aplicación de KCL consumo de Java GitHub, vaya a la página de ejemplo del proyecto KCL para Java en GitHub.

La aplicación de muestra utiliza Apache Commons Logging. Puede cambiar la configuración de registro en el método estático configure definido en el archivo AmazonKinesisApplicationSample.java. Para obtener más información sobre cómo utilizar el registro de Apache Commons con Log4j y aplicaciones AWS Java, consulte Cómo iniciar sesión con Log4j en la guía para desarrolladores.AWS SDK for Java

Debe realizar las siguientes tareas al implementar una aplicación para KCL consumidores en Java:

Implemente los IRecordProcessor métodos

KCLActualmente, admite dos versiones de la IRecordProcessor interfaz: la interfaz original está disponible con la primera versión y la versión 2 está disponible a partir de la KCL versión 1.5.0. KCL Ambas interfaces son totalmente compatibles. La elección dependerá de su situación específica. Consulte sus javadocs locales o el código fuente para ver todas las diferencias. En las siguientes secciones se describe la implementación mínima introductoria.

Interfaz original (versión 1)

La interfaz original IRecordProcessor (package com.amazonaws.services.kinesis.clientlibrary.interfaces) expone los siguientes métodos del procesador de registros que el consumidor debe implementar. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte AmazonKinesisApplicationSampleRecordProcessor.java).

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
initialize

KCLLlama al initialize método cuando se crea una instancia del procesador de registros y pasa un identificador de fragmento específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Kinesis Data Streams tiene una semántica de al menos una vez, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte Utilice la refragmentación, el escalado y el procesamiento paralelo para cambiar la cantidad de fragmentos.

public void initialize(String shardId)
processRecords

KCLLlama al processRecords método y pasa una lista de registros de datos del fragmento especificado por el método. initialize(shardId) El procesador de registros procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase Record expone los siguientes métodos que proporcionan acceso a los datos, el número secuencial y la clave de partición del registro.

record.getData() record.getSequenceNumber() record.getPartitionKey()

En el ejemplo, el método privado processRecordsWithRetries tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. Se KCL encarga de este seguimiento pasando un punto de control (IRecordProcessorCheckpointer) a. processRecords El procesador de registros llama al checkpoint método en esta interfaz para informar sobre cuánto ha avanzado en el procesamiento de los registros del fragmento. KCL Si el trabajador falla, KCL utiliza esta información para reiniciar el procesamiento del fragmento en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no empezará a procesar los nuevos fragmentos hasta que los procesadores de los fragmentos originales llamen checkpoint para indicar que se ha completado todo el procesamiento de los fragmentos originales.

Si no se pasa ningún parámetro, se KCL supone que la llamada a checkpoint significa que se han procesado todos los registros, hasta el último registro que se ha pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a checkpoint después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a checkpoint en cada llamada a processRecords. Un procesador podría, por ejemplo, llamar a checkpoint cada tercera vez que llame a processRecords. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para checkpoint. En este caso, se KCL supone que todos los registros se han procesado únicamente hasta ese registro.

En el ejemplo, el método privado checkpoint muestra cómo llamar a IRecordProcessorCheckpointer.checkpoint mediante la administración de excepciones y la lógica de reintentos apropiadas.

Se KCL basa en processRecords gestionar cualquier excepción que surja del procesamiento de los registros de datos. Si se produce una excepciónprocessRecords, KCL omite los registros de datos que se pasaron antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

shutdown

KCLLlama al shutdown método cuando finaliza el procesamiento (el motivo del cierre esTERMINATE) o el trabajador ya no responde (el motivo del cierre esZOMBIE).

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCLTambién pasa una IRecordProcessorCheckpointer interfaz ashutdown. Si el motivo del shutdown es TERMINATE, el procesador de registros debería terminar de procesar los registros de datos y llamar al método checkpoint en esta interfaz.

Interfaz actualizada (versión 2)

La interfaz actualizada IRecordProcessor (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) expone los siguientes métodos del procesador de registros que el consumidor debe implementar:

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

Se puede obtener acceso a todos los argumentos de la versión original de la interfaz mediante métodos "get" en los objetos del contenedor. Por ejemplo, para recuperar la lista de registros en processRecords(), puede utilizar processRecordsInput.getRecords().

A partir de la versión 2 de esta interfaz (KCL1.5.0 y posteriores), están disponibles las siguientes entradas nuevas además de las entradas proporcionadas por la interfaz original:

starting sequence number

En el objeto InitializationInput que se pasa a la operación initialize() el número secuencial inicial a partir del cual se facilitan los registros a la instancia del procesador de registros. Este es el último número secuencial objeto de un punto de comprobación por parte de la instancia del procesador de registros que procesara anteriormente el mismo fragmento. Estos datos se ofrecen por si su aplicación necesitara esta información.

pending checkpoint sequence number

En el objeto InitializationInput que se pasa a la operación initialize(), el número secuencial pendiente de punto de comprobación (si hay alguno) que no se ha podido confirmar antes de que se detuviera la instancia anterior del procesador de registros.

Implemente una fábrica de clases para la interfaz IRecordProcessor

También necesitará implementar un generador para la clase que implementa los métodos del procesador de registros. Cuando el consumidor crea instancias del proceso de trabajo, pasa una referencia a este generador.

La muestra implementa el generador de clases en el archivo AmazonKinesisApplicationSampleRecordProcessorFactory.java mediante la interfaz del procesador de registros original. Si desea que el generador de clases cree procesadores de registros de la versión 2, utilice el nombre de paquete com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

Crea un trabajador

Como se explica enImplemente los IRecordProcessor métodos, hay dos versiones de la interfaz del procesador de KCL registros entre las que elegir, lo que afecta a la forma en que se crearía un trabajador. La interfaz de procesador de registros original utiliza la siguiente estructura de código para crear un proceso de trabajo:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

Con la versión 2 del procesador de registros, puede utilizar la Worker.Builder para crear un proceso de trabajo sin preocuparse por qué constructor utilizar ni por el orden de los argumentos. La interfaz de procesador de registros actualizada utiliza la siguiente estructura de código para crear un proceso de trabajo:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

Modifique las propiedades de configuración

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Este datos de configuración del proceso de trabajo se consolidan posteriormente en un objeto KinesisClientLibConfiguration. Este objeto y una referencia al generador de clases de IRecordProcessor se pasan en la llamada que crea la instancia del proceso de trabajo. Puede sobrescribir cualquiera de estas propiedades con sus propios valores a través de un archivo de propiedades de Java (consulte AmazonKinesisApplicationSample.java).

Nombre de la aplicación

KCLRequiere un nombre de aplicación que sea único en todas sus aplicaciones y en todas las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:

  • Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta una instancia adicional del mismo código de aplicación, pero con un nombre de aplicación diferente, KCL trata la segunda instancia como una aplicación completamente independiente que también funciona en la misma transmisión.

  • KCLCrea una tabla de DynamoDB con el nombre de la aplicación y la utiliza para mantener la información de estado (como los puntos de control y el mapeo de fragmentos de trabajo) de la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Utilice una tabla de arrendamientos para realizar un seguimiento de los fragmentos procesados por la aplicación de consumo KCL.

Configuración de credenciales

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Por ejemplo, si ejecuta su consumidor en una EC2 instancia, le recomendamos que lance la instancia con un IAM rol. AWS Las credenciales que reflejan los permisos asociados a este IAM rol se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales de un consumidor que ejecuta una EC2 instancia.

La aplicación de ejemplo primero intenta recuperar IAM las credenciales de los metadatos de la instancia:

credentialsProvider = new InstanceProfileCredentialsProvider();

Si la aplicación de muestra no puede obtener credenciales de los metadatos de la instancia, intenta recuperar las credenciales desde un archivo de propiedades:

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

Para obtener más información sobre los metadatos de instancia, consulta Metadatos de instancia en la Guía del EC2 usuario de Amazon.

Usa el ID de trabajador para varias instancias

El código de inicialización de muestra crea un ID para el proceso de trabajo, workerId, con el nombre del equipo local y un identificador global único anexo, tal y como se muestra en el siguiente fragmento de código. Este enfoque es compatible con un escenario con varias instancias de la aplicación consumidora ejecutándose en un único equipo.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Migre a la versión 2 de la interfaz del procesador de registros

Si desea migrar código que utilice la interfaz original, además de los pasos descritos anteriormente, tendrá que seguir estos pasos:

  1. Cambie la clase de su procesador de registros para importar la versión 2 de la interfaz del procesador de registros:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. Cambiar las referencias a las entradas para usar métodos get en los objetos del contenedor. Por ejemplo, en la operación shutdown(), cambie "checkpointer" por "shutdownInput.getCheckpointer()".

  3. Cambie la clase del generador de procesadores de registros para importar la interfaz del generador de procesadores de registros de la versión 2:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. Cambie la construcción del proceso de trabajo para usar Worker.Builder. Por ejemplo:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();