流的窗口式聚合 - Amazon Kinesis Data Analytics SQL 参考

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

流的窗口式聚合

为了说明窗口式聚合在 Amazon Kinesis Data Streams 中的工作原理,假定下表中的数据将流经一个名为 WEATHERSTREAM 的流。

ROWTIME CITY TEMP

2018-11-01 01:00:00.0

丹佛

29

2018-11-01 01:00:00.0

安克雷奇

2

2018-11-01 06:00:00.0

迈阿密

65

2018-11-01 07:00:00.0

丹佛

32

2018-11-01 09:00:00.0

安克雷奇

9

2018-11-01 13:00:00.0

丹佛

50

2018-11-01 17:00:00.0

安克雷奇

10

2018-11-01 18:00:00.0

迈阿密

71

2018-11-01 19:00:00.0

丹佛

43

2018-11-02 01:00:00.0

安克雷奇

4

2018-11-02 01:00:00.0

丹佛

39

2018-11-02 07:00:00.0

丹佛

46

2018-11-02 09:00:00.0

安克雷奇

3

2018-11-02 13:00:00.0

丹佛

56

2018-11-02 17:00:00.0

安克雷奇

2

2018-11-02 19:00:00.0

丹佛

50

2018-11-03 01:00:00.0

丹佛

36

2018-11-03 01:00:00.0

安克雷奇

1

假设您要查找 24 小时期间内记录的全球(无论是哪个城市)的最低和最高温度(在任意给定读数之前)。为此,您需要定义 RANGE INTERVAL '1' DAY PRECEDING 的一个窗口,并在 MINMAX 分析函数的 OVER 子句中使用它:

SELECT STREAM        ROWTIME,        MIN(TEMP) OVER W1 AS WMIN_TEMP,        MAX(TEMP) OVER W1 AS WMAX_TEMP FROM WEATHERSTREAM WINDOW W1 AS (    RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME WMIN_TEMP WMAX_TEMP

2018-11-01 01:00:00.0

29

29

2018-11-01 01:00:00.0

2

29

2018-11-01 06:00:00.0

2

65

2018-11-01 07:00:00.0

2

65

2018-11-01 09:00:00.0

2

65

2018-11-01 13:00:00.0

2

65

2018-11-01 17:00:00.0

2

65

2018-11-01 18:00:00.0

2

71

2018-11-01 19:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 07:00:00.0

4

71

2018-11-02 09:00:00.0

3

71

2018-11-02 13:00:00.0

3

71

2018-11-02 17:00:00.0

2

71

2018-11-02 19:00:00.0

2

56

2018-11-03 01:00:00.0

2

56

2018-11-03 01:00:00.0

1

56

现在,假定您要查找在 24 小时期间内记录的在任意给定读数之前的最低、最高和平均温度(按城市划分)。为此,您可以向窗口规范添加针对 CITYPARTITION BY 子句,并向选择列表添加针对同一个窗口的 AVG 分析函数:

SELECT STREAM        ROWTIME,        CITY,        MIN(TEMP) over W1 AS WMIN_TEMP,        MAX(TEMP) over W1 AS WMAX_TEMP,        AVG(TEMP) over W1 AS WAVG_TEMP FROM AGGTEST.WEATHERSTREAM WINDOW W1 AS (        PARTITION BY CITY        RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME CITY WMIN_TEMP WMAX_TEMP WAVG_TEMP

2018-11-01 01:00:00.0

丹佛

29

29

29

2018-11-01 01:00:00.0

安克雷奇

2

2

2

2018-11-01 06:00:00.0

迈阿密

65

65

65

2018-11-01 07:00:00.0

丹佛

29

32

30

2018-11-01 09:00:00.0

安克雷奇

2

9

5

2018-11-01 13:00:00.0

丹佛

29

50

37

2018-11-01 17:00:00.0

安克雷奇

2

10

7

2018-11-01 18:00:00.0

迈阿密

65

71

68

2018-11-01 19:00:00.0

丹佛

29

50

38

2018-11-02 01:00:00.0

安克雷奇

2

10

6

2018-11-02 01:00:00.0

丹佛

29

50

38

2018-11-02 07:00:00.0

丹佛

32

50

42

2018-11-02 09:00:00.0

安克雷奇

3

10

6

2018-11-02 13:00:00.0

丹佛

39

56

46

2018-11-02 17:00:00.0

安克雷奇

2

10

4

2018-11-02 19:00:00.0

丹佛

39

56

46

2018-11-03 01:00:00.0

丹佛

36

56

45

2018-11-03 01:00:00.0

安克雷奇

1

4

2

行时间边界和窗口式聚合示例

以下是窗口式聚合查询的一个示例:

SELECT STREAM ROWTIME, ticker, amount, SUM(amount)    OVER (        PARTITION BY ticker        RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume FROM Trades

因为这是对流的查询,所以行一进入就会弹出此查询。例如,给定以下输入:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 10:25:00 Trades: IBM 30 11:05:00 Trades.bound: 11:10:00

在此示例中,输出如下:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 35 10:25:00 Trades: IBM 30 30 11:05:00 Trades.bound: 11:10:00

这些行仍然在后台逗留一个小时,因此第二个 ORCL 行输出的总计为 35;但原始 IBM 交易落在“上一个小时”窗口外,因此它未包含在 IBM 总计中。

示例

有些业务问题似乎需要对流的整个历史进行总计,但这通常无法计算。但是,此类业务问题通常可以通过查看最后一天、最后一小时或最近 N 条记录来解决。此类记录的集合称为窗口化聚合

它们在流数据库中易于计算,可以用 ANSI (SQL:2008) 标准 SQL 表示,如下所示:

SELECT STREAM ticker,  avg(price) OVER lastHour AS avgPrice,      max(price) OVER lastHour AS maxPrice   FROM Bids   WINDOW lastHour AS  (      PARTITION BY ticker      RANGE INTERVAL '1' HOUR PRECEDING)

注意

Interval_clause 必须属于以下合适的类型之一:

  • 包含 ROWS 的整数文字

  • 数字列上的 RANGE 的数字值

  • 范围超过 a 的间隔 date/time/timestamp