Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド
SQL 開発者ガイド

スライディングウィンドウ

GROUP BY を使用してレコードをグループ化する代わりに、時間ベースまたは行ベースのウィンドウを定義できます。そのためには、WINDOW 句を明示的に追加します。

この場合、ウィンドウが時間とともにスライドしながら、新しいレコードがストリームに現れると Amazon Kinesis Data Analytics が出力を発行します。Kinesis Data Analytics は、ウィンドウ内で行を処理して出力を発行します。このタイプの処理ではウィンドウが重複するだけでなく、レコードが複数のウィンドウの一部となり、ウィンドウごとに処理される場合があります。次の例では、スライディングウィンドウについて説明します。

ストリームのレコードをカウントする簡単なクエリを考えます。この例では、5 秒のウィンドウを前提としています。次のストリームの例では、新しいレコードが t1、t2、t6、t7 の時間に受信され、t8 秒では同時に 3 つのレコードを受信しています。

以下に留意してください。

  • この例では、5 秒のウィンドウを前提としています。5 秒ウィンドウは時間とともに継続的にスライドします。

  • 行がウィンドウに入力されるごとに、出力行がスライディングウィンドウによって発行されます。アプリケーションを起動してすぐは、まだ 5 秒のウィンドウが経過していなくても、ストリームで受信された新しいレコードのそれぞれに対してクエリが出力を発行します。たとえば、1 秒目と 2 秒目にレコードが現れると、クエリは出力を発行します。その後、クエリは 5 秒ウィンドウでレコードを処理します。

  • ウィンドウは時間とともにスライドします。古いレコードがウィンドウから押し出されても、その 5 秒ウィンドウに含まれるストリームに新しいレコードがない限り、クエリは出力を発行しません。

クエリが t0 に実行を開始するとします。そして次のようになります。

  1. t0 にクエリが開始されます。この時点ではレコードがないため、クエリは出力 (カウント値) を発行しません。

  2. 時間 t1 に、新しいレコードがストリームに現れ、クエリはカウント値 1 を発行します。

  3. 時間 t2 に、別のレコードが現れ、クエリはカウント値 2 を発行します。

  4. 5 秒ウィンドウは時間とともにスライドします。

    • t3 では、スライディングウィンドウは t3 から t0 です。

    • t4 (スライディングウィンドウは t4 から t0)

    • t5 には、スライディングウィンドウは t5 から –t0 です。

    この間、5 秒ウィンドウのレコードはまったく同じです。新規レコードはありません。そのため、クエリは出力を発行しません。

  5. t6 時、5 秒ウィンドウは (t6 から t1) です。クエリは、t6 で新しいレコードを検出するため、出力 2 を発行します。t1 のレコードはウィンドウ内になくなったため、カウントされません。

  6. t7 時、5 秒ウィンドウは t7 から t2 です。クエリは、t7 で新しいレコードを検出するため、出力 2 を発行します。t2 のレコードは 5 秒ウィンドウ内になくなったため、カウントされません。

  7. t8 時、5 秒ウィンドウは t8 から t3 です。クエリが 3 つの新しいレコードを検出したため、レコードカウント 5 を発行します。

要約すると、このウィンドウは固定サイズであり、時間とともにスライドします。クエリは新しいレコードが現れたときに出力を発行します。

注記

スライディングウィンドウの使用は1 時間以内にすることをお勧めします。これよりも長いウィンドウを使用する場合、通常のシステムメンテナンス後のアプリケーションの再起動に時間がかかります。これは、ソースデータを再度ストリームから読み取る必要があるためです。

以下は、WINDOW 句を使用してウィンドウを定義し集計を実行するクエリの例です。クエリが GROUP BY を指定しないため、このクエリではスライディングウィンドウの方法を使用してストリームのレコードを処理します。

例 1: 1 分のスライディングウィンドウを使用してストリームを処理する

アプリケーション内ストリームに入力する「はじめに」実習のデモストリーム、SOURCE_SQL_STREAM_001 を考えてみます。スキーマは次のとおりです。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

1 分のスライディングウィンドウを使用して、アプリケーションで集計をコンピューティングすると仮定します。つまり、ストリームに現れる新しいレコードそれぞれについて、前の 1 分ウィンドウのレコードの集計を適用することで、アプリケーションに出力を発行させます。

以下の時間ベースのウィンドウクエリを使用できます。クエリは、WINDOW 句を使用して 1 分間隔の範囲を定義します。WINDOW 句の PARTITION BY はスライディングウィンドウ内のティッカー値でレコードをグループ化します。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

例 2: スライディングウインドウに集計を適用するクエリ

デモストリームに対する次のクエリは、10 秒ウィンドウの各ティッカーの価格の変動パーセントの平均を返します。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

例 3: 同じストリームの複数のスライディングウィンドウからのデータのクエリ

同じストリームに対して定義された別々のスライディングウィンドウを使用して各列値を計算し出力を発行するクエリを作成できます。

次の例では、クエリは出力ティッカー、価格、a2、a10 を発行します。また、2 行の移動平均に 10 行の移動平均を交えたティッカーシンボルについて出力を発行します。列 a2 および a10 の値は、2 行および 10 行のスライディングウィンドウから取得されます。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

デモストリームに対してこのクエリをテストするには、「例 1」で説明されているテスト手順に従います。