步驟 2:建立應用程式 - 適用於 SQL 應用程式的 Amazon Kinesis Data Analytics 開發人員指南

針對新專案,我們建議您優先選擇新的 Managed Service for Apache Flink Studio,而非 Kinesis Data Analytics for SQL 應用程式。Managed Service for Apache Flink Studio 易於使用且具備進階分析功能,可讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 2:建立應用程式

在本節建立 Amazon Kinesis Data Analytics 應用程式。然後新增輸入組態,將上一節建立的串流來源對應至應用程式內輸入串流,藉此更新應用程式。

  1. 前往 https://console.aws.amazon.com/kinesisanalytics 開啟 Managed Service for Apache Flink 主控台。

  2. 選擇建立應用程式 。此範例使用應用程式名稱 ProcessMultipleRecordTypes

  3. 在應用程式詳細資料頁面上,選擇連接串流資料來連接至來源。

  4. 連接至來源頁面,執行下列動作:

    1. 選擇您在 步驟 1:準備資料 中建立的串流。

    2. 選擇建立 IAM 角色。

    3. 等待主控台顯示推斷的結構描述和範例記錄,這些記錄可用來推斷應用程式內串流所建立的結構描述。

    4. 選擇儲存並繼續

  5. 在應用程式中樞,選擇至 SQL 編輯器。若要啟動應用程式,請在出現的對話方塊中選擇是,啟動應用程式

  6. 在 SQL 編輯器中,編寫應用程式碼並驗證結果:

    1. 請複製以下應用程式碼,然後貼到編輯器中。

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( "order_id" integer, "order_type" varchar(10), "ticker" varchar(4), "order_price" DOUBLE, "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Order'; --******************************************** --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" ("trade_id" integer, "order_id" integer, "trade_price" DOUBLE, "ticker" varchar(4), "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Trade'; --***************************************************************** --do some analytics on the Trade_Stream and Order_Stream. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ticker" varchar(4), "trade_count" integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "ticker", count(*) as trade_count FROM "Trade_Stream" GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
    2. 選擇儲存並執行 SQL。選擇即時分析標籤,查看應用程式建立的所有應用程式內串流,並驗證資料。

後續步驟

您可以設定應用程式輸出,將結果保留至外部目的地,例如另一個 Kinesis 串流或 Firehose 資料傳遞串流。