Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Desarrollo de un consumidor de Kinesis Client Library en .NET
Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de .NET.
La KCL es una biblioteca de Java; la compatibilidad con otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. MultiLangDaemon Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala la KCL para.NET y escribe la aplicación para el usuario en su totalidad en .NET, seguirá necesitando instalar Java en el sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del MultiLangDaemon proyecto KCL
Para descargar el KCL de.NET GitHub, vaya a la biblioteca de clientes de Kinesis (
Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en .NET:
Tareas
Implemente los métodos de clase I RecordProcessor
El consumidor debe implementar los siguientes métodos para IRecordProcessor
. En el consumidor de muestra se presentan implementaciones que puede utilizar como punto de partida (consulte la clase SampleRecordProcessor
en SampleConsumer/AmazonKinesisSampleConsumer.cs
).
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
Initialize
KCL llama a este método cuando se crea una instancia del procesador de registros y pasa un ID de partición específico en el parámetro input
(input.ShardId
). Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de al menos una vez, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte Cambio en los fragmentos, escalado y procesamiento paralelo.
public void Initialize(InitializationInput input)
ProcessRecords
KCL llama a este método y pasa una lista de registros de datos en el parámetro input
(input.Records
) desde la partición especificada por el método Initialize
. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).
public void ProcessRecords(ProcessRecordsInput input)
Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase Record
expone lo siguiente para obtener acceso a los datos, el número secuencial y la clave de partición del registro:
byte[] Record.Data
string Record.SequenceNumber
string Record.PartitionKey
En el ejemplo, el método ProcessRecordsWithRetries
tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.
Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto Checkpointer
a ProcessRecords
(input.Checkpointer
). El procesador de registros llama al método Checkpointer.Checkpoint
para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.
En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a Checkpointer.Checkpoint
para indicar que se ha completado el procesamiento en las particiones originales.
Si no se pasa un parámetro, KCL supone que la llamada a Checkpointer.Checkpoint
significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a Checkpointer.Checkpoint
después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a Checkpointer.Checkpoint
en cada llamada a ProcessRecords
. Un procesador podría, por ejemplo, llamar a Checkpointer.Checkpoint
en cada tercera o cuarta llamada. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para Checkpointer.Checkpoint
. En este caso, KCL supone que los registros se han procesado exclusivamente hasta ese registro.
En el ejemplo, el método privado Checkpoint(Checkpointer checkpointer)
muestra cómo llamar al método Checkpointer.Checkpoint
mediante la administración de excepciones y la lógica de reintentos apropiadas.
KCL para .NET administra las excepciones de forma diferente a las bibliotecas de KCL para el resto de lenguajes, ya que no administra las excepciones que surgen del procesamiento de los registros de datos. Las excepciones no detectadas procedentes del código del usuario harán que el programa se bloquee.
Shutdown
KCL llama al método Shutdown
cuando finaliza el procesamiento (el motivo del cierre es TERMINATE
) o cuando el proceso de trabajo ya no responde (el valor de input.Reason
del cierre es ZOMBIE
).
public void Shutdown(ShutdownInput input)
El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.
KCL también pasa un objeto Checkpointer
a shutdown
. Si el motivo del shutdown es TERMINATE
, el procesador de registros debería terminar de procesar los registros de datos y llamar al método checkpoint
en esta interfaz.
Modificación de las propiedades de configuración
En el consumidor muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte SampleConsumer/kcl.properties
).
Nombre de la aplicación
KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
-
Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
-
KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Uso de una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL.
Configuración de credenciales
Debe poner sus credenciales de AWS a disposición de uno de los proveedores de credenciales en la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad AWSCredentialsProvider
para configurar un proveedor de credenciales. Las propiedades de muestra
El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en AmazonKinesisSampleConsumer.cs
.