ステップ 2: 分析アプリケーションを作成する - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 2: 分析アプリケーションを作成する

このセクションでは、Amazon Kinesis Data Analytics アプリケーションを作成し、「ステップ 1: データを準備する」でストリーミングソースとして作成した Kinesis データストリームを使用するように設定します。RANDOM_CUT_FOREST_WITH_EXPLANATION 関数を使用するアプリケーションコードを実行します。

アプリケーションを作成するには
  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. ナビゲーションペインで、[データ分析]、[アプリケーションの作成] の順に選択します。

  3. アプリケーション名と説明 (オプション) を指定して、[Create application] を選択します。

  4. [ストリーミングデータの接続] を選択し、リストから [ExampleInputStream] を選択します。

  5. [スキーマの検出] を選択して、INTEGER 列として Systolic および Diastolic が表示されていることを確認します。別のタイプがある場合は、[Edit schema] を選択し、INTEGER タイプを両方に割り当てます。

  6. [Real time analytics] の下で、[Go to SQL editor] を選択します。プロンプトが表示されたら、アプリケーションの実行を選択します。

  7. 次のコードを SQL エディタに貼り付けて、[Save and run SQL] を選択します。

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); -- Compute an anomaly score with explanation for each record in the input stream -- using RANDOM_CUT_FOREST_WITH_EXPLANATION CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true)); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
次のステップ

ステップ 3: 結果の検証