Fenêtres défilantes - Manuel du développeur des applications Amazon Kinesis Data Analytics pour SQL

Pour les nouveaux projets, nous vous recommandons d’utiliser le nouveau service géré pour Apache Flink Studio plutôt que les applications Kinesis Data Analytics pour SQL. Le service géré pour Apache Flink Studio allie facilité d’utilisation et capacités analytiques avancées, ce qui vous permet de créer des applications sophistiquées de traitement des flux en quelques minutes.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Fenêtres défilantes

Au lieu de regrouper des enregistrements à l'aide de GROUP BY, vous pouvez définir une fenêtre temporelle ou basées sur les lignes. Pour ce faire, vous pouvez ajouter une clause WINDOW explicite.

Dans ce cas, au fur et à mesure que la fenêtre défile dans le temps, Amazon Kinesis Data Analytics émet une sortie lorsque de nouveaux enregistrements apparaissent dans le flux. Kinesis Data Analytics émet cette sortie en traitant les lignes de la fenêtre. Les fenêtres peuvent se chevaucher dans ce type de traitement et un enregistrement peut faire partie de plusieurs fenêtres et être traité avec chaque fenêtre. L'exemple suivant illustre une fenêtre défilante.

Prenons l'exemple d'une requête simple qui compte les enregistrements du flux. Pour cet exemple, supposons une fenêtre de 5 secondes. Dans l’exemple de flux suivant, de nouveaux enregistrements arrivent aux instants t1, t2, t6 et t7, et trois enregistrements à l’instant t8 secondes.

Gardez à l'esprit les points suivants :

  • Pour cet exemple, supposons une fenêtre de 5 secondes. Cette fenêtre défile en continu avec le temps.

  • Pour chaque ligne qui entre dans une fenêtre, une ligne de sortie est émise par la fenêtre défilante. Peu de temps après le démarrage de l'application, vous voyez la requête émettre une sortie pour chaque nouvel enregistrement qui s'affiche dans le flux, même si la fenêtre de 5 secondes ne s'est pas encore écoulée. Par exemple, la requête émet une sortie lorsqu'un enregistrement apparaît à la première et à la deuxième secondes. Puis, elle traite les enregistrements dans la fenêtre de 5 secondes.

  • Les fenêtres défilent avec le temps. Si un ancien enregistrement du flux se trouve en dehors de la fenêtre, la requête n'émet aucune sortie, à moins qu'un nouvel enregistrement apparaisse dans le flux dans cette fenêtre de 5 secondes.

Supposons que la requête commence à s’exécuter à t0. Les actions suivantes se produisent :

  1. À l’instant t0, la requête démarre. La requête n'émet aucune sortie (valeur de comptage), car il n'y a aucun enregistrement à cet instant.

  2. À l’instant t1, un nouvel enregistrement apparaît dans le flux et la requête émet la valeur de comptage 1.

  3. À l’instant t2, un autre enregistrement apparaît, et la requête émet le nombre 2.

  4. La fenêtre de 5 secondes défile avec le temps :

    • À t3, la fenêtre défilante t3 à t0

    • À t4 (fenêtre défilante t4 à t0)

    • À t5, la fenêtre défilante t5 à t0

    À tous ces instants, la fenêtre de 5 secondes comporte les mêmes enregistrement, sil n’y a aucun nouvel enregistrement. La requête n'émet donc aucune sortie.

  5. À l’instant t6, la fenêtre de 5 secondes est (t6 à t1). La requête détecte un nouvel enregistrement à l’instant t6 et émet donc la sortie 2. L’enregistrement à t1 n’est plus dans la fenêtre et n’est donc pas comptabilisé.

  6. À l’instant t7, la fenêtre de 5 secondes est t7 à t2. La requête détecte un nouvel enregistrement à l’instant t7 et émet donc la sortie 2. L’enregistrement à t2 n’est plus dans la fenêtre de 5 secondes et n’est donc pas comptabilisé.

  7. À l’instant t8, la fenêtre de 5 secondes est t8 à t3. La requête détecte trois nouveaux enregistrements et comptabilise donc 5 enregistrements.

Pour résumer, la fenêtre est de taille fixe et défile avec le temps. La requête émet une sortie lorsque de nouveaux enregistrements apparaissent.

Note

Nous vous recommandons d'utiliser une fenêtre coulissante d'une heure maximum. Si vous utilisez une fenêtre plus longue, le redémarrage de l'application sera plus long après la maintenance standard du système. En effet, la source de données doit de nouveau être lue à partir du flux.

Voici des exemples de requêtes qui utilisent la clause WINDOW pour définir des fenêtres et effectuer des regroupements. Comme les requêtes ne spécifient pas GROUP BY, elles utilisent l'approche de fenêtre défilante pour traiter les enregistrements du flux.

Exemple 1 : Traitement d'un flux à l'aide d'une fenêtre défilante de 1 minute

Prenons le flux de démonstration de l'exercice de mise en route qui remplit le flux intégré à l'application SOURCE_SQL_STREAM_001. Voici le schéma.

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

Supposons que vous vouliez que votre application effectue des regroupements à l'aide d'une fenêtre défilante de 1 minute. En d'autres termes, pour chaque nouvel enregistrement qui apparaît dans le flux, vous souhaitez que l'application émette une sortie en appliquant des regroupements aux enregistrements de la fenêtre de 1 minute précédente.

Vous pouvez utiliser la requête à fenêtres temporelle suivante. La requête utilise la clause WINDOW pour définir l'intervalle de 1 minute. PARTITION BY dans la clause WINDOW regroupe les enregistrements par valeurs de symbole boursier dans la fenêtre défilante.

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
Pour tester la requête
  1. Configurez une application en suivant l'exercice de mise en route.

  2. Remplacez l'instruction SELECT dans le code d'application par la requête SELECT précédente. Le code d'application résultant est le suivant.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

Exemple 2 : Requête appliquant des regroupements sur une fenêtre défilante

La requête suivante sur le flux de démonstration retourne la variation moyenne en pourcentage du prix de chaque symbole boursier dans une fenêtre de 10 secondes.

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Pour tester la requête
  1. Configurez une application en suivant l'exercice de mise en route.

  2. Remplacez l'instruction SELECT dans le code d'application par la requête SELECT précédente. Le code d'application résultant est le suivant.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Exemple 3 : Interrogation des données à partir de plusieurs fenêtres défilantes sur le même flux

Vous pouvez écrire des requêtes pour émettre une sortie dans laquelle chaque valeur de colonne est calculée à l'aide de différentes fenêtres défilantes définies sur le même flux.

Dans l'exemple suivant, la requête émet une sortie pour le symbole, le prix, a2 et a10. Il émet une sortie pour les symboles boursiers dont la moyenne mobile sur deux lignes croise la moyenne mobile sur dix lignes. Les valeurs de colonne a2 et a10 proviennent de fenêtres défilantes à deux et dix lignes.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

Pour tester cette requête sur le flux de démonstration, suivez la procédure de test décrite dans Exemple 1.