Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Desarrolle productores mediante la API Amazon Kinesis Data Streams con AWS SDK for Java
Puede desarrollar productores mediante la API de Amazon Kinesis Data Streams con AWS el SDK for Java. Si es la primera vez que utiliza Kinesis Data Streams, familiarícese antes con los conceptos y los términos que encontrará en ¿Qué es Amazon Kinesis Data Streams? y Úselo AWS CLI para realizar operaciones de Amazon Kinesis Data Streams.
En estos ejemplos se aborda la API de Kinesis Data Streams y se utiliza el SDK de AWS para Java
El código de ejemplo de Java de este capítulo demuestra cómo realizar operaciones básicas con la API de Kinesis Data Streams y está dividido lógicamente por tipo de operación. Estos ejemplos no representan códigos listos para producción, ya que no comprueban todas las excepciones posibles ni toman en cuenta todas las consideraciones de seguridad y desempeño posibles. Además, puede llamar a la API de Kinesis Data Streams mediante otros lenguajes de programación. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte Comience a desarrollar con Amazon Web Services
Cada tarea tiene requisitos previos. Por ejemplo, no se pueden agregar datos a una secuencia hasta que se haya creado una, para lo que se ha de crear un cliente. Para obtener más información, consulte Crear y administrar Kinesis Data Streams.
Agregar datos a un flujo
Una vez que se crea una secuencia, puede agregar datos a ella en forma de registros. Un registro es una estructura de datos que contiene los datos que se han de procesar en forma de un blob de datos. Después de almacenar los datos en el registro, Kinesis Data Streams no inspecciona, interpreta ni cambia los datos. Cada registro también tiene asociado un número secuencial y una clave de partición.
Existen dos operaciones diferentes en la API de Kinesis Data Streams que agregan datos a un flujo: PutRecords
y PutRecord
. La operación PutRecords
envía varios registros a su secuencia por solicitud HTTP, y la operación única PutRecord
envía registros a su secuencia, una por una (se necesita una solicitud HTTP independiente para cada registro). Es preferible utilizar PutRecords
para la mayoría de las aplicaciones, ya que conseguirá un mayor rendimiento por productor de datos. Para obtener más información sobre cada una de estas operaciones, consulte las subsecciones independientes que aparecen a continuación.
Siempre debe tener en mente que, a medida que la aplicación de origen agrega datos al flujo mediante la API de Kinesis Data Streams, muy probablemente haya una o varias aplicaciones de consumidor que procesan datos simultáneamente desde el flujo. Para obtener información sobre cómo los consumidores obtienen datos mediante la API de Kinesis Data Streams, consulte Obtener datos de un flujo.
importante
Añada varios registros con PutRecords
La operación PutRecords
envía varios registros a Kinesis Data Streams en una única solicitud. Al utilizar PutRecords
, los productores pueden conseguir un mayor rendimiento cuando envían datos a los flujos de datos de Kinesis. Cada solicitud PutRecords
puede admitir hasta 500 registros. Cada registro puede ser tan grande como 1 MB, hasta un límite de 5 MB para toda la solicitud, incluidas las claves de partición. Al igual que con la operación única PutRecord
que se describe a continuación, PutRecords
utiliza números secuenciales y claves de partición. Sin embargo, el parámetro de PutRecord
SequenceNumberForOrdering
no se incluye en una llamada PutRecords
. La operación PutRecords
intenta procesar todos los registros en el orden natural de la solicitud.
Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a client.putRecords
para agregar los registros de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a PutRecords
, más aumentan los números secuenciales.
nota
Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.
Una solicitud PutRecords
puede incluir registros con diferentes claves de partición. El ámbito de la solicitud es una secuencia; cada solicitud puede incluir cualquier combinación de claves de partición y registros hasta alcanzar los límites de la solicitud. Las solicitudes realizadas con varias claves de partición a secuencias con muchos fragmentos diferentes suelen ser más rápidas que las solicitudes con un número reducido de claves de partición a un número pequeño de fragmentos. El número de claves de partición debe ser mucho mayor que el número de fragmentos a fin de reducir la latencia y maximizar el rendimiento.
PutRecordsEjemplo de
El siguiente código crea 100 registros de datos con claves de partición secuenciales y los inserta en una secuencia llamada DataStream
.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
La respuesta PutRecords
incluye una gama de Records
de respuesta. Cada registro de la matriz de respuestas se correlaciona directamente con un registro en la matriz de solicitudes siguiendo el orden natural, de arriba abajo de la solicitud y la respuesta. La matriz de Records
de respuesta siempre incluye el mismo número de registros que la matriz de solicitudes.
Controle las fallas al usar PutRecords
De forma predeterminada, el error de registros individuales en una solicitud no para el procesamiento de los registros siguientes en una solicitud PutRecords
. Esto significa que una matriz de Records
de respuesta incluye tanto los registros procesados correctamente como los que no. Debe detectar los registros procesados de forma incorrecta e incluirlos en una llamada posterior.
Los registros correctos incluyen los valores SequenceNumber
y ShardID
, y los registros incorrectos incluyen los valores ErrorCode
y ErrorMessage
. El parámetro ErrorCode
refleja el tipo de error y puede tomar uno de los siguientes valores: ProvisionedThroughputExceededException
o InternalFailure
. ErrorMessage
proporciona información más detallada sobre la excepción ProvisionedThroughputExceededException
, e incluye el ID de la cuenta, el nombre de la secuencia y el ID del fragmento del registro al que se ha aplicado limitación. El ejemplo siguiente tiene tres registros en una solicitud PutRecords
. El segundo registro ha generado un error y se refleja en la respuesta.
ejemplo PutRecords Solicita la sintaxis
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
ejemplo PutRecords Sintaxis de respuesta
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
Los registros que se procesan sin éxito se pueden incluir en las solicitudes PutRecords
posteriores. En primer lugar, compruebe el parámetro FailedRecordCount
en putRecordsResult
para confirmar si se hay registros con error en la solicitud. En caso afirmativo, cada putRecordsEntry
que tenga un ErrorCode
que no sea null
se debe agregar a una solicitud posterior. Para un ejemplo de este tipo de controlador, consulte el siguiente código.
ejemplo PutRecords manejador de fallas
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
Añada un único registro con PutRecord
Cada llamada a PutRecord
opera sobre un solo registro. Es preferible recurrir a la operación PutRecords
que se describe en Añada varios registros con PutRecords a menos que su aplicación necesite específicamente enviar siempre registros individuales en cada solicitud, o que por cualquier otro motivo no se pueda utilizar PutRecords
.
Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a client.putRecord
para agregar el registro de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a PutRecord
, más aumentan los números secuenciales.
Cuando se producen inserciones (puts) en una sucesión rápida, no hay garantía de que los números secuenciales devueltos aumenten, ya que las operaciones put aparecen esencialmente de manera simultánea a Kinesis Data Streams. Para garantizar unos números secuenciales estrictamente en aumento para la misma clave de partición, utilice el parámetro SequenceNumberForOrdering
tal y como se muestra en el código de muestra de PutRecordEjemplo de .
Independientemente de que use o no SequenceNumberForOrdering
, los registros que recibe Kinesis Data Streams mediante una llamada a GetRecords
están estrictamente ordenados por número secuencial.
nota
Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.
Una clave de partición se utiliza para agrupar datos dentro de una secuencia. Un registro de datos se asigna a un fragmento dentro de la secuencia en función de su clave de partición. En concreto, Kinesis Data Streams utiliza la clave de partición como entrada para una función hash que asigna la clave de partición (y los datos asociados) a una partición específica.
Como resultado de este mecanismo de hash, todos los registros de datos con la misma clave de partición se asignan al mismo fragmento dentro de la secuencia. Sin embargo, si el número de claves de partición supera el número de fragmentos, algunos fragmentos han de contener necesariamente registros con diferentes claves de partición. Desde el punto de vista del diseño, para garantizar que todos los fragmentos estén bien utilizados, el número de fragmentos (especificado mediante el método setShardCount
de CreateStreamRequest
) debe ser significativamente inferior al número de claves de partición únicas, y la cantidad de datos que entran en una única clave de partición debe ser significativamente inferior a la capacidad del fragmento.
PutRecordEjemplo de
El siguiente código crea diez registros de datos distribuidos en dos claves de partición y los inserta en una secuencia llamada myStreamName
.
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
El código de muestra anterior utiliza setSequenceNumberForOrdering
para garantizar un orden estrictamente creciente en cada clave de partición. Para utilizar este parámetro de forma eficaz, establezca como SequenceNumberForOrdering
del registro actual (registro n) el número secuencial del registro anterior (registro n-1). Para obtener el número secuencial de un registro que se ha añadido a la secuencia, llame a getSequenceNumber
en el resultado de putRecord
.
El parámetro SequenceNumberForOrdering
garantiza números de secuencia estrictamente crecientes para la misma clave de partición. SequenceNumberForOrdering
no permite ordenar los registros en varias claves de partición.