本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
流的窗口式聚合
为了说明窗口式聚合在 Amazon Kinesis Data Streams 中的工作原理,假定下表中的数据将流经一个名为 WEATHERSTREAM 的流。
ROWTIME | CITY | TEMP |
---|---|---|
2018-11-01 01:00:00.0 |
丹佛 |
29 |
2018-11-01 01:00:00.0 |
安克雷奇 |
2 |
2018-11-01 06:00:00.0 |
迈阿密 |
65 |
2018-11-01 07:00:00.0 |
丹佛 |
32 |
2018-11-01 09:00:00.0 |
安克雷奇 |
9 |
2018-11-01 13:00:00.0 |
丹佛 |
50 |
2018-11-01 17:00:00.0 |
安克雷奇 |
10 |
2018-11-01 18:00:00.0 |
迈阿密 |
71 |
2018-11-01 19:00:00.0 |
丹佛 |
43 |
2018-11-02 01:00:00.0 |
安克雷奇 |
4 |
2018-11-02 01:00:00.0 |
丹佛 |
39 |
2018-11-02 07:00:00.0 |
丹佛 |
46 |
2018-11-02 09:00:00.0 |
安克雷奇 |
3 |
2018-11-02 13:00:00.0 |
丹佛 |
56 |
2018-11-02 17:00:00.0 |
安克雷奇 |
2 |
2018-11-02 19:00:00.0 |
丹佛 |
50 |
2018-11-03 01:00:00.0 |
丹佛 |
36 |
2018-11-03 01:00:00.0 |
安克雷奇 |
1 |
假设您要查找 24 小时期间内记录的全球(无论是哪个城市)的最低和最高温度(在任意给定读数之前)。为此,您需要定义 RANGE INTERVAL '1' DAY PRECEDING
的一个窗口,并在 MIN
和 MAX
分析函数的 OVER
子句中使用它:
SELECT STREAM ROWTIME, MIN(TEMP) OVER W1 AS WMIN_TEMP, MAX(TEMP) OVER W1 AS WMAX_TEMP FROM WEATHERSTREAM WINDOW W1 AS ( RANGE INTERVAL '1' DAY PRECEDING );
结果
ROWTIME | WMIN_TEMP | WMAX_TEMP |
---|---|---|
2018-11-01 01:00:00.0 |
29 |
29 |
2018-11-01 01:00:00.0 |
2 |
29 |
2018-11-01 06:00:00.0 |
2 |
65 |
2018-11-01 07:00:00.0 |
2 |
65 |
2018-11-01 09:00:00.0 |
2 |
65 |
2018-11-01 13:00:00.0 |
2 |
65 |
2018-11-01 17:00:00.0 |
2 |
65 |
2018-11-01 18:00:00.0 |
2 |
71 |
2018-11-01 19:00:00.0 |
2 |
71 |
2018-11-02 01:00:00.0 |
2 |
71 |
2018-11-02 01:00:00.0 |
2 |
71 |
2018-11-02 07:00:00.0 |
4 |
71 |
2018-11-02 09:00:00.0 |
3 |
71 |
2018-11-02 13:00:00.0 |
3 |
71 |
2018-11-02 17:00:00.0 |
2 |
71 |
2018-11-02 19:00:00.0 |
2 |
56 |
2018-11-03 01:00:00.0 |
2 |
56 |
2018-11-03 01:00:00.0 |
1 |
56 |
现在,假定您要查找在 24 小时期间内记录的在任意给定读数之前的最低、最高和平均温度(按城市划分)。为此,您可以向窗口规范添加针对 CITY
的 PARTITION BY
子句,并向选择列表添加针对同一个窗口的 AVG
分析函数:
SELECT STREAM ROWTIME, CITY, MIN(TEMP) over W1 AS WMIN_TEMP, MAX(TEMP) over W1 AS WMAX_TEMP, AVG(TEMP) over W1 AS WAVG_TEMP FROM AGGTEST.WEATHERSTREAM WINDOW W1 AS ( PARTITION BY CITY RANGE INTERVAL '1' DAY PRECEDING );
结果
ROWTIME | CITY | WMIN_TEMP | WMAX_TEMP | WAVG_TEMP |
---|---|---|---|---|
2018-11-01 01:00:00.0 |
丹佛 |
29 |
29 |
29 |
2018-11-01 01:00:00.0 |
安克雷奇 |
2 |
2 |
2 |
2018-11-01 06:00:00.0 |
迈阿密 |
65 |
65 |
65 |
2018-11-01 07:00:00.0 |
丹佛 |
29 |
32 |
30 |
2018-11-01 09:00:00.0 |
安克雷奇 |
2 |
9 |
5 |
2018-11-01 13:00:00.0 |
丹佛 |
29 |
50 |
37 |
2018-11-01 17:00:00.0 |
安克雷奇 |
2 |
10 |
7 |
2018-11-01 18:00:00.0 |
迈阿密 |
65 |
71 |
68 |
2018-11-01 19:00:00.0 |
丹佛 |
29 |
50 |
38 |
2018-11-02 01:00:00.0 |
安克雷奇 |
2 |
10 |
6 |
2018-11-02 01:00:00.0 |
丹佛 |
29 |
50 |
38 |
2018-11-02 07:00:00.0 |
丹佛 |
32 |
50 |
42 |
2018-11-02 09:00:00.0 |
安克雷奇 |
3 |
10 |
6 |
2018-11-02 13:00:00.0 |
丹佛 |
39 |
56 |
46 |
2018-11-02 17:00:00.0 |
安克雷奇 |
2 |
10 |
4 |
2018-11-02 19:00:00.0 |
丹佛 |
39 |
56 |
46 |
2018-11-03 01:00:00.0 |
丹佛 |
36 |
56 |
45 |
2018-11-03 01:00:00.0 |
安克雷奇 |
1 |
4 |
2 |
行时间边界和窗口式聚合示例
以下是窗口式聚合查询的一个示例:
SELECT STREAM ROWTIME, ticker, amount, SUM(amount) OVER ( PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume FROM Trades
因为这是对流的查询,所以行一进入就会弹出此查询。例如,给定以下输入:
Trades: IBM 10 10 10:00:00 Trades: ORCL 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 10:25:00 Trades: IBM 30 11:05:00 Trades.bound: 11:10:00
在此示例中,输出如下:
Trades: IBM 10 10 10:00:00 Trades: ORCL 20 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 35 10:25:00 Trades: IBM 30 30 11:05:00 Trades.bound: 11:10:00
这些行仍然在后台逗留一个小时,因此第二个 ORCL 行输出的总计为 35;但原始 IBM 交易落在“上一个小时”窗口外,因此它未包含在 IBM 总计中。
示例
有些业务问题似乎需要对流的整个历史进行总计,但这通常无法计算。但是,此类业务问题通常可以通过查看最后一天、最后一小时或最近 N 条记录来解决。此类记录的集合称为窗口化聚合。
它们在流数据库中易于计算,可以用 ANSI (SQL:2008) 标准 SQL 表示,如下所示:
SELECT STREAM ticker, avg(price) OVER lastHour AS avgPrice, max(price) OVER lastHour AS maxPrice FROM Bids WINDOW lastHour AS ( PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING)
注意
Interval_clause
必须属于以下合适的类型之一:
-
包含 ROWS 的整数文字
-
数字列上的 RANGE 的数字值
-
范围超过 a 的间隔 date/time/timestamp