小组排名 - Amazon Kinesis Data Analytics

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

小组排名

此函数会将 RANK() 函数应用于行的逻辑组,并且(可选)按排序顺序传送该组。

group_rank 的应用包括:

  • 对流式处理 GROUP BY 的结果进行排序。

  • 确定小组结果中的关系。

群组排名可以执行以下操作:

  • 将排名应用于指定的输入列。

  • 提供已排序或未排序的输出。

  • 允许用户为数据刷新指定非活动时间。

SQL 声明

函数属性和 DDL 将在后面的部分中介绍。

Group_Rank 的函数属性

该函数的如下操作:

  • 收集行,直到检测到行时间更改或超过指定的空闲时间限制。

  • 接受任何流媒体行集。

  • 使用包含基本 SQL 数据类型 INTEGERCHARVARCHAR 的任何列作为排名依据。

  • 按收到的顺序或所选列中值的升序或降序对输出行进行排序。

Group_Rank 的 DDL

group_rank(c cursor, rankByColumnName VARCHAR(128),    rankOutColumnName VARCHAR(128), sortOrder VARCHAR(10), outputOrder VARCHAR(10),    maxIdle INTEGER, outputMax INTEGER)  returns table(c.*, "groupRank" INTEGER)

下表中。

参数 描述
c CURSOR 到流式传输结果集

rankByColumnName

一个字符串,指定要用于对组进行排名的列。

rankOutColumnName

命名用于返回等级的列的字符串。

此字符串必须与 CREATE FUNCTION 语句的 RETURNS 子句中 groupRank 列的名称匹配。

sortOrder

控制等级分配的行顺序。

有效值如下所示:

  • 'asc' - 根据排名按升序输出。

  • 'desc'-根据等级降序。

outputOrder

控制输出的顺序。有效值如下所示:

  • 'asc' - 根据排名按升序输出。

  • 'desc'-根据等级降序。

maxIdle

持有群组进行排名的时间限制(以毫秒为单位)。

maxIdle 过期时,当前组将会释放到流中。值为 0 表示没有空闲超时。

outputMax

该函数将在给定组中输出的最大行数。

值为 0 表示没有限制。

示例

示例数据集

以下示例基于样本库存数据集,该数据集是 Amazon Kinesis Data Analytics 开发人员指南入门练习的一部分。要运行每个示例,您需要一个包含样本股票行情输入流的 Amazon Kinesis 数据分析应用程序。要了解如何创建分析应用程序和配置示例股票行情输入流,请参阅 Amazon Kinesis Data Analytics 开发者指南中的入门指南

示例股票数据集具有以下架构:

(ticker_symbol VARCHAR(4), sector VARCHAR(16), change REAL, price REAL)

示例 1:对 GROUP BY 子句的结果进行排序

在此示例中,聚合查询有一个GROUP BY子句ROWTIME,该子句将流分组为有限的行。然后,GROUP_RANK 函数对 GROUP BY 子句返回的行进行排序。

CREATE OR REPLACE STREAM "ticker_grouped" ( "group_time" TIMESTAMP, "ticker" VARCHAR(65520), "ticker_count" INTEGER); CREATE OR REPLACE STREAM "destination_sql_stream" ( "group_time" TIMESTAMP, "ticker" VARCHAR(65520), "ticker_count" INTEGER, "group_rank" INTEGER); CREATE OR REPLACE PUMP "ticker_pump" AS INSERT INTO "ticker_grouped" SELECT STREAM FLOOR(SOURCE_SQL_STREAM_001.ROWTIME TO SECOND), "TICKER_SYMBOL", COUNT(TICKER_SYMBOL) FROM SOURCE_SQL_STREAM_001 GROUP BY FLOOR(SOURCE_SQL_STREAM_001.ROWTIME TO SECOND), TICKER_SYMBOL; CREATE OR REPLACE PUMP DESTINATION_SQL_STREAM_PUMP AS INSERT INTO "destination_sql_stream" SELECT STREAM "group_time", "ticker", "ticker_count", "groupRank" FROM TABLE( GROUP_RANK( CURSOR(SELECT STREAM * FROM "ticker_grouped"), 'ticker_count', 'groupRank', 'desc', 'asc', 5, 0));

结果

上一示例输出的流与以下内容类似。

Data table showing ROWTIME, group_time, ticker, ticker_count, and group_rank columns with sample values.

操作概览

对于每个组(即具有相同行时间的行),行是从输入游标中缓冲的。行的排名要么在行数不同的行到达之后(或者在出现空闲超时时)完成。继续读取行,同时对行时间相同的行组进行排名。

在分配排名后,outputMax 参数将指定要为每个组返回的最大行数。

默认情况下,group_rank 支持列传递,如示例中所示:使用 c.* 作为标准快捷方式以指示按显示的顺序传递所有输入列。您可以改为使用表示法“c.columName”指定一个子集,以便对列进行重新排序。但是,使用特定的列名称会将 UDX 绑定到一个特定的输入集,而使用 c.* 表示法可让 UDX 处理任何输入集。

rankOutColumnName 参数将指定要用于返回排名的输出列。此列名称必须与 CREATE FUNCTION 语句的 RETURNS 子句中指定的列名称匹配。