Ejemplo: ventana de saltos mediante una marca temporal de evento - Guía para desarrolladores de aplicaciones de Amazon Kinesis Data Analytics para SQL

Para proyectos nuevos, le recomendamos que utilice el nuevo servicio gestionado para Apache Flink Studio en lugar de aplicaciones de Kinesis Data Analytics para SQL. El servicio gestionado para Apache Flink Studio combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones sofisticadas de procesamiento de flujos en cuestión de minutos.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo: ventana de saltos mediante una marca temporal de evento

Cuando una consulta en ventana procesa cada ventana de forma que no se superpongan, la ventana se denomina ventana de saltos. Para obtener más información, consulte Ventanas de saltos de tamaño constante (Agregados utilizando GROUP BY). Este ejemplo de Amazon Kinesis Data Analytics muestra una ventana de saltos que utiliza una marca temporal de evento, que es una marca temporal creada por el usuario que se incluye en los datos de streaming. Utiliza este enfoque en lugar de solo usar ROWTIME, que es una marca temporal que crea Kinesis Data Analytics cuando la aplicación recibe el registro. Se usa una marca temporal de evento en los datos de streaming si se desea crear una agregación basada en el momento en que se produjo un evento, en lugar del momento en que lo recibió la aplicación. En este ejemplo, el valor de ROWTIME dispara la agregación cada minuto y los registros se agregan tanto por ROWTIME como por el tiempo de evento incluido.

En este ejemplo escribirá los siguientes registros en una secuencia de Amazon Kinesis. El valor de EVENT_TIME se establece en 5 segundos en el pasado, para simular el retraso de procesamiento y flujo que podría crear un retardo desde el momento en que se produjo el evento hasta el momento en que el registro se recibe en Kinesis Data Analytics.

{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65} {"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61} {"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48} {"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64} ...

A continuación, cree una aplicación de Kinesis Data Analytics en la AWS Management Console con la secuencia de datos de Kinesis como origen de streaming. El proceso de detección lee los registros de muestra en el origen de streaming e infiere un esquema en la aplicación con tres columnas (EVENT_TIME, TICKER y PRICE), tal como se muestra a continuación.


                Imagen de pantalla de la consola que muestra el esquema en la aplicación con las columnas event time, ticker y price.​

Utilice el código de aplicación con las funciones MIN y MAX para crear una agregación en ventana de los datos. A continuación, inserte los datos resultantes en otra secuencia en la aplicación, tal y como se muestra en la siguiente captura de pantalla:


                Imagen de pantalla de la consola que muestra los datos resultantes en una secuencia en la aplicación.

En el siguiente procedimiento se crea una aplicación de Kinesis Data Analytics que agrega valores en el flujo de entrada en una ventana de saltos basada en una hora de evento.

Paso 1: Crear una secuencia de datos de Kinesis

Cree una secuencia de datos de Amazon Kinesis y rellene los registros como se indica a continuación:

  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Elija Data Streams (Secuencias de datos) en el panel de navegación.

  3. Elija Create Kinesis Stream (Crear secuencia de Kinesis) y, a continuación, cree una secuencia con un fragmento. Para obtener más información, consulte Crear secuencia en la Guía para desarrolladores de Amazon Kinesis Data Streams.

  4. Para escribir registros en un flujo de datos de Kinesis en un entorno de producción, recomendamos utilizar Kinesis Client Library o la API de Kinesis Data Streams. Para simplificar, en este ejemplo se utiliza el siguiente script Python para generar registros. Ejecute el código para rellenar los registros de ticker de muestra. Este código simple escribe continuamente un registro de ticker aleatorio en el flujo. Mantenga el script ejecutándose para poder generar el esquema de la aplicación en un paso posterior.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Paso 2: Creación de una aplicación de Kinesis Data Analytics

Cree una aplicación de análisis de datos de Kinesis Data Analytics de la siguiente manera:

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/kinesisanalytics.

  2. Elija Create application (Crear aplicación), introduzca el nombre de la aplicación y elija Create application (Crear aplicación).

  3. En la página de detalles de la aplicación, elija Connect streaming data (Conectar datos de streaming) para conectarse al origen.

  4. En la página Connect to source (Conectarse al origen), haga lo siguiente:

    1. Elija la secuencia que ha creado en la sección anterior.

    2. Elija Discover Schema (Detectar esquema). Espere a que la consola muestre el esquema inferido y los registros de muestra utilizados para inferir en el esquema de la secuencia en la aplicación que ha creado. El esquema inferido cuenta con tres columnas.

    3. Elija Edit Schema (Editar esquema). Cambie el Column type (Tipo de columna) de la columna EVENT_TIME a TIMESTAMP.

    4. Elija Save schema and update stream samples. Después de que la consola guarde el esquema, elija Exit (Salir).

    5. Elija Save and continue.

  5. En la página de detalles de la aplicación, elija Go to SQL editor (Ir al editor de SQL). Para iniciar la aplicación, elija Yes, start application (Sí, iniciar la aplicación) en el cuadro de diálogo que aparece.

  6. En el editor de SQL, escriba el código de la aplicación y verifique los resultados como se indica a continuación:

    1. Copie el siguiente código de la aplicación y péguelo en el editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND), TICKER, MIN(PRICE) AS MIN_PRICE, MAX(PRICE) AS MAX_PRICE FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
    2. Elija Save and run SQL.

      En la pestaña Real-time analytics (Análisis en tiempo real), puede ver todas las secuencias en la aplicación creadas por esta y comprobar los datos.