Desarrollo de un consumidor de Kinesis Client Library en Node.js - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrollo de un consumidor de Kinesis Client Library en Node.js

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Node.js

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. MultiLangDaemon Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Node.js y escribe su aplicación para consumidores completamente en Node.js, seguirá necesitando instalar Java en su sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del MultiLangDaemon proyecto KCL.

Para descargar el archivo KCL de Node.js GitHub, vaya a la biblioteca de clientes de Kinesis (Node.js).

Descargas de código de muestra

Hay dos códigos de muestra disponibles para la KCL en Node.js:

  • basic-sample

    Se utiliza en las siguientes secciones para ilustrar los aspectos fundamentales de la creación de una aplicación de consumo de KCL en Node.js.

  • click-stream-sample

    Es ligeramente más avanzado y utiliza una situación real. Para cuando se haya familiarizado con el código de muestra básico. Esta muestra no se trata aquí, pero tiene un archivo README con más información.

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Node.js:

Implementar el procesador de registros

El consumidor más sencillo posible que utilice KCL para Node.js debe implementar una función recordProcessor, que a su vez contenga las funciones initialize, processRecords y shutdown. En la muestra se presenta una implementación que puede utilizar como punto de partida (consulte sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialize

KCL llama a la función initialize cuando se inicia el procesador de registros. Este procesador de registros procesa solo la ID de fragmento que se haya pasado como initializeInput.shardId, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de al menos una vez, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte Cambio en los fragmentos, escalado y procesamiento paralelo.

initialize: function(initializeInput, completeCallback)
processRecords

KCL llama a esta función con una entrada que contiene una lista de registros de datos de la partición especificados para la función initialize. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

Además de los datos en sí, el registro también contiene un número secuencial y una clave de partición, que el proceso de trabajo puede utilizar al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario record expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

record.data record.sequenceNumber record.partitionKey

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo básico, la función processRecords tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento con un objeto checkpointer que pasa como processRecordsInput.checkpointer. Su procesador de registros llama a la función checkpointer.checkpoint para informar a KCL de su avance en el procesamiento de los registros de la partición. En el caso de que se produzca un error en el proceso de trabajo, KCL utiliza esta información al reiniciar el procesamiento de la partición para continuar desde el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a checkpoint para indicar que se ha completado el procesamiento en las particiones originales.

Si no pasa el número secuencial a la función checkpoint, KCL supone que la llamada a checkpoint significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros debería llamar a checkpoint solo después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a checkpoint en cada llamada a processRecords. Un procesador podría, por ejemplo, llamar a checkpoint en cada tercera llamada, o a algún evento externo a su procesador de registros, como un servicio de validación/verificación personalizado que haya implementado.

Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para checkpoint. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

La aplicación de muestra básica muestra la llamada más sencilla posible a la función checkpointer.checkpoint. Puede agregar otra lógica de creación de puntos de comprobación que necesite para su consumidor en este punto en la función.

shutdown

KCL llama a la función shutdown cuando finaliza el procesamiento (shutdownInput.reason es TERMINATE) o cuando el proceso de trabajo ya no responde (shutdownInput.reason es ZOMBIE).

shutdown: function(shutdownInput, completeCallback)

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto shutdownInput.checkpointer a shutdown. Si el motivo del cierre es TERMINATE, debe asegurarse de que el procesador de registros haya terminado de procesar los registros de datos y después llame a la función checkpoint en esta interfaz.

Modificación de las propiedades de configuración

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte sample.properties en el ejemplo básico).

Nombre de la aplicación

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:

  • Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.

  • KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Uso de una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL.

Configuración de credenciales

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad AWSCredentialsProvider para configurar un proveedor de credenciales. El archivo sample.properties debe poner sus credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Si ejecuta su consumidor en una instancia de Amazon EC2, le recomendamos que configure la instancia con un rol de IAM. AWS Las credenciales que reflejan los permisos asociados a esta función de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

En el siguiente ejemplo, se configura KCL para procesar un flujo de datos de Kinesis denominado kclnodejssample utilizando el procesador de registros facilitado en sample_kcl_app.js:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON