Implemente la desagregación de consumidores - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Implemente la desagregación de consumidores

A partir de la versión 1.4.0, KCL admite la desagregación automática de los registros de los usuarios. KPL El código de la aplicación para consumidores escrito con versiones anteriores de la KCL se compilará sin ninguna modificación una vez que la actualice. KCL Sin embargo, si se utiliza la KPL agregación por parte del productor, hay una sutileza relacionada con los puntos de control: todos los subregistros de un registro agregado tienen el mismo número de secuencia, por lo que se deben almacenar datos adicionales en el punto de control si es necesario distinguir entre los subregistros. Estos datos adicionales se denominan números subsecuenciales.

Migre desde versiones anteriores del KCL

No es necesario cambiar las llamadas existentes para crear puntos de control en conjunto con la agregación. La recuperación de todos los registros almacenados correctamente en Kinesis Data Streams seguirá estando garantizada. KCLAhora proporciona dos nuevas operaciones de puntos de control para respaldar casos de uso particulares, que se describen a continuación.

En el caso de que el código existente se haya escrito para la KCL versión anterior al KPL soporte y se llame a la operación de punto de control sin argumentos, equivale a comprobar el número de secuencia del último registro de KPL usuario del lote. Si la operación de punto de comprobación se llama con una cadena de número secuencial, equivaldrá a la creación de un punto de comprobación para el número secuencial determinado del lote junto con el número subsecuencial implícito 0 (cero).

Llamar a la nueva operación de KCL punto de control checkpoint() sin ningún argumento equivale semánticamente a comprobar el número de secuencia de la última Record llamada del lote, junto con el número de secuencia implícito 0 (cero).

Llamar a la nueva operación KCL de punto de control checkpoint(Record record) equivale semánticamente a poner un punto de control en el número de secuencia dado junto con el número Record de subsecuencia implícito 0 (cero). Si la llamada al Record es realmente un UserRecord, el número secuencial de UserRecord y el número subsecuencial se someten a un punto de comprobación.

Al llamar a la nueva operación KCL de punto de control, se comprueba checkpoint(String sequenceNumber, long subSequenceNumber) explícitamente el número de secuencia indicado junto con el número de subsecuencia indicado.

En cualquiera de estos casos, después de almacenar el punto de control en la tabla de puntos de control de Amazon DynamoDBKCL, podrá reanudar correctamente la recuperación de registros incluso cuando la aplicación se bloquee y se reinicie. Si la secuencia contiene más registros, la recuperación se produce a partir del registro con el siguiente número subsecuencial dentro del registro con el número secuencial que se haya comprobado más recientemente. Si el punto de comprobación más reciente incluyó el último número subsecuencial del registro con el número secuencial anterior, la recuperación se produce a partir del registro con el siguiente número secuencial.

En la siguiente sección se explican los detalles de la comprobación secuencial y subsecuencial para los consumidores que deban evitar la omisión y la duplicación de registros. Si la omisión (o duplicación) de registros al detener y reiniciar el procesamiento de registros del consumidor no resulta importante, puede ejecutar su código existente sin modificaciones.

KPLUtilice extensiones para la desagregación KCL

KPLla desagregación puede implicar puntos de control posteriores. Para facilitar el uso de los puntos de control posteriores, se ha añadido una UserRecord clase a: KCL

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Esta categoría es la que se usa ahora en lugar de Record. Esto no afecta al código existente, porque es una subclase de Record. La clase UserRecord representa tanto subregistros reales como registros estándares no agregados. Se pueden describir los registros no agregados como registros agregados con solo un subregistro.

Además, se han añadido dos nuevas operaciones a IRecordProcessorCheckpointer:

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Para empezar a utilizar la creación de puntos de comprobación de números subsecuenciales, puede realizar la siguiente conversión. Cambie el siguiente código de formulario:

checkpointer.checkpoint(record.getSequenceNumber());

Nuevo código de formulario:

checkpointer.checkpoint(record);

Le recomendamos que use el formulario checkpoint(Record record) para la generación de puntos de comprobación subsecuenciales. Sin embargo, si ya está almacenando sequenceNumbers en cadenas para la creación de puntos de comprobación, ahora también deberá almacenar subSequenceNumber, tal y como se muestra en el ejemplo siguiente:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

La conversión de Record a UserRecord siempre tiene éxito porque la implementación siempre usa. UserRecord A no ser que sea necesario realizar cálculos aritméticos en los números secuenciales, no recomendamos este enfoque.

Mientras procesa los registros de los KPL usuarios, KCL escribe el número de subsecuencia en Amazon DynamoDB como un campo adicional para cada fila. Las versiones anteriores KCL se utilizaban AFTER_SEQUENCE_NUMBER para buscar registros al reanudar los puntos de control. La corriente KCL con KPL soporte utiliza en su lugar. AT_SEQUENCE_NUMBER Cuando se recupera el registro del número secuencial del punto de comprobación, se comprueba el número secuencial sometido al punto de comprobación, y los subregistros se descartan según proceda (podrían ser todos ellos, si el último subregistro es el del punto de comprobación). De nuevo, se pueden entender los registros no agregados como registros agregados con un único subregistro, de modo que el mismo algoritmo funciona tanto para los registros agregados como para los no agregados.

Úselo GetRecords directamente

También puede optar por no utilizar la API operaciónKCL, sino invocarla GetRecords directamente para recuperar los registros de Kinesis Data Streams. Para desempaquetar estos registros recuperados en sus registros de KPL usuario originales, ejecute una de las siguientes operaciones estáticas en: UserRecord.java

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

La primera operación utiliza el valor predeterminado 0 (cero) para startingHashKey y el valor predeterminado 2^128 -1 para endingHashKey.

Cada una de estas operaciones desagrega la lista dada de registros de Kinesis Data Streams en una lista de registros de usuario. KPL Todos los registros de KPL usuario cuya clave hash o clave de partición explícita esté fuera del rango de los valores startingHashKey (inclusivos) e inclusivos se descartan de la lista de registros devuelta. endingHashKey