示例:转换多个数据类型 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:转换多个数据类型

提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下:

  1. 首先,您将流式传输源映射到应用程序内部输入流,与所有其他 Kinesis Data Analytics 应用程序类似。

  2. 然后,在应用程序代码中编写 SQL 语句,从应用程序内部输入流中检索特定类型的行。然后,将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中,您具有一个可接收两种类型 (OrderTrade) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录:

Order record

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Trade record

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

使用创建应用程序时 AWS Management Console,控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下,控制台将该应用程序内部流命名为 SOURCE_SQL_STREAM_001

控制台屏幕截图,显示格式化的应用程序内部流示例。

在保存配置时,Amazon Kinesis Data Analytics 持续从流式传输源中读取数据,并在应用程序内部流中插入行。现在,您可以对应用程序内部流中的数据进行分析。

在本示例的应用程序代码中,首先创建两个额外的应用程序内部流:Order_StreamTrade_Stream。然后,根据记录类型筛选 SOURCE_SQL_STREAM_001 流中的行,使用数据泵将这些行插入到新创建的流。有关此编码模式的信息,请参阅应用程序代码

  1. 将订单和交易行筛选到单独的应用程序内部流:

    1. 筛选 SOURCE_SQL_STREAM_001 中的订单记录,并将订单保存到 Order_Stream

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. 筛选 SOURCE_SQL_STREAM_001 中的交易记录,并将订单保存到 Trade_Stream

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. 现在,您可以对这些流进行其他分析。在本示例中,您将按股票在时长一分钟的滚动窗口中计算交易数,并将结果保存到另一个流 DESTINATION_SQL_STREAM

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    此时将显示如下结果:

    控制台屏幕截图,在 SQL 结果选项卡中显示结果。
下一个步骤

步骤 1:准备数据