ステップ 2: アプリケーションの作成 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 2: アプリケーションの作成

このセクションでは、Kinesis Data Analytics アプリケーションを作成します。その後、前のセクションで作成したストリーミングソースをアプリケーション内入力ストリームにマッピングする入力設定を追加して、アプリケーションを更新します。

  1. https://console.aws.amazon.com/kinesisanalytics にある Managed Service for Apache Flink コンソールを開きます。

  2. [Create application] を選択します。この例では、アプリケーション名 ProcessMultipleRecordTypes を使用します。

  3. アプリケーション詳細ページで、[ストリーミングデータの接続] を選択してソースに接続します。

  4. [ソースに接続] ページで、以下の操作を実行します。

    1. ステップ 1: データを準備する」で作成したストリームを選択します。

    2. IAM ロールの作成を選択します。

    3. 作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。

    4. [Save and continue] を選択します。

  5. アプリケーションハブで、[Go to SQL editor] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [はい、アプリケーションを起動します] を選択します。

  6. SQL エディタで、アプリケーションコードを作成してその結果を確認します。

    1. 次のアプリケーションコードをコピーしてエディタに貼り付けます。

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( "order_id" integer, "order_type" varchar(10), "ticker" varchar(4), "order_price" DOUBLE, "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Order'; --******************************************** --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" ("trade_id" integer, "order_id" integer, "trade_price" DOUBLE, "ticker" varchar(4), "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Trade'; --***************************************************************** --do some analytics on the Trade_Stream and Order_Stream. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ticker" varchar(4), "trade_count" integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "ticker", count(*) as trade_count FROM "Trade_Stream" GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
    2. [Save and run SQL] を選択します。[リアルタイム分析] タブを選択して、アプリケーションで作成されたすべてのアプリケーション内ストリームを表示し、データを検証します。

次のステップ

別の Kinesis ストリームや Firehose データ配信ストリームなどの外部送信先に結果を永続化するようにアプリケーション出力を設定できます。