メニュー
Amazon Kinesis Analytics
開発者ガイド

スライディングウィンドウ

GROUP BY を使用してレコードをグループ化する代わりに、ウィンドウを定義できます (時間ベースまたは行ベース)。たとえば、明示的に WINDOW 句を追加してこれを行うことができます。この場合、ウィンドウが時間とともにスライドしながら、新しいレコードがストリームに現れると Amazon Kinesis Analytics がウィンドウ内で行を処理して出力を発行します。このタイプの処理ではウィンドウが重複することがあり、レコードが複数のウィンドウの一部となり、ウィンドウごとに処理される場合があることに注意してください。次の例はスライディングウインドウを説明します。

ストリームのレコードをカウントする簡単なクエリを考えます。5 秒のウィンドウがあるとします。次のストリームの例では、新しいレコードが t1、t2、t6、t7 の時間に受信され、t8 秒では同時に 3 つのレコードを受信しています。

以下に留意してください。

  • 5 秒のウィンドウがあるとします。5 秒ウィンドウは時間とともにスライドし続けます。

  • 行がウィンドウに入力されるごとに、出力行がスライディングウィンドウによって発行されます。アプリケーションを起動してすぐは、まだ 5 秒のウィンドウになっていなくても、最初はストリームで受信された新しいレコードのそれぞれに対してクエリが出力を発行します。たとえば、1 秒目と 2 秒目にレコードが現れると、クエリは出力を発行します。その後、クエリは 5 秒ウィンドウでレコードを処理します。

  • ウィンドウは時間とともにスライドします。古いレコードがウィンドウから押し出されても、その 5 秒ウィンドウのストリームに新しいデータもない限り、クエリは出力を発行しません。

クエリが t0 に実行を開始するとします。

  1. t0 にクエリが開始されます。この時点ではレコードがないため、クエリは出力 (カウント値) を発行しません。

  2. 時間 t1 に、新しいレコードがストリームに現れ、クエリはカウント値 1 を発行します。

  3. 時間 t2 に、別のレコードが現れ、クエリはカウント値 2 を発行します。

  4. 5 秒ウィンドウは時間とともにスライドします。

    • t3 では、スライディングウィンドウは t3 から t0 です。

    • t4 (スライディングウィンドウは t4 から t0)、そして

    • t5 には、スライディングウィンドウは t5 から t0 です。

    この間、5 秒ウィンドウのレコードはまったく同じです。新規レコードはありません。そのため、クエリは出力を発行しません。

  5. 時間 t6 には、5 秒ウィンドウは (t6 から t1) であり、t6 でクエリが新しいレコードを検出して、出力 2 を発行します。t1 のレコードはウィンドウ内になくなったため、カウントされません。

  6. 時間 t7 では、5 秒ウィンドウは t7 から t2 であり、t7 でクエリが新しいレコードを検出して出力 2 を発行します。t2 のレコードは 5 秒ウィンドウ内になくなったため、カウントされません。

  7. 時間 t8 では、5 秒ウィンドウは t8 から t3 であり、クエリが 3 つの新しいレコードを検出したため、レコードカウント 5 を発行します。

要約すると、このウィンドウは固定サイズであり、時間とともにスライドします。クエリは新しいレコードが現れたときに出力を発行します。

以下は、WINDOW 句を使用してウィンドウを定義し集計を実行するクエリの例です。クエリが GROUP BY を指定しないため、このクエリではスライディングウィンドウの方法を使用してストリームのレコードを処理します。

例 1: 1 分のスライディングウィンドウを使用してストリームを処理する

たとえば、アプリケーション内ストリームに入力する「使用開始」実習のデモストリーム、SOURCE_SQL_STREAM_001 を考えてみます。スキーマは次のとおりです。

Copy
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

1 分のスライディングウィンドウを使用して、アプリケーションで集計をコンピューティングするとします。つまり、ストリームに現れる新しいレコードそれぞれについて、前の 1 分ウィンドウのレコードの集計を適用することで、アプリケーションに出力を発行させます。

以下の時間ベースのウィンドウクエリを使用できます。クエリは、WINDOW 句を使用して 1 分間隔の範囲を定義します。WINDOW 句の PARTITION BY はスライディングウィンドウ内のティッカー値でレコードをグループ化します。

Copy
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    Copy
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

例 2: スライディングウインドウに集計を適用するクエリ

デモストリームに対する次のクエリは、10 秒ウィンドウの各ティッカーの価格の変動パーセントの平均を返します。

Copy
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    Copy
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

例 3: 同じストリームの複数のスライディングウィンドウからのデータのクエリ

同じストリームに対して定義された別々のスライディングウィンドウを使用して各列値を計算し出力を発行するクエリを作成できます。

この例では、クエリは、2 行の移動平均に 10 行の移動平均を交えたティッカーシンボルについて出力を発行します (つまり、ティッカー、価格、a2、および a10)。列 a2 および a10 の値は、2 行および 10 行のスライディングウィンドウから取得されることに注意してください。

Copy
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

デモストリームに対してこのクエリをテストするには、「例 1」で説明されているテスト手順に従います。別のアプリケーション内ストリーム DESTINATION_SQL_STREAM を作成する次のアプリケーションコードを使用します。