步骤 2:创建应用程序 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 2:创建应用程序

在此部分中,您创建一个 Kinesis Data Analytics 应用程序。然后,通过添加将您在前一部分中创建的流式传输源映射到应用程序内部输入流的输入配置,您可以更新应用程序。

  1. 打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 选择创建应用程序。此示例使用应用程序名称 ProcessMultipleRecordTypes

  3. 在应用程序详细信息页面上,选择 连接流数据,以连接到源。

  4. 连接到源 页面上,执行以下操作:

    1. 选择在步骤 1:准备数据中创建的流。

    2. 选择以创建 IAM 角色。

    3. 等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。

    4. 选择 保存并继续

  5. 在应用程序中心上,选择 Go to SQL editor。要启动应用程序,请在显示的对话框中选择 是,启动应用程序

  6. 在 SQL 编辑器中编写应用程序代码并确认结果:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( "order_id" integer, "order_type" varchar(10), "ticker" varchar(4), "order_price" DOUBLE, "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Order'; --******************************************** --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" ("trade_id" integer, "order_id" integer, "trade_price" DOUBLE, "ticker" varchar(4), "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Trade'; --***************************************************************** --do some analytics on the Trade_Stream and Order_Stream. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ticker" varchar(4), "trade_count" integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "ticker", count(*) as trade_count FROM "Trade_Stream" GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
    2. 选择 保存并运行 SQL。选择 Real-time analytics (实时分析) 选项卡可查看应用程序已创建的所有应用程序内部流并验证数据。

下一个步骤

您可以将应用程序输出配置为将结果保存到外部目标,例如另一个 Kinesis 流或 Firehose 数据传输流。