Desarrolle productores mediante la API Amazon Kinesis Data Streams con AWS SDK for Java - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrolle productores mediante la API Amazon Kinesis Data Streams con AWS SDK for Java

Puede desarrollar productores mediante la API de Amazon Kinesis Data Streams con AWS el SDK for Java. Si es la primera vez que utiliza Kinesis Data Streams, familiarícese antes con los conceptos y los términos que encontrará en ¿Qué es Amazon Kinesis Data Streams? y Úselo AWS CLI para realizar operaciones de Amazon Kinesis Data Streams.

En estos ejemplos se aborda la API de Kinesis Data Streams y se utiliza el SDK de AWS para Java para insertar datos en un flujo. Sin embargo, en la mayoría de los casos de uso, es preferible optar por la biblioteca KPL de Kinesis Data Streams. Para obtener más información, consulte Desarrollo de productores con Amazon Kinesis Producer Library (KPL).

El código de ejemplo de Java de este capítulo demuestra cómo realizar operaciones básicas con la API de Kinesis Data Streams y está dividido lógicamente por tipo de operación. Estos ejemplos no representan códigos listos para producción, ya que no comprueban todas las excepciones posibles ni toman en cuenta todas las consideraciones de seguridad y desempeño posibles. Además, puede llamar a la API de Kinesis Data Streams mediante otros lenguajes de programación. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte Comience a desarrollar con Amazon Web Services.

Cada tarea tiene requisitos previos. Por ejemplo, no se pueden agregar datos a una secuencia hasta que se haya creado una, para lo que se ha de crear un cliente. Para obtener más información, consulte Crear y administrar Kinesis Data Streams.

Agregar datos a un flujo

Una vez que se crea una secuencia, puede agregar datos a ella en forma de registros. Un registro es una estructura de datos que contiene los datos que se han de procesar en forma de un blob de datos. Después de almacenar los datos en el registro, Kinesis Data Streams no inspecciona, interpreta ni cambia los datos. Cada registro también tiene asociado un número secuencial y una clave de partición.

Existen dos operaciones diferentes en la API de Kinesis Data Streams que agregan datos a un flujo: PutRecords y PutRecord. La operación PutRecords envía varios registros a su secuencia por solicitud HTTP, y la operación única PutRecord envía registros a su secuencia, una por una (se necesita una solicitud HTTP independiente para cada registro). Es preferible utilizar PutRecords para la mayoría de las aplicaciones, ya que conseguirá un mayor rendimiento por productor de datos. Para obtener más información sobre cada una de estas operaciones, consulte las subsecciones independientes que aparecen a continuación.

Siempre debe tener en mente que, a medida que la aplicación de origen agrega datos al flujo mediante la API de Kinesis Data Streams, muy probablemente haya una o varias aplicaciones de consumidor que procesan datos simultáneamente desde el flujo. Para obtener información sobre cómo los consumidores obtienen datos mediante la API de Kinesis Data Streams, consulte Obtener datos de un flujo.

Añada varios registros con PutRecords

La operación PutRecords envía varios registros a Kinesis Data Streams en una única solicitud. Al utilizar PutRecords, los productores pueden conseguir un mayor rendimiento cuando envían datos a los flujos de datos de Kinesis. Cada solicitud PutRecords puede admitir hasta 500 registros. Cada registro puede ser tan grande como 1 MB, hasta un límite de 5 MB para toda la solicitud, incluidas las claves de partición. Al igual que con la operación única PutRecord que se describe a continuación, PutRecords utiliza números secuenciales y claves de partición. Sin embargo, el parámetro de PutRecord SequenceNumberForOrdering no se incluye en una llamada PutRecords. La operación PutRecords intenta procesar todos los registros en el orden natural de la solicitud.

Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a client.putRecords para agregar los registros de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a PutRecords, más aumentan los números secuenciales.

nota

Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.

Una solicitud PutRecords puede incluir registros con diferentes claves de partición. El ámbito de la solicitud es una secuencia; cada solicitud puede incluir cualquier combinación de claves de partición y registros hasta alcanzar los límites de la solicitud. Las solicitudes realizadas con varias claves de partición a secuencias con muchos fragmentos diferentes suelen ser más rápidas que las solicitudes con un número reducido de claves de partición a un número pequeño de fragmentos. El número de claves de partición debe ser mucho mayor que el número de fragmentos a fin de reducir la latencia y maximizar el rendimiento.

PutRecordsEjemplo de

El siguiente código crea 100 registros de datos con claves de partición secuenciales y los inserta en una secuencia llamada DataStream.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

La respuesta PutRecords incluye una gama de Records de respuesta. Cada registro de la matriz de respuestas se correlaciona directamente con un registro en la matriz de solicitudes siguiendo el orden natural, de arriba abajo de la solicitud y la respuesta. La matriz de Records de respuesta siempre incluye el mismo número de registros que la matriz de solicitudes.

Controle las fallas al usar PutRecords

De forma predeterminada, el error de registros individuales en una solicitud no para el procesamiento de los registros siguientes en una solicitud PutRecords. Esto significa que una matriz de Records de respuesta incluye tanto los registros procesados correctamente como los que no. Debe detectar los registros procesados de forma incorrecta e incluirlos en una llamada posterior.

Los registros correctos incluyen los valores SequenceNumber y ShardID, y los registros incorrectos incluyen los valores ErrorCode y ErrorMessage. El parámetro ErrorCode refleja el tipo de error y puede tomar uno de los siguientes valores: ProvisionedThroughputExceededException o InternalFailure. ErrorMessage proporciona información más detallada sobre la excepción ProvisionedThroughputExceededException, e incluye el ID de la cuenta, el nombre de la secuencia y el ID del fragmento del registro al que se ha aplicado limitación. El ejemplo siguiente tiene tres registros en una solicitud PutRecords. El segundo registro ha generado un error y se refleja en la respuesta.

ejemplo PutRecords Solicita la sintaxis
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
ejemplo PutRecords Sintaxis de respuesta
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

Los registros que se procesan sin éxito se pueden incluir en las solicitudes PutRecords posteriores. En primer lugar, compruebe el parámetro FailedRecordCount en putRecordsResult para confirmar si se hay registros con error en la solicitud. En caso afirmativo, cada putRecordsEntry que tenga un ErrorCode que no sea null se debe agregar a una solicitud posterior. Para un ejemplo de este tipo de controlador, consulte el siguiente código.

ejemplo PutRecords manejador de fallas
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

Añada un único registro con PutRecord

Cada llamada a PutRecord opera sobre un solo registro. Es preferible recurrir a la operación PutRecords que se describe en Añada varios registros con PutRecords a menos que su aplicación necesite específicamente enviar siempre registros individuales en cada solicitud, o que por cualquier otro motivo no se pueda utilizar PutRecords.

Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a client.putRecord para agregar el registro de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a PutRecord, más aumentan los números secuenciales.

Cuando se producen inserciones (puts) en una sucesión rápida, no hay garantía de que los números secuenciales devueltos aumenten, ya que las operaciones put aparecen esencialmente de manera simultánea a Kinesis Data Streams. Para garantizar unos números secuenciales estrictamente en aumento para la misma clave de partición, utilice el parámetro SequenceNumberForOrdering tal y como se muestra en el código de muestra de PutRecordEjemplo de .

Independientemente de que use o no SequenceNumberForOrdering, los registros que recibe Kinesis Data Streams mediante una llamada a GetRecords están estrictamente ordenados por número secuencial.

nota

Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.

Una clave de partición se utiliza para agrupar datos dentro de una secuencia. Un registro de datos se asigna a un fragmento dentro de la secuencia en función de su clave de partición. En concreto, Kinesis Data Streams utiliza la clave de partición como entrada para una función hash que asigna la clave de partición (y los datos asociados) a una partición específica.

Como resultado de este mecanismo de hash, todos los registros de datos con la misma clave de partición se asignan al mismo fragmento dentro de la secuencia. Sin embargo, si el número de claves de partición supera el número de fragmentos, algunos fragmentos han de contener necesariamente registros con diferentes claves de partición. Desde el punto de vista del diseño, para garantizar que todos los fragmentos estén bien utilizados, el número de fragmentos (especificado mediante el método setShardCount de CreateStreamRequest) debe ser significativamente inferior al número de claves de partición únicas, y la cantidad de datos que entran en una única clave de partición debe ser significativamente inferior a la capacidad del fragmento.

PutRecordEjemplo de

El siguiente código crea diez registros de datos distribuidos en dos claves de partición y los inserta en una secuencia llamada myStreamName.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

El código de muestra anterior utiliza setSequenceNumberForOrdering para garantizar un orden estrictamente creciente en cada clave de partición. Para utilizar este parámetro de forma eficaz, establezca como SequenceNumberForOrdering del registro actual (registro n) el número secuencial del registro anterior (registro n-1). Para obtener el número secuencial de un registro que se ha añadido a la secuencia, llame a getSequenceNumber en el resultado de putRecord.

El parámetro SequenceNumberForOrdering garantiza números de secuencia estrictamente crecientes para la misma clave de partición. SequenceNumberForOrdering no permite ordenar los registros en varias claves de partición.