Desarrollo de un consumidor de Kinesis Client Library en .NET - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrollo de un consumidor de Kinesis Client Library en .NET

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de .NET.

La KCL es una biblioteca de Java; la compatibilidad con otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. MultiLangDaemon Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala la KCL para.NET y escribe la aplicación para el usuario en su totalidad en .NET, seguirá necesitando instalar Java en el sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del MultiLangDaemon proyecto KCL.

Para descargar el KCL de.NET GitHub, vaya a la biblioteca de clientes de Kinesis (.NET). Para descargar un código de muestra para una aplicación para consumidores de KCL de.NET, visite la página de ejemplos de proyectos para consumidores de KCL para .NET en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en .NET:

Implemente los métodos de clase I RecordProcessor

El consumidor debe implementar los siguientes métodos para IRecordProcessor. En el consumidor de muestra se presentan implementaciones que puede utilizar como punto de partida (consulte la clase SampleRecordProcessor en SampleConsumer/AmazonKinesisSampleConsumer.cs).

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Initialize

KCL llama a este método cuando se crea una instancia del procesador de registros y pasa un ID de partición específico en el parámetro input (input.ShardId). Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de al menos una vez, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte Cambio en los fragmentos, escalado y procesamiento paralelo.

public void Initialize(InitializationInput input)
ProcessRecords

KCL llama a este método y pasa una lista de registros de datos en el parámetro input (input.Records) desde la partición especificada por el método Initialize. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

public void ProcessRecords(ProcessRecordsInput input)

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase Record expone lo siguiente para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

En el ejemplo, el método ProcessRecordsWithRetries tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto Checkpointer a ProcessRecords (input.Checkpointer). El procesador de registros llama al método Checkpointer.Checkpoint para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a Checkpointer.Checkpoint para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a Checkpointer.Checkpoint significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a Checkpointer.Checkpoint después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a Checkpointer.Checkpoint en cada llamada a ProcessRecords. Un procesador podría, por ejemplo, llamar a Checkpointer.Checkpoint en cada tercera o cuarta llamada. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para Checkpointer.Checkpoint. En este caso, KCL supone que los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado Checkpoint(Checkpointer checkpointer) muestra cómo llamar al método Checkpointer.Checkpoint mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL para .NET administra las excepciones de forma diferente a las bibliotecas de KCL para el resto de lenguajes, ya que no administra las excepciones que surgen del procesamiento de los registros de datos. Las excepciones no detectadas procedentes del código del usuario harán que el programa se bloquee.

Shutdown

KCL llama al método Shutdown cuando finaliza el procesamiento (el motivo del cierre es TERMINATE) o cuando el proceso de trabajo ya no responde (el valor de input.Reason del cierre es ZOMBIE).

public void Shutdown(ShutdownInput input)

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto Checkpointer a shutdown. Si el motivo del shutdown es TERMINATE, el procesador de registros debería terminar de procesar los registros de datos y llamar al método checkpoint en esta interfaz.

Modificación de las propiedades de configuración

En el consumidor muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte SampleConsumer/kcl.properties).

Nombre de la aplicación

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:

  • Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.

  • KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Uso de una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL.

Configuración de credenciales

Debe poner sus credenciales de AWS a disposición de uno de los proveedores de credenciales en la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad AWSCredentialsProvider para configurar un proveedor de credenciales. Las propiedades de muestra deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Si ejecuta su aplicación de consumo en una instancia de EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en AmazonKinesisSampleConsumer.cs.