Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Uno de los métodos para desarrollar Kinesis Data Streams personalizados para los consumidores con contenido compartido en todo momento consiste en utilizar Amazon Kinesis Data APIs Streams con. AWS SDK for Java En esta sección se describe el uso de Kinesis Data APIs Streams con AWS SDK for Java. Puede llamar a Kinesis Data APIs Streams mediante otros lenguajes de programación diferentes. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte Comience a desarrollar con Amazon Web Services
El código de ejemplo de Java de esta sección muestra cómo realizar las operaciones básicas de Kinesis Data API Streams y se divide lógicamente por tipo de operación. Estos ejemplos no representan código listo para producción. No comprueban todas las excepciones posibles ni tienen en cuenta todas las consideraciones de seguridad y de rendimiento.
Temas
Obtener datos de un flujo
Los Kinesis Data APIs Streams incluyen getShardIterator
los métodos getRecords
y que puede invocar para recuperar registros de un flujo de datos. Se trata del modelo de extracción, donde el código extrae registros de datos directamente de las particiones del flujo de datos.
importante
Le recomendamos que utilice el soporte de procesador de registros que proporciona KCL para recuperar los registros de sus flujos de datos. Se trata del modelo de inserción, en el que debe implementar el código que procesa los datos. KCLRecupera los registros de datos del flujo de datos y los envía al código de su aplicación. Además, KCL proporciona funciones de conmutación por error, recuperación y equilibrio de carga. Para obtener más información, consulte Desarrollo de consumidores personalizados mediante el uso de un rendimiento compartido. KCL
Sin embargo, en algunos casos puede que prefiera utilizar Kinesis Data APIs Streams. Por ejemplo, para implementar herramientas personalizadas para la supervisión o la depuración de los flujos de datos.
importante
Kinesis Data Streams admite cambios en el periodo de retención de los registros de datos del flujo de datos. Para obtener más información, consulte Cambiar el periodo de retención de datos.
Usar iteradores de particiones
Puede recuperar registros desde la secuencia por fragmentos. Para cada fragmento y cada lote de registros obtenido de ese fragmento debe conseguir un iterador de fragmentos. El iterador de fragmentos se utiliza en el objeto getRecordsRequest
para especificar el fragmento a partir del cual deben recuperarse los registros. El tipo asociado con el iterador de fragmentos determina el punto del fragmento a partir del cual deben recuperarse los registros (consulte la información que se incluye más adelante en esta sección para obtener más detalles). Antes de trabajar con el iterador de particiones, tendrá que recuperar la partición. Para obtener más información, consulte Obtener lista de particiones.
Obtenga el iterador de fragmentos inicial con el método getShardIterator
. Obtenga iteradores de fragmentos para lotes adicionales de registros utilizando el método getNextShardIterator
del objeto getRecordsResult
que devuelve el método getRecords
. Un iterador de fragmentos es válido durante 5 minutos. Si utiliza un iterador de fragmentos mientras sea válido, obtendrá uno nuevo. Cada iterador de fragmentos mantiene su validez durante 5 minutos, incluso después de utilizarlo.
Para obtener el iterador de fragmentos inicial, cree instancias de GetShardIteratorRequest
y páselas al método getShardIterator
. Para configurar la solicitud, especifique la secuencia y el ID del fragmento. Para obtener información sobre cómo obtener las transmisiones de su AWS cuenta, consulteLista de secuencias. Para obtener información sobre cómo obtener los fragmentos en una secuencia, consulte Obtener lista de particiones.
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
Este código de muestra especifica TRIM_HORIZON
como el tipo de iterador que se utiliza para obtener el iterador de fragmentos inicial. Este tipo de iterador implica que se deben devolver los registros y comenzar por el primer registro agregado a la partición, en lugar de comenzar por el registro agregado más recientemente, también denominado extremo. Los tipos de iteradores posibles son los siguientes:
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
Para obtener más información, consulte ShardIteratorType.
Algunos tipos de iteradores requieren que se especifique un número de secuencia además del tipo, por ejemplo:
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
Después de obtener un registro mediante getRecords
, puede obtener el número de secuencia del registro si llama al método getSequenceNumber
del registro.
record.getSequenceNumber()
Además, el código que añade registros a la secuencia de datos puede obtener el número de secuencia para un registro añadido llamando a getSequenceNumber
en el resultado de putRecord
.
lastSequenceNumber = putRecordResult.getSequenceNumber();
Puede utilizar números secuenciales para garantizar que los registros tengan un orden estrictamente ascendente. Para obtener más información, consulte el código de ejemplo en Ejemplo de PutRecord.
Utilice GetRecords
Una vez que haya obtenido el iterador de fragmentos, cree una instancia de un objeto GetRecordsRequest
. Especifique el iterador para la solicitud con el método setShardIterator
.
También puede establecer el número de registros que quiera recuperar mediante el método setLimit
. El número de registros que devuelve getRecords
es siempre igual o inferior a este límite. Si no especifica este límite, getRecords
devuelve 10 MB de registros recuperados. El código de muestra que aparece a continuación establece este límite en 25 registros.
Si no se devuelven, significa que no hay registros de datos disponibles actualmente en este fragmento con el número de secuencia al que hace referencia el iterador de fragmentos. En una situación así, la aplicación debe esperar una cantidad de tiempo adecuada para los orígenes de datos del flujo. Intente obtener datos de nuevo a partir del fragmento mediante el iterador de fragmentos que ha devuelto la llamada anterior a getRecords
.
Pase la getRecordsRequest
al método getRecords
y capture el valor devuelto como un objeto getRecordsResult
. Para obtener los registros de datos, llame al método getRecords
en el objeto getRecordsResult
.
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
Para prepararse para otra llamada a getRecords
, obtenga el siguiente iterador de fragmentos desde getRecordsResult
.
shardIterator = getRecordsResult.getNextShardIterator();
Para obtener resultados óptimos, suspenda la actividad durante al menos 1 segundo (1000 milisegundos) entre las llamadas a getRecords
para evitar que se supere el límite de frecuencia de getRecords
.
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {}
Normalmente, debe llamar a getRecords
en bucle, incluso cuando recupere un solo registro en un entorno de pruebas. Una única llamada a getRecords
podría devolver una lista de registros vacía, incluso si el fragmento contiene más registros en números secuenciales posteriores. Si ocurre esto, el NextShardIterator
que se devuelve junto con la lista de registros vacía hace referencia a un número de secuencia posterior en el fragmento, y las llamadas sucesivas a getRecords
acabarán por devolver los registros. El siguiente ejemplo ilustra el uso de un bucle.
Ejemplo: getRecords
El siguiente ejemplo de código refleja las sugerencias sobre getRecords
que hemos planteado en esta sección, incluidas la realización de llamadas en bucle.
// Continuously read data records from a shard
List<Record> records;
while (true) {
// Create a new getRecordsRequest with an existing shardIterator
// Set the maximum records to return to 25
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);
GetRecordsResult result = client.getRecords(getRecordsRequest);
// Put the result into record list. The result can be empty.
records = result.getRecords();
try {
Thread.sleep(1000);
}
catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
shardIterator = result.getNextShardIterator();
}
Si utiliza Kinesis Client Library, esta podría hacer varias llamadas antes de devolver los datos. Este comportamiento se debe a un diseño y no indica ningún problema con los datos KCL o con los suyos.
Adaptarse a una nueva partición
Si getRecordsResult.getNextShardIterator
devuelve null
, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado CLOSED
y se han leído todos los registros de datos disponibles de esta partición.
En este escenario, puede utilizar getRecordsResult.childShards
para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte ChildShard.
En el caso de una división, los dos nuevos fragmentos tienen un parentShardId
igual al ID de fragmento del fragmento que estuviera procesando anteriormente. El valor de adjacentParentShardId
para ambos fragmentos es null
.
En el caso de una fusión, el único fragmento nuevo creado por la fusión tiene un parentShardId
igual al ID del fragmento de uno de los fragmentos de origen y un adjacentParentShardId
igual al ID de fragmento del otro fragmento de origen. La aplicación ya ha leído todos los datos de uno de estos fragmentos. Este es el fragmento para el que getRecordsResult.getNextShardIterator
ha devuelto null
. Si el orden de los datos es importante en la aplicación, debe asegurarse de que esta también lea todos los datos del otro fragmento principal antes de leer datos nuevos del fragmento secundario creado por la fusión.
Si utiliza varios procesadores para recuperar los datos de la secuencia (por ejemplo, un procesador por fragmento) y se produce una división o fusión de fragmentos, debe aumentar o disminuir el número de procesadores para adaptarse a los cambios en el número de fragmentos.
Para obtener más información acerca de cómo realizar cambios en los fragmentos, incluida una explicación de los estados de los fragmentos, como CLOSED
, consulte Cambiar las particiones de un flujo.