Janelas deslizantes - Guia do Desenvolvedor de Amazon Kinesis Data Analytics para aplicativos SQL

Para novos projetos, recomendamos que você use o novo Managed Service for Apache Flink Studio em vez do Kinesis Data Analytics para aplicativos SQL. O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Janelas deslizantes

Em vez de agrupar registros usando GROUP BY, defina uma janela baseada em horário ou em linha. Para isso, adicione uma cláusula WINDOW explícita.

Nesse caso, como a janela desliza com o tempo, o Amazon Kinesis Data Analytics emite uma saída quando novos registros aparecem no stream. O Kinesis Data Analytics emite essa saída ao processar as linhas na janela. As janelas podem se sobrepor neste tipo de processamento, e um registro pode fazer parte de várias janelas e ser processado em cada janela. O exemplo a seguir ilustra uma janela deslizante.

Considere uma consulta simples que conta registros no stream. Este exemplo assume uma janela de cinco segundos. No stream de exemplo a seguir, novos registros chegam no horário t1, t2, t6, and t7, e três registros chegam no horário t8 segundos.

Lembre-se do seguinte:

  • O exemplo assume uma janela de cinco segundos. A janela de 5 segundos desliza continuamente com o tempo.

  • Para cada linha que entra em uma janela, uma linha de saída é emitida pela janela deslizante. Logo após o aplicativo iniciar, a consulta emitirá uma saída para cada novo registro que aparecer no stream, embora uma janela de 5 segundos não tenha passado ainda. Por exemplo, a consulta emite a saída quando um registro aparece no primeiro segundo e no segundo. Mais tarde, a consulta processará registros na janela de 5 segundos.

  • As janelas deslizam com o tempo. Se um registro antigo no stream ficar fora da janela, a consulta não emitirá a saída, a menos que também haja um novo registro no stream que fique dentro da janela de 5 segundos.

Suponha que a consulta comece a ser executada em t0. Ocorrerá o seguinte:

  1. No horário t0, a consulta será iniciada. A consulta não emitirá a saída (valor da contagem), porque não haverá registros nesse horário.

  2. No momento t1, um novo registro aparece no stream e a consulta emite o valor de contagem 1.

  3. No momento t2, outro registro aparece e a consulta emite a contagem 2.

  4. A janela de 5 segundos desliza com o tempo:

    • No t3, a janela deslizante t3 para t0

    • No t4 (janela deslizante t4 para t0)

    • No t5, a janela deslizante t5 - t0

    Em todos esses horários, a janela de 5 segundos tem os mesmos registros - não há novos registros. Portanto, a consulta não emite saídas.

  5. No momento t6, a janela de 5 segundos é (t6 para t1). A consulta detecta um novo registro no t6; assim, ele emite a saída 2. O registro no t1 não está mais na janela e não conta.

  6. No momento t7, a janela de 5 segundos é t7 para t2. A consulta detecta um novo registro no t7; assim, ele emite a saída 2. O registro no t2 não está mais na janela de 5 segundos e, portanto, não é contado.

  7. No momento t8, a janela de 5 segundos é t8 para t3. A consulta detecta três novos registros e, portanto, emite a contagem de registro 5.

Resumindo, a janela tem um tamanho fixo e desliza com o tempo. A consulta emite a saída quando novos registros são exibidos.

nota

Recomendamos que você use uma janela deslizante por não mais de uma hora. Se você usar uma janela maior, o aplicativo levará mais tempo para reiniciar após a manutenção regular do sistema. Isso ocorre porque os dados de origem precisam ser lidos no fluxo novamente.

Os exemplos de consultas a seguir usam a cláusula WINDOW para definir janelas e executar agregados. Como as consultas não especificam o GROUP BY, a consulta usa a abordagem de janela deslizante para processar registros no stream.

Exemplo 1: processar um fluxo usando uma janela deslizante de um minuto

Considere o stream de demonstração no exercício de conceitos básicos que preenche o stream no aplicativo, SOURCE_SQL_STREAM_001. Este é o esquema.

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

Suponha que você queira que o aplicativo calcule os agregados usando uma janela deslizante de 1 minuto. Ou seja, para cada novo registro que aparecer no stream, você quer que o aplicativo emita uma saída aplicando agregados aos registros na janela de 1 minuto anterior.

É possível usar as seguintes consultas em janela baseadas em horário. A consulta usa a cláusula WINDOW para definir o intervalo de 1 minuto. O PARTITION BY na cláusula WINDOW agrupa os registros por valores do marcador dentro da janela deslizante.

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
Para testar a consulta
  1. Configure um aplicativo seguindo as instruções em Exercício de conceitos básicos.

  2. Substitua a instrução SELECT no código de aplicativo pela consulta SELECT anterior. O código de aplicativo resultante é o seguinte.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

Exemplo 2: agregações de aplicação de consulta em uma janela deslizante

A consulta a seguir no stream de demonstração retorna a média de percentual de alteração no preço de cada marcador em uma janela de 10 segundos.

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Para testar a consulta
  1. Configure um aplicativo seguindo as instruções em Exercício de conceitos básicos.

  2. Substitua a instrução SELECT no código de aplicativo pela consulta SELECT anterior. O código de aplicativo resultante é o seguinte.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Exemplo 3: dados de consulta de várias janelas deslizantes no mesmo stream

É possível gravar consultas para emitir saída em que cada valor da coluna que é calculado usando diferentes janelas deslizantes definidas no mesmo stream.

No exemplo a seguir, a consulta emite o marcador de saída, preço, a2 e a10. Ela emite a saída de símbolos de marcador cuja média de movimentação de duas cruza a média de movimentação de dez linhas. Os valores das colunas a2 e a10 são derivados de janelas deslizantes de duas linhas e dez linhas.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

Para testar essa consulta em relação ao stream de demonstração, siga o procedimento de teste descrito no Exemplo 1.