Ventanas deslizantes - Guía para desarrolladores de aplicaciones de Amazon Kinesis Data Analytics para SQL

Para proyectos nuevos, le recomendamos que utilice el nuevo servicio gestionado para Apache Flink Studio en lugar de aplicaciones de Kinesis Data Analytics para SQL. El servicio gestionado para Apache Flink Studio combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones sofisticadas de procesamiento de flujos en cuestión de minutos.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ventanas deslizantes

En lugar de agrupar los registros utilizando GROUP BY, puede definir ventana basada en tiempo o filas. Para ello, añada una cláusula WINDOW explícita.

En este caso, como la ventana se mueve con el tiempo, Amazon Kinesis Data Analytics emite una salida cuando aparecen nuevos registros en la secuencia. Kinesis Data Analytics emite esta salida al procesar las filas en la ventana. Las ventanas pueden superponerse en este tipo de procesamiento, y un registro pueden formar parte de varias ventanas y ser procesado con cada ventana. El siguiente ejemplo ilustra una ventana deslizante.

Considere una simple consulta que calcule registros en la secuencia. En este ejemplo se supone una ventana de 5 segundos. En el siguiente ejemplo de secuencia, llegan nuevos registros en el tiempo t1, t2, t6 y t7 y tres registros llegan en el tiempo t8 segundos.

Tenga en cuenta lo siguiente:

  • En el ejemplo se supone una ventana de 5 segundos. La ventana de 5 segundos varía de forma continua con el tiempo.

  • Por cada fila que introduce en una ventana, la ventana deslizante emite una fila de salida. Poco después que se inicie la aplicación, verá que la consulta emite una salida para cada nuevo registro que aparece en la secuencia, a pesar de no haya pasado todavía la ventana de 5 segundos. Por ejemplo, la consulta emite una salida cuando un registro aparece en el primer segundo y segundo segundo. Después, la consulta procesa los registros en la ventana de 5 segundos.

  • Las ventanas varían con el tiempo. Si se un registro anterior en la secuencia cae fuera de la ventana, la consulta no emite una salida a menos que también exista un nuevo registro en la secuencia que entre en esa ventana de 5 segundos.

Imagine que la consulta empieza a ejecutarse en t0. Entonces, ocurriría lo siguiente:

  1. La consulta empieza en el tiempo t0. La consulta no emite una salida (valor de recuento), porque no existen registros en ese momento.

  2. En el tiempo t1, un nuevo registro aparece en la secuencia y la consulta emite el valor de recuento 1.

  3. En el tiempo t2, otro registro aparece y la consulta emite recuento 2.

  4. La ventana de 5 segundos se desliza con el tiempo:

    • En t3, la ventana deslizante abarca de t3 a t0

    • En t4 (ventana deslizante de t4 a t0)

    • En t5, la ventana deslizante abarca de t5 a t0

    Todas estas veces, la ventana de 5 segundos tiene los mismos registros, no hay nuevos registros. Por lo tanto, la consulta no emite ninguna salida.

  5. En el tiempo t6, la ventana de 5 segundos es (t6 a t1). La consulta detecta un nuevo registro en t6, por lo que emite la salida 2. El registro en t1 ya no está en la ventana y no se tiene en cuenta.

  6. En el tiempo t7, la ventana de 5 segundos es t7 a t2. La consulta detecta un nuevo registro en t7, por lo que emite la salida 2. El registro en t2 ya no está en la ventana de 5 segundos y, por lo tanto, no se tiene en cuenta.

  7. En el tiempo t8, la ventana de 5 segundos es t8 a t3. La consulta detecta tres nuevos registros, y, por lo tanto, emite el recuento de registro 5.

Resumiendo, la ventana es de tamaño fijo y varía con el tiempo. La consulta emite salidas cuando aparecen nuevos registros.

nota

Le recomendamos que utilice una ventana deslizante de no más de una hora. Si utiliza una ventana más larga, la aplicación tardará mas en reiniciarse después del mantenimiento normal del sistema. Esto se debe a que los datos de origen deben leerse de la secuencia de nuevo.

A continuación, se describe un ejemplo de consultas que utiliza la cláusula WINDOW para definir y realizar agregados de ventanas. Como las consultas no especifican GROUP BY, la consulta utiliza la ventana deslizante para procesar registros en la secuencia.

Ejemplo 1: Procesar una secuencia mediante una ventana deslizante de 1 minuto

Considere la secuencia de demostración en el ejercicio de introducción que rellena la secuencia en la aplicación SOURCE_SQL_STREAM_001. A continuación se muestra el esquema.

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

Supongamos que desea que su aplicación calcule agregados mediante una ventana deslizante de 1 minuto. Es decir, para cada nuevo registro que aparezca en la secuencia, desea que la aplicación emita una salida aplicando sumas en los registros de la anterior ventana de 1 minuto.

Puede utilizar la siguiente consulta en ventana basada en el tiempo. La consulta utiliza la cláusula WINDOW para definir el intervalo de 1 minuto. PARTITION BY en la cláusula WINDOW agrupa los registros por valor de símbolo dentro de la ventana deslizante.

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
Para probar la consulta
  1. Configure una aplicación siguiendo el Ejercicio de introducción.

  2. Sustituya la instrucción SELECT en el código de la aplicación por la consulta SELECT anterior. A continuación se muestra el código de la aplicación resultante.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

Ejemplo 2: Consulta que aplica agregados en una ventana deslizante

La siguiente consulta sobre la secuencia de demostración, devuelve el promedio del porcentaje de cambio en el precio de cada símbolo, en una ventana de 10 segundos.

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Para probar la consulta
  1. Configure una aplicación siguiendo el Ejercicio de introducción.

  2. Sustituya la instrucción SELECT en el código de la aplicación por la consulta SELECT anterior. A continuación se muestra el código de la aplicación resultante.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Ejemplo 3: Consultar datos de varias ventanas deslizantes en la misma secuencia

Puede escribir consultas de modo que emitan una salida en la cual cada valor de columna se calcule mediante diferentes ventanas deslizantes definidas para la misma secuencia.

En el siguiente ejemplo, la consulta emite el símbolo de salida, el precio, a2 y a10. Emite una salida para los símbolos de cotización cuya media móvil de dos filas cruza la media móvil de diez filas. Los valores de columna a2 y a10 se derivan de ventanas deslizantes de dos y diez filas.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

Para probar esta consulta con la secuencia de demostración, siga el procedimiento descrito en Ejemplo 1.