Desarrolle consumidores personalizados con un rendimiento compartido mediante el AWS SDK for Java - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrolle consumidores personalizados con un rendimiento compartido mediante el AWS SDK for Java

Uno de los métodos para desarrollar Kinesis Data Streams personalizados con contenido compartido en todo momento consiste en utilizar Amazon Kinesis Data Streams. APIs En esta sección se describe el uso de Kinesis Data APIs Streams con AWS SDK el para Java. El código de ejemplo de Java de esta sección muestra cómo realizar KDS API operaciones básicas y se divide lógicamente por tipo de operación.

Estos ejemplos no representan código listo para producción. No comprueban todas las excepciones posibles ni tienen en cuenta todas las consideraciones de seguridad y de rendimiento.

Puede llamar a Kinesis Data APIs Streams mediante otros lenguajes de programación diferentes. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte Comience a desarrollar con Amazon Web Services.

importante

El método recomendado para desarrollar Kinesis Data Streams personalizados con contenido compartido en todo momento es utilizar la biblioteca de clientes de Kinesis (). KCL KCLle ayuda a consumir y procesar datos de una transmisión de datos de Kinesis al encargarse de muchas de las tareas complejas asociadas a la informática distribuida. Para obtener más información, consulte Desarrollo de consumidores personalizados con un rendimiento compartido mediante el uso. KCL

Obtenga datos de una transmisión

Los Kinesis Data APIs Streams incluyen getShardIterator los métodos getRecords y que puede invocar para recuperar registros de un flujo de datos. Se trata del modelo de extracción, donde el código extrae registros de datos directamente de las particiones del flujo de datos.

importante

Le recomendamos que utilice el soporte de procesador de registros que proporciona KCL para recuperar los registros de sus flujos de datos. Se trata del modelo de inserción, en el que debe implementar el código que procesa los datos. KCLRecupera los registros de datos del flujo de datos y los envía al código de su aplicación. Además, KCL proporciona funciones de conmutación por error, recuperación y equilibrio de carga. Para obtener más información, consulte Desarrollo de consumidores personalizados mediante el uso de un rendimiento compartido. KCL

Sin embargo, en algunos casos puede que prefiera utilizar Kinesis Data APIs Streams. Por ejemplo, para implementar herramientas personalizadas para la supervisión o la depuración de los flujos de datos.

importante

Kinesis Data Streams admite cambios en el periodo de retención de los registros de datos del flujo de datos. Para obtener más información, consulte Cambie el período de retención de los datos.

Utilice iteradores compartidos

Puede recuperar registros desde la secuencia por fragmentos. Para cada fragmento y cada lote de registros obtenido de ese fragmento debe conseguir un iterador de fragmentos. El iterador de fragmentos se utiliza en el objeto getRecordsRequest para especificar el fragmento a partir del cual deben recuperarse los registros. El tipo asociado con el iterador de fragmentos determina el punto del fragmento a partir del cual deben recuperarse los registros (consulte la información que se incluye más adelante en esta sección para obtener más detalles). Antes de poder trabajar con el iterador de fragmentos, debe recuperar el fragmento. Para obtener más información, consulte Haz una lista de fragmentos.

Obtenga el iterador de fragmentos inicial con el método getShardIterator. Obtenga iteradores de fragmentos para lotes adicionales de registros utilizando el método getNextShardIterator del objeto getRecordsResult que devuelve el método getRecords. Un iterador de fragmentos es válido durante 5 minutos. Si utiliza un iterador de fragmentos mientras sea válido, obtendrá uno nuevo. Cada iterador de fragmentos mantiene su validez durante 5 minutos, incluso después de utilizarlo.

Para obtener el iterador de fragmentos inicial, cree instancias de GetShardIteratorRequest y páselas al método getShardIterator. Para configurar la solicitud, especifique la secuencia y el ID del fragmento. Para obtener información sobre cómo obtener las transmisiones de su AWS cuenta, consulte. Lista de secuencias Para obtener información sobre cómo obtener los fragmentos en una secuencia, consulte Haz una lista de fragmentos.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Este código de muestra especifica TRIM_HORIZON como el tipo de iterador que se utiliza para obtener el iterador de fragmentos inicial. Este tipo de iterador implica que se deben devolver los registros y comenzar por el primer registro agregado a la partición, en lugar de comenzar por el registro agregado más recientemente, también denominado extremo. Los tipos de iteradores posibles son los siguientes:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Para obtener más información, consulte ShardIteratorType.

Algunos tipos de iteradores requieren que se especifique un número de secuencia además del tipo, por ejemplo:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Después de obtener un registro mediante getRecords, puede obtener el número de secuencia del registro si llama al método getSequenceNumber del registro.

record.getSequenceNumber()

Además, el código que añade registros a la secuencia de datos puede obtener el número de secuencia para un registro añadido llamando a getSequenceNumber en el resultado de putRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Puede utilizar números secuenciales para garantizar que los registros tengan un orden estrictamente ascendente. Para obtener más información, consulte el código de ejemplo en PutRecordejemplo.

Utilice GetRecords

Una vez que haya obtenido el iterador de fragmentos, cree una instancia de un objeto GetRecordsRequest. Especifique el iterador para la solicitud con el método setShardIterator.

También puede establecer el número de registros que quiera recuperar mediante el método setLimit. El número de registros que devuelve getRecords es siempre igual o inferior a este límite. Si no especifica este límite, getRecords devuelve 10 MB de registros recuperados. El código de muestra que aparece a continuación establece este límite en 25 registros.

Si no se devuelven, significa que no hay registros de datos disponibles actualmente en este fragmento con el número de secuencia al que hace referencia el iterador de fragmentos. En una situación así, la aplicación debe esperar una cantidad de tiempo adecuada para los orígenes de datos del flujo. Intente obtener datos de nuevo a partir del fragmento mediante el iterador de fragmentos que ha devuelto la llamada anterior a getRecords.

Pase la getRecordsRequest al método getRecords y capture el valor devuelto como un objeto getRecordsResult. Para obtener los registros de datos, llame al método getRecords en el objeto getRecordsResult.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Para prepararse para otra llamada a getRecords, obtenga el siguiente iterador de fragmentos desde getRecordsResult.

shardIterator = getRecordsResult.getNextShardIterator();

Para obtener resultados óptimos, suspenda la actividad durante al menos 1 segundo (1000 milisegundos) entre las llamadas a getRecords para evitar que se supere el límite de frecuencia de getRecords.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

Normalmente, debe llamar a getRecords en bucle, incluso cuando recupere un solo registro en un entorno de pruebas. Una única llamada a getRecords podría devolver una lista de registros vacía, incluso si el fragmento contiene más registros en números secuenciales posteriores. Si ocurre esto, el NextShardIterator que se devuelve junto con la lista de registros vacía hace referencia a un número de secuencia posterior en el fragmento, y las llamadas sucesivas a getRecords acabarán por devolver los registros. El siguiente ejemplo ilustra el uso de un bucle.

Ejemplo: getRecords

El siguiente ejemplo de código refleja las sugerencias sobre getRecords que hemos planteado en esta sección, incluidas la realización de llamadas en bucle.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Si utiliza Kinesis Client Library, esta podría hacer varias llamadas antes de devolver los datos. Este comportamiento se debe a un diseño y no indica ningún problema con los datos KCL o con los suyos.

Adáptese a una nueva fragmentación

Si getRecordsResult.getNextShardIterator devuelve null, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado CLOSED y se han leído todos los registros de datos disponibles de esta partición.

En este escenario, puede utilizar getRecordsResult.childShards para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte. ChildShard

En el caso de una división, los dos nuevos fragmentos tienen un parentShardId igual al ID de fragmento del fragmento que estuviera procesando anteriormente. El valor de adjacentParentShardId para ambos fragmentos es null.

En el caso de una fusión, el único fragmento nuevo creado por la fusión tiene un parentShardId igual al ID del fragmento de uno de los fragmentos de origen y un adjacentParentShardId igual al ID de fragmento del otro fragmento de origen. La aplicación ya ha leído todos los datos de uno de estos fragmentos. Este es el fragmento para el que getRecordsResult.getNextShardIterator ha devuelto null. Si el orden de los datos es importante en la aplicación, debe asegurarse de que esta también lea todos los datos del otro fragmento principal antes de leer datos nuevos del fragmento secundario creado por la fusión.

Si utiliza varios procesadores para recuperar los datos de la secuencia (por ejemplo, un procesador por fragmento) y se produce una división o fusión de fragmentos, debe aumentar o disminuir el número de procesadores para adaptarse a los cambios en el número de fragmentos.

Para obtener más información acerca de cómo realizar cambios en los fragmentos, incluida una explicación de los estados de los fragmentos, como CLOSED, consulte Vuelva a compartir una transmisión.

Interactúe con los datos mediante el registro AWS Glue de esquemas

Puede integrar sus flujos de datos de Kinesis con el registro de AWS Glue esquemas. El registro de AWS Glue esquemas le permite descubrir, controlar y desarrollar los esquemas de forma centralizada y, al mismo tiempo, garantizar que los datos generados se validen continuamente mediante un esquema registrado. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro de AWS Glue esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte AWS Glue Schema Registry. Una de las formas de configurar esta integración es a través de GetRecords Kinesis Data API Streams, disponible en Java AWS . SDK

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con Schema Registry mediante Kinesis Data APIs Streams, consulte GetRecords la sección «Interacción con datos mediante Kinesis Data Streams» en Caso de uso: integración de Amazon Kinesis APIs Data Streams con el registro de Glue Schema. AWS