例: Stagger Window - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

例: Stagger Window

ウィンドウクエリがそれぞれ固有のパーティションキーごとに別々のウィンドウを処理するとき、一致するキーを持つデータが届くと、このウィンドウはずらしウィンドウと呼ばれます。詳細については、「Stagger Windows」を参照してください。この Amazon Kinesis Data Analytics 例では、EVENT_TIME 列と TICKER 列を使用してずらしウィンドウを作成しています。ソースストリームには、同じ EVENT_TIME 値と TICKER 値を持つ、6 つのレコードのグループが含まれています。これらは 1 分間以内に届きますが、必ずしも同じ分値で届くわけではありません (例: 18:41:xx)。

この例では、次のレコードを Kinesis データストリームに次のタイミングで書き込みます。スクリプトは時間をストリームに書き込みませんが、レコードがアプリケーションによって取り込まれた時間が ROWTIME フィールドに書き込まれます。

{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:30 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:40 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:50 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:00 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:10 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:21 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:31 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:41 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:51 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:01 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:11 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:21 ...

次に、AWS Management Console でKinesis Data Analyticsアプリケーションを作成します。 データストリームをストリーミングソースとして使用します。検出プロセスでストリーミングソースのサンプルレコードが読み込まれます。次のように、アプリケーション内スキーマに 2 つの列 (EVENT_TIME および TICKER) があると推察します。


                料金列とティッカー列を含むアプリケーション内スキーマを表示するコンソールのスクリーンショット。

データのウィンドウ集約を作成するには、アプリケーションコードで COUNT 関数を使用します。続いて、次のスクリーンショットに示すように、生成されたデータを別のアプリケーション内ストリームに挿入します。


                アプリケーション内ストリームに結果のデータを表示するコンソールのスクリーンショット。

次の手順では、EVENT_TIME および TICKER に基づき、ずらしウィンドウの入力ストリームに値を集約する Kinesis Data Analytics アプリケーションを作成します。

ステップ 1: Kinesis データストリームを作成する

次のように、Amazon Kinesis データストリームを作成して、レコードを追加します。

  1. AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. ナビゲーションペインで、[データストリーム] を選択します。

  3. [Kinesis ストリームの作成] を選択後、1 つのシャードがあるストリームを作成します。詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「ストリームを作成する」を参照してください。

  4. 本稼働環境の Kinesis データストリームにレコードを書き込むには、Kinesis Producer Library または Kinesis Data Streams API を使用することをお勧めします。分かりやすいように、この例では、以下の Python スクリプトを使用してレコードを生成します。サンプルのティッカーレコードを入力するには、このコードを実行します。このシンプルなコードは、同じランダムな EVENT_TIME とティッカーシンボルを持つ 6 つのレコードのグループを、1 分間に渡ってストリームに継続的に書き込みます。後のステップでアプリケーションスキーマを生成できるように、スクリプトを実行したままにしておきます。

    import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

ステップ 2: Kinesis Data Analytics アプリケーションを作成する

次のように Kinesis Data Analytics アプリケーションを作成します。

  1. https://console.aws.amazon.com/kinesisanalytics にある Managed Service for Apache Flink コンソールを開きます。

  2. [アプリケーションの作成] を選択し、アプリケーション名を入力して、[アプリケーションの作成] を選択します。

  3. アプリケーション詳細ページで、[ストリーミングデータの接続] を選択してソースに接続します。

  4. [ソースに接続] ページで、以下の操作を実行します。

    1. 前のセクションで作成したストリームを選択します。

    2. [スキーマの検出] を選択します。作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。推測されたスキーマには 2 つの列があります。

    3. [スキーマの編集] を選択します。[EVENT_TIME] 列 の [列のタイプ] を TIMESTAMP に変更します。

    4. [Save schema and update stream samples] を選択します。コンソールでスキーマが保存されたら、[終了] を選択します。

    5. [Save and continue] を選択します。

  5. アプリケーション詳細ページで、[SQL エディタに移動] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [はい、アプリケーションを起動します] を選択します。

  6. SQL エディタで、次のように、アプリケーションコードを作成してその結果を確認します。

    1. 次のアプリケーションコードをコピーしてエディタに貼り付けます。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
    2. [Save and run SQL] を選択します。

      [リアルタイム分析] タブに、アプリケーションで作成されたすべてのアプリケーション内ストリームが表示され、データを検証できます。