Ejemplo: agregar resultados parciales de una consulta - Guía para desarrolladores de aplicaciones de Amazon Kinesis Data Analytics para SQL

Para proyectos nuevos, le recomendamos que utilice el nuevo servicio gestionado para Apache Flink Studio en lugar de aplicaciones de Kinesis Data Analytics para SQL. El servicio gestionado para Apache Flink Studio combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones sofisticadas de procesamiento de flujos en cuestión de minutos.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo: agregar resultados parciales de una consulta

Si un flujo de datos de Amazon Kinesis contiene registros con una hora de evento que no coincide exactamente con la hora de recepción, una selección de resultados en una ventana de saltos contiene los registros que llegaron, pero que no se produjeron necesariamente, dentro de la ventana. En este caso, la ventana de saltos solo contiene un conjunto parcial de los resultados que desea. Existen varios métodos que puede utilizar para corregir este problema:

  • Utilizar solo una ventana de saltos y agregar los resultados parciales en el posprocesamiento a través de una base de datos o un almacén de datos mediante upserts. Este enfoque es eficaz en el procesamiento de una aplicación. Gestiona los datos con retraso de forma indefinida para operadores de agregación (sum, min, max, etc.). La desventaja de este enfoque es que debe desarrollar y mantener lógica de aplicación adicional en la capa de la base de datos.

  • Utilizar una ventana de saltos y deslizante que produzca resultados parciales de forma anticipada, pero que también siga produciendo resultados completos durante el periodo de la ventana deslizante. Este enfoque trata datos tardíos con una sobrescritura en lugar de un upsert, de modo que no es necesario añadir ninguna lógica de aplicación adicional en la capa de la base de datos. La desventaja de este enfoque es que utiliza más unidades de procesamiento (KPU) de Kinesis y produce dos resultados, que podrían no funcionar en algunos casos de uso.

Para obtener más información sobre las ventanas de saltos y deslizantes, consulte Consultas en ventana.

En el siguiente procedimiento, la agregación de la ventana de saltos produce dos resultados parciales (enviados al flujo de la aplicación CALC_COUNT_SQL_STREAM) que deben combinarse para producir un resultado final. A continuación, la aplicación produce una segunda agregación (enviada al flujo de la aplicación DESTINATION_SQL_STREAM) que combina los dos resultados parciales.

Para crear una aplicación que agregue resultados parciales utilizando una hora de evento
  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Elija Data Analytics (Análisis de datos) en el panel de navegación. Cree una aplicación de Kinesis Data Analytics como se describe en el tutorial Introducción a aplicaciones de Amazon Kinesis Data Analytics para SQL.

  3. En el editor de SQL, sustituya el código de la aplicación por el siguiente:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    La instrucción SELECT del código de la aplicación filtra las filas de SOURCE_SQL_STREAM_001 para ver cambios en las cotizaciones de los valores superiores al 1 por ciento e introduce las filas en otra secuencia en la aplicación CHANGE_STREAM mediante una bomba.

  4. Elija Save and run SQL.

La primera bomba envía un flujo a CALC_COUNT_SQL_STREAM similar al siguiente. Tenga en cuenta que el conjunto de resultados está incompleto:


                Imagen de pantalla de la consola con resultados parciales.

A continuación, la segunda bomba envía un flujo a DESTINATION_SQL_STREAM que contiene el conjunto completo de resultados:


                Imagen de pantalla de la consola con resultados completos.