スライディングウィンドウ - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

英語の翻訳が提供されている場合で、内容が矛盾する場合には、英語版がオリジナルとして取り扱われます。翻訳は機械翻訳により提供されています。

スライディングウィンドウ

GROUP BY を使用してレコードをグループ化する代わりに、時間ベースまたは行ベースのウィンドウを定義できます。そのためには、WINDOW 句を明示的に追加します。

この場合、ウィンドウが時間とともにスライドしながら、新しいレコードがストリームに現れると Amazon Kinesis Data Analytics が出力を発行します。Kinesis Data Analytics は、ウィンドウ内で行を処理して出力を発行します。このタイプの処理ではウィンドウが重複するだけでなく、レコードが複数のウィンドウの一部となり、ウィンドウごとに処理される場合があります。次の例では、スライディングウィンドウについて説明します。

ストリームのレコードをカウントする簡単なクエリを考えます。この例では、5 秒のウィンドウを前提としています。次の例のストリームでは、新しいレコードは時間tに到着します。1\t2\t6、およびt7、および3つのレコードが時間tに到着8

以下に留意してください。

  • この例では、5 秒のウィンドウを前提としています。5 秒ウィンドウは時間とともに継続的にスライドします。

  • 行がウィンドウに入力されるごとに、出力行がスライディングウィンドウによって発行されます。アプリケーションを起動してすぐは、まだ 5 秒のウィンドウが経過していなくても、ストリームで受信された新しいレコードのそれぞれに対してクエリが出力を発行します。たとえば、1 秒目と 2 秒目にレコードが現れると、クエリは出力を発行します。その後、クエリは 5 秒ウィンドウでレコードを処理します。

  • ウィンドウは時間とともにスライドします。古いレコードがウィンドウから押し出されても、その 5 秒ウィンドウに含まれるストリームに新しいレコードがない限り、クエリは出力を発行しません。

クエリが t で実行を開始するとします。0そして次のようになります。

  1. t時点0、クエリが開始されます。この時点ではレコードがないため、クエリは出力 (カウント値) を発行しません。

  2. t時点1新しいレコードがストリームに表示され、クエリがカウント値1を放射します。

  3. t時点2別のレコードが表示され、クエリはカウント2を生成します。

  4. 5 秒ウィンドウは時間とともにスライドします。

    • t時3、スライドウィンドウt3 ~t0

    • t4(スライドウィンドウt4 ~t0)

    • t時5 スライディングウィンドウt5–t0

    この間、5 秒ウィンドウのレコードはまったく同じです。新規レコードはありません。そのため、クエリは出力を発行しません。

  5. t時点65秒ウィンドウは(t6 ~t1)。 クエリは、t に 1 つの新しいレコードを検出します。6 出力2を放射します。t のレコード1 はウィンドウに表示されなくなり、カウントされません。

  6. t時点7、5秒ウィンドウはtです。7 ~t2。 クエリは、t に 1 つの新しいレコードを検出します。7 出力2を放射します。t のレコード2 は 5 秒ウィンドウに表示されていないため、カウントされません。

  7. t時点8、5秒ウィンドウはtです。8 ~t3クエリが 3 つの新しいレコードを検出したため、レコードカウント 5 を発行します。

要約すると、このウィンドウは固定サイズであり、時間とともにスライドします。クエリは新しいレコードが現れたときに出力を発行します。

注記

スライディングウィンドウの使用は1 時間以内にすることをお勧めします。これよりも長いウィンドウを使用する場合、通常のシステムメンテナンス後のアプリケーションの再起動に時間がかかります。これは、ソースデータを再度ストリームから読み取る必要があるためです。

以下は、WINDOW 句を使用してウィンドウを定義し集計を実行するクエリの例です。クエリが GROUP BY を指定しないため、このクエリではスライディングウィンドウの方法を使用してストリームのレコードを処理します。

例 1 1 分間のスライド・ウィンドウを使用したストリームの処理

アプリケーション内ストリームに自動入力される「はじめに」演習のデモストリームを検討します。 SOURCE_SQL_STREAM_001。 以下はスキーマです。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

1 分のスライディングウィンドウを使用して、アプリケーションで集計をコンピューティングすると仮定します。つまり、ストリームに現れる新しいレコードそれぞれについて、前の 1 分ウィンドウのレコードの集計を適用することで、アプリケーションに出力を発行させます。

以下の時間ベースのウィンドウクエリを使用できます。クエリは、WINDOW 句を使用して 1 分間隔の範囲を定義します。WINDOW 句の PARTITION BY はスライディングウィンドウ内のティッカー値でレコードをグループ化します。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

例 2 スライディング ウィンドウでの集計の適用のクエリ

デモストリームに対する次のクエリは、10 秒ウィンドウの各ティッカーの価格の変動パーセントの平均を返します。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

クエリをテストするには

  1. 「使用開始」実習に従ってアプリケーションをセットアップします。

  2. アプリケーションコード内の SELECT ステートメントを前述の SELECT クエリに置き換えます。アプリケーションコードは次のようになります。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

例 3 同じストリーム上の複数のスライディングウィンドウからデータをクエリする

同じストリームに対して定義された別々のスライディングウィンドウを使用して各列値を計算し出力を発行するクエリを作成できます。

次の例では、クエリは出力ティッカー、価格、a2、a10 を発行します。また、2 行の移動平均に 10 行の移動平均を交えたティッカーシンボルについて出力を発行します。列 a2 および a10 の値は、2 行および 10 行のスライディングウィンドウから取得されます。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

デモストリームに対してこのクエリをテストするには、「例 1」で説明されているテスト手順に従います。