Ejemplo: Ventana escalonada - Guía para desarrolladores de Amazon Kinesis Data Analytics

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo: Ventana escalonada

Cuando una consulta de ventanas procesa ventanas distintas para cada clave de partición única, empezando cuando se reciben datos con la clave coincidente, esta ventana recibe el nombre de ventana escalonada. Para obtener más información, consulte Ventanas escalonadas. Este ejemplo de Amazon Kinesis Data Analytics utiliza las columnas EVENT_TIME y TICKER para crear ventanas escalonadas. La secuencia de origen contiene grupos de seis registros con idénticos valores de EVENT_TIME y TICKER que llegan en un periodo de un minuto, pero no necesariamente con el mismo valor de minuto (por ejemplo, 18:41:xx).

En este ejemplo, escribirá los siguientes registros en una secuencia de datos de Kinesis en los siguientes momentos. El script no escribe las horas en la secuencia, sino la hora a la que la aplicación introduce el registro que se escribe en el campo ROWTIME:

{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:30 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:40 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:50 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:00 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:10 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:21 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:31 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:41 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:51 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:01 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:11 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:21 ...

A continuación, se crea una aplicación de Kinesis Data Analytics en elAWS Management Console, con el flujo de datos de Kinesis como el origen de streaming. El proceso de detección lee los registros de muestra en el origen de streaming e infiere un esquema en la aplicación con dos columnas (EVENT_TIME y TICKER), tal como se muestra a continuación.


                Imagen de pantalla de la consola que muestra el esquema en la aplicación con las columnas price y ticker.

Utilice el código de aplicación con la función COUNT para crear una agregación en ventana de los datos. A continuación, inserte los datos resultantes en otra secuencia en la aplicación, tal y como se muestra en la siguiente captura de pantalla:


                Imagen de pantalla de la consola que muestra los datos resultantes en una secuencia en la aplicación.

En el siguiente procedimiento, se crea una aplicación de Kinesis Data Analytics que agrega valores en el flujo de entrada en una ventana escalonada basada en EVENT_TIME y TICKER.

Paso 1: Creación de un flujo de datos de Kinesis

Cree una secuencia de datos de Amazon Kinesis y rellene los registros como se indica a continuación:

  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Elija Data Streams (Secuencias de datos) en el panel de navegación.

  3. Elija Create Kinesis Stream (Crear secuencia de Kinesis) y, a continuación, cree una secuencia con un fragmento. Para obtener más información, consulteCreación de una secuenciaen laGuía para desarrolladores de Amazon Kinesis Data Streams.

  4. Para escribir registros en un flujo de datos de Kinesis en un entorno de producción, recomendamos utilizar Kinesis Producer Library o la API de Kinesis Data Streams. Para simplificar, en este ejemplo se utiliza el siguiente script Python para generar registros. Ejecute el código para rellenar los registros de ticker de muestra. Este código sencillo escribe continuamente un grupo de seis registros con el mismo valor aleatorio de EVENT_TIME y clave de cotización en la secuencia, en el transcurso de un minuto. Mantenga el script ejecutándose para poder generar el esquema de la aplicación en un paso posterior.

    import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { 'EVENT_TIME': event_time.isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])} def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") time.sleep(10) if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

Paso 2: Creación de la aplicación de Kinesis Data Analytics

Cree una aplicación de Kinesis Data Analytics como se indica a continuación:

  1. Abra la consola de Kinesis Data Analytics enhttps://console.aws.amazon.com/kinesisanalytics.

  2. Elija Create application (Crear aplicación), escriba el nombre de la aplicación y elija Create application (Crear aplicación).

  3. En la página de detalles de la aplicación, elija Connect streaming data (Conectar datos de streaming) para conectarse al origen.

  4. En la página Connect to source (Conectarse al origen), haga lo siguiente:

    1. Elija la secuencia que ha creado en la sección anterior.

    2. Elija Discover Schema (Detectar esquema). Espere a que la consola muestre el esquema inferido y los registros de muestra utilizados para inferir en el esquema de la secuencia en la aplicación que ha creado. El esquema inferido cuenta con dos columnas.

    3. Elija Edit Schema (Editar esquema). Cambie el Column type (Tipo de columna) de la columna EVENT_TIME a TIMESTAMP.

    4. Elija Save schema and update stream samples. Después de que la consola guarde el esquema, elija Exit (Salir).

    5. Elija Save and continue.

  5. En la página de detalles de la aplicación, elija Go to SQL editor (Ir al editor de SQL). Para iniciar la aplicación, elija Yes, start application (Sí, iniciar la aplicación) en el cuadro de diálogo que aparece.

  6. En el editor de SQL, escriba el código de la aplicación y verifique los resultados como se indica a continuación:

    1. Copie el siguiente código de la aplicación y péguelo en el editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
    2. Elija Save and run SQL.

      En la pestaña Real-time analytics (Análisis en tiempo real), puede ver todas las secuencias en la aplicación creadas por esta y comprobar los datos.