步驟 2:建立應用程式 - 亞馬遜 Kinesis SQL 應用程式資料分析開發人員指南

對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 2:建立應用程式

在本節建立 Amazon Kinesis Data Analytics 應用程式,如下所示:

  • 設定應用程式輸入,以使用您在 步驟 1:準備 中建立的 Kinesis 資料串流作為串流來源。

  • 使用主控台上的異常偵測範本。

建立應用程式
  1. 按照 Kinesis Data Analytics 入門練習中的步驟 1、2 和 3 進行操作 (請參閱 步驟 3.1:建立應用程式)。

    • 在來源設定中,執行下列動作:

      • 指定您在前一節建立的串流來源。

      • 主控台推斷結構描述後,請編輯結構描述,並將 heartRate 欄類型設定為 INTEGER

        大部分的心率值都是正常的,而且探索程序很可能會將 TINYINT 類型指派給此資料欄。但是,一小部分的值顯示出高心率。如果這些高數值不符合 TINYINT 類型,Kinesis Data Analytics 會將這些資料列傳送至錯誤串流。將資料類型更新為 INTEGER,以便容納所有產生的心率資料。

    • 使用主控台上的異常偵測範本。然後,您可以更新範本程式碼,以提供適當的資料欄名稱。

  2. 提供資料欄名稱以更新應用程式碼。生成的應用程式碼如下所示 (將此代碼貼到 SQL 編輯器中):

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

  3. 在 Kinesis Data Analytics 主控台上執行 SQL 程式碼並檢閱結果:

    主控台螢幕擷取畫面會顯示即時分析標籤,並在應用程式內串流中顯示生成的資料。

後續步驟

步驟 3:設定應用程式輸出