对于新项目,我们建议您使用适用于 Apache Flink Studio 的新托管服务,而不是应用程序版 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
滑动窗口
您可以不使用 GROUP BY
对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW
子句执行此操作。
在这种情况下,当窗口随着时间滑动时,只要流中出现新记录,Amazon Kinesis Data Analytics 就会发送输出。Kinesis Data Analytics 将会通过在此窗口中处理行来发送此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。
考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录分别于时间 t1、t2、t6 和 t7 到达,有三个记录于时间 t8 秒时到达。
记住以下内容:
-
此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
-
对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。
-
该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。
假设查询在 t0 开始执行。那么,将出现以下情况:
-
在时间 t0,查询开始执行。查询不发送输出 (计数值),因为此时没有记录。
-
在时间 t1 处,新记录出现在流中,并且查询发送计数值 1。
-
在时间 t2,出现另一个记录,并且查询发送计数 2。
-
此 5 秒窗口随着时间滑动:
-
在 t3 处,滑动窗口为 t3 到 t0
-
在 t4 处 (滑动窗口为 t4 到 t0)
-
在 t5 处,滑动窗口为 t5 到 t0
在所有这些时间,5 秒窗口具有相同的记录——没有新记录。因此,查询不会发送任何输出。
-
-
在时间 t6 处,此 5 秒窗口为 (t6 到 t1)。查询在 t6 处检测到一个新记录,因此它发送了输出 2。t1 处的记录不再位于窗口中,计数时不考虑。
-
在时间 t7 处,此 5 秒窗口为 (t7 到 t2)。查询在 t7 处检测到一个新记录,因此它发送了输出 2。t2 处的记录不再位于 5 秒窗口中,因此计数时不考虑。
-
在时间 t8 处,此 5 秒窗口为 (t8 到 t3)。查询检测到三个新记录,因此发送了记录计数 5。
总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。
注意
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。
以下示例查询使用 WINDOW
子句定义窗口和执行聚合。由于查询不指定 GROUP BY
,因此查询使用滑动窗口方法处理流中的记录。
示例 1:使用一个 1 分钟滑动窗口处理流
在填充应用程序内部流 SOURCE_SQL_STREAM_001
时,请考虑“入门”练习中的演示流。下面是架构。
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)
假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。
您可以使用以下基于时间的窗口式查询。查询使用 WINDOW
子句定义 1 分钟范围间隔。WINDOW
子句中的 PARTITION BY
按照滑动窗口中的股票行情机值对记录进行分组。
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
示例 2:对滑动窗口应用聚合的查询
针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
示例 3:从同一流的多个滑动窗口查询数据
您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。
在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2
和 a10
列值派生自 2 行和 10 行滑动窗口。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
要针对演示流测试此查询,请按照示例 1中介绍的测试过程操作。