Realizar operaciones básicas en Kinesis Data Stream con la AWS CLI - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Realizar operaciones básicas en Kinesis Data Stream con la AWS CLI

En esta sección se describe el uso básico de un flujo de datos de Kinesis desde la línea de comandos mediante la AWS CLI. Asegúrese de estar familiarizado con los conceptos que se abordan en Terminología y conceptos de Amazon Kinesis Data Streams.

nota

Una vez creado un flujo, su cuenta generará gastos nominales por el uso de Kinesis Data Streams, ya que Kinesis Data Streams no está disponible en el nivel gratuito de AWS. Cuando haya terminado con este tutorial, elimine sus recursos de AWS para dejar de incurrir en cargos. Para obtener más información, consulte Paso 4: Eliminación.

Paso 1: Crear una secuencia

El primer paso es crear una secuencia y verificar que se haya creado correctamente. Utilice el siguiente comando para crear una secuencia llamada "Foo":

aws kinesis create-stream --stream-name Foo

A continuación, escriba el siguiente comando para comprobar el progreso de creación de la secuencia:

aws kinesis describe-stream-summary --stream-name Foo

Debería obtener un resultado similar al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

En este ejemplo, la secuencia tiene el estado CREATING, lo que significa que aún no está lista para su uso. Compruébelo de nuevo en unos minutos, y debería ver un resultado parecido al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Este resultado contiene también información de la que no debería preocuparse para el objetivo de este tutorial. El elemento principal, por el momento, es "StreamStatus": "ACTIVE", que le indica que la secuencia está lista para ser utilizada, y la información en el fragmento único que ha solicitado. También puede verificar la existencia de su nuevo secuencia mediante el comando list-streams, tal y como se muestra aquí:

aws kinesis list-streams

Salida:

{ "StreamNames": [ "Foo" ] }

Paso 2: Insertar un registro

Ahora que ya tiene una secuencia activa, está listo para insertar algunos datos. En este tutorial, utilizará el comando más sencillo posible, put-record, que inserta un único registro de datos que contiene el texto "testdata" en la secuencia:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Este comando, si se ejecuta correctamente, dará como resultado algo similar a lo del siguiente ejemplo:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

¡Enhorabuena, acaba de agregar datos a una secuencia! A continuación verá cómo obtener datos a partir de la secuencia.

Paso 3: Obtener el registro

GetShardIterator

Antes de poder obtener datos de la secuencia, necesita obtener el iterador de fragmentos para el fragmento que le interese. Un iterador de fragmentos representa la posición de la secuencia y el fragmento a partir de la cual realizará la lectura el consumidor (en este caso, el comando get-record). Utilizará el comando get-shard-iterator, tal y como se indica a continuación:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

Recuerde que los comandos de aws kinesis utilizan la API de Kinesis Data Streams, por lo que si le interesa alguno de los parámetros que se muestran, puede obtener más información sobre ellos en el tema de referencia de la API GetShardIterator. Una ejecución correcta se traducirá en un resultado parecido al siguiente ejemplo (desplace la ventana hacia los lados para leer el resultado completo):

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

Esta larga cadena de caracteres aparentemente aleatorios es el iterador de fragmentos (el suyo será diferente). Tendrá que copiar y pegar el iterador de fragmentos en el comando "get" que se muestra a continuación. Los iteradores de fragmentos tienen una vida útil de 300 segundos, un tiempo que debería ser suficiente para que pueda copiar y pegar el iterador de fragmentos en el siguiente comando. Tenga en cuenta que tendrá que eliminar las líneas nuevas de su iterador de fragmentos antes de pegarlo en el siguiente comando. Si recibe un mensaje de error que informa de que el iterador de fragmentos ya no es válido, solo tiene que ejecutar de nuevo el comando get-shard-iterator.

GetRecords

El comando get-records obtiene los datos del flujo y llama a GetRecords en la API de Kinesis Data Streams. El iterador de fragmentos especifica la posición del fragmento desde la que quiera empezar a leer los registros de datos de forma secuencial. Si no hay registros disponibles en la parte del fragmento a la que señala el iterador, GetRecords devolverá una lista vacía. Tenga en cuenta que puede necesitar varias llamadas para dar con una parte del fragmento que contenga registros.

En el siguiente ejemplo del comando get-records (desplace la ventana hacia los lados para ver el comando completo):

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Si ejecuta este tutorial a partir de un procesador de comandos de tipo Unix, como bash, puede automatizar la adquisición del iterador de fragmentos utilizando un comando anidado, como este (desplace la ventana hacia los lados para ver el comando completo):

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Si ejecuta este tutorial a partir de un sistema compatible con PowerShell, puede automatizar la adquisición del iterador de fragmentos utilizando un comando como este (desplace la ventana hacia los lados para ver el comando completo):

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

El resultado satisfactorio del comando get-records solicitará registros de su secuencia para el fragmento especificado al obtener el iterador de fragmentos, como en el siguiente ejemplo (desplace la ventana hacia los lados para leer el resultado completo):

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Tenga en cuenta que get-records se describe anteriormente como una solicitud, lo que significa que puede recibir cero o más registros incluso cuando haya registros en la secuencia, y los registros recibidos pueden no ser todos los que se encuentren actualmente en la secuencia. Esto es perfectamente normal, y el código de producción sondeará la secuencia en busca de registros a intervalos apropiados (esta velocidad de sondeo variará en función de los requisitos de diseño específicos de su aplicación).

Lo primero de lo que se dará cuenta acerca de su registro en esta parte del tutorial es que los datos parecen ser inútiles: no se tratará de un texto claro como el testdata que le enviamos. Esto se debe a la forma en la que put-record utiliza la codificación Base64 para permitirle enviar datos binarios. Sin embargo, la compatibilidad con Kinesis Data Streams de la AWS CLI no incluye la descodificación Base64, porque la descodificación Base64 aplicada a contenido binario sin procesar que se envía a stdout puede producir comportamientos no deseados y problemas de seguridad potenciales en determinadas plataformas y terminales. Si utiliza un descodificador Base64 (por ejemplo, https://www.base64decode.org/) para descodificar manualmente dGVzdGRhdGE= verá que, en realidad, es testdata. Esto es suficiente para los objetivos de este tutorial porque, en la práctica, la AWS CLI no suele utilizarse para consumir datos, sino más bien para monitorear el estado de la secuencia y obtener información, como se ha mostrado anteriormente (describe-stream y list-streams). En futuros tutoriales le mostraremos cómo crear aplicaciones consumidoras de calidad mediante Kinesis Client Library (KCL), en las que los procesos relacionados con Base64 se realizan en su nombre. Para más información acerca de KCL, consulte Desarrollo de consumidores personalizados con rendimiento compartido mediante KCL.

El comando get-records no siempre devolverá todos los registros en la secuencia/fragmento especificado. Si ocurre esto, use el NextShardIterator del último resultado para obtener el siguiente conjunto de registros. Por lo tanto, si se estaban insertando más datos en la secuencia (la situación normal en aplicaciones de producción), podría mantener el sondeo de datos cada vez con get-records. Sin embargo, si no llama a get-records utilizando el siguiente iterador de fragmentos dentro del plazo de vida útil del iterador (300 segundos), obtendrá un mensaje de error y tendrá que utilizar el comando get-shard-iterator para obtener un nuevo iterador de fragmentos.

Además, en este resultado también se incluye MillisBehindLatest, que es el número de milisegundos a los que se encuentra la respuesta de la operación GetRecords del extremo del flujo, lo que indica el retraso de la aplicación consumidora con respecto al momento actual. Un valor de cero indica que el procesamiento de registros está actualizado y que no hay nuevos registros para procesar en este momento. En el caso de este tutorial, es posible que vea un número bastante grande si se ha ido tomando el tiempo de ir leyendo sobre la marcha. Eso no supone ningún problema. De manera predeterminada, los registros de datos permanecen en un flujo durante 24 horas, y podrá recuperarlos. Este periodo de tiempo se denomina periodo de retención y se puede configurar para durar hasta 365 días.

Tenga en cuenta que un resultado satisfactorio de get-records siempre tendrá un NextShardIterator, aunque no haya más registros actualmente en la secuencia. Este es un modelo de sondeo que asume que un productor puede insertar más registros en la secuencia en cualquier momento determinado. Aunque puede escribir sus propias rutinas de sondeo, si utiliza la KCL mencionada anteriormente para el desarrollo de aplicaciones consumidoras, este sondeo se realizará automáticamente.

Si llama a get-records hasta que no haya más registros en la secuencia y el fragmento a los que está recurriendo, verá resultados con registros vacíos similares a los del siguiente ejemplo (desplace la ventana hacia los lados para ver el resultado completo):

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Paso 4: Eliminación

Por último, tendrá que eliminar su secuencia para liberar recursos y evitar cargos no deseados en su cuenta, como hemos advertido anteriormente. Haga esto cada vez que haya creado una secuencia y no lo vaya a usar, ya que los cargos se acumulan por cada secuencia, independientemente de si inserta o extrae datos de él o no. El comando de limpieza es sencillo:

aws kinesis delete-stream --stream-name Foo

Si se completa correctamente, no se obtienen resultados. Por eso, puede que prefiera usar describe-stream para comprobar el progreso de la eliminación:

aws kinesis describe-stream-summary --stream-name Foo

Si ejecuta este comando inmediatamente después del comando eliminado, probablemente verá un resultado que en parte es parecido al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Tras eliminar por completo la secuencia, describe-stream devolverá un error del tipo "no encontrado":

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.