Exemple : Transformation de plusieurs types de données - Manuel du développeur des applications Amazon Kinesis Data Analytics pour SQL

Pour les nouveaux projets, nous vous recommandons d’utiliser le nouveau service géré pour Apache Flink Studio plutôt que les applications Kinesis Data Analytics pour SQL. Le service géré pour Apache Flink Studio allie facilité d’utilisation et capacités analytiques avancées, ce qui vous permet de créer des applications sophistiquées de traitement des flux en quelques minutes.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple : Transformation de plusieurs types de données

Une exigence commune des applications d'extraction, de transformation et de chargement (ETL) est de pouvoir traiter plusieurs types d'enregistrement sur une source de diffusion. Vous pouvez créer des applications Kinesis Data Analytics pour traiter ces types de sources de streaming. Procédez comme suit :

  1. D’abord, vous mappez la source de streaming à un flux d’entrée intégré à l’application, similaire à toutes les autres applications Kinesis Data Analytics.

  2. Ensuite, dans votre code d'application, vous écrivez des instructions SQL pour récupérer des lignes de types spécifiques depuis le flux d'entrée intégré à l'application. Puis vous les insérez dans des flux intégrés à l'application distincts. (Vous pouvez créer des flux intégrés à l'application supplémentaires dans votre code d'application.)

Dans cet exercice, vous disposez d'une source de diffusion qui reçoit des enregistrements de deux types (Order et Trade). Il s'agit d'ordres de bourse et des transactions correspondantes. Pour chaque ordre, il peut y avoir zéro ou plusieurs transactions. Des exemples d'enregistrements de chaque type sont illustrés ci-après :

Enregistrement d'ordre

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Enregistrement de transaction

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

Lorsque vous créez une application à l'aide de AWS Management Console, la console affiche le schéma déduit suivant pour le flux d'entrée intégré à l'application créé. Par défaut, la console nomme ce flux intégré à l'application SOURCE_SQL_STREAM_001.

Capture d'écran de la console montrant l'exemple de flux intégré à l'application formaté.

Lorsque vous enregistrez la configuration, Amazon Kinesis Data Analytics lit en continu les données de la source de streaming et insère des lignes dans le flux intégré à l’application. Vous pouvez maintenant exécuter des analyses sur les données du flux intégré à l'application.

Dans le code d'application de cet exemple, vous créez d'abord deux autres flux intégrés à l'application : Order_Stream et Trade_Stream. Vous pouvez alors filtrer les lignes du flux SOURCE_SQL_STREAM_001 en fonction du type d'enregistrement et les insérer dans les flux nouvellement créées à l'aide de pompes. Pour plus d'informations sur ce modèle de codage, consultez Code d'application.

  1. Filtrer les lignes d'ordre et de transaction dans des flux intégrés à l'application distincts:

    1. Filtrez les enregistrements d'ordre dans le flux SOURCE_SQL_STREAM_001, puis enregistrez les ordres dans le flux Order_Stream.

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. Filtrez les enregistrements de transaction dans le flux SOURCE_SQL_STREAM_001, puis enregistrez les ordres dans le flux Trade_Stream.

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. Vous pouvez maintenant exécuter des analyses supplémentaires sur ces flux. Dans cet exemple, vous comptez le nombre de transactions par symbole boursier dans une fenêtre bascule d'une minute et enregistrez les résultats dans un autre flux, DESTINATION_SQL_STREAM.

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    Vous voyez le résultat, comme illustré ci-après :

    Capture d'écran de la console affichant les résultats dans l'onglet de résultats SQL.
Étape suivante

Étape 1 : Préparation des données