Streaming Analytics Pipeline

Appendix C: Sample Amazon Kinesis Analytics Applications

Amazon Kinesis Analytics implements the ANSI 2008 SQL standard with extensions that enable you to process streaming data. For detailed information on Amazon Kinesis Analytics SQL concepts, see the Amazon Kinesis Analytics SQL Reference. The following are examples of Amazon Kinesis Analytics application code.

Simple Continuous Filter

This application performs a continuous SELECT statement on stock ticker data in the source stream (SOURCE_SQL_STREAM_001) based on a WHERE condition, and inserts the results into an output in-application stream (DESTINATION_SQL_STREAM).

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector        VARCHAR(16),
    price         REAL,
    change        REAL);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM ticker_symbol, sector, price, change
        FROM "SOURCE_SQL_STREAM_001"
        WHERE sector SIMILAR TO '%TECH%';
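
To make the effect of the WHERE condition concrete, the following Python sketch emulates the filter outside of Amazon Kinesis Analytics. It is an illustration only, not service code: the `similar_to` helper is hypothetical and handles only the `%` and `_` wildcards, and the sample rows and ticker symbols are invented. Under those assumptions, the pattern '%TECH%' keeps any row whose sector contains the text TECH.

```python
import re


def similar_to(value: str, pattern: str) -> bool:
    """Emulate a simple SQL SIMILAR TO pattern (hypothetical helper).

    Only the wildcards are handled: % matches any run of characters and
    _ matches exactly one character. Alternation and character classes,
    which SIMILAR TO also supports, are out of scope for this sketch.
    """
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # The whole value must match the pattern, and matching is case-sensitive.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


def continuous_filter(source, pattern="%TECH%"):
    """Yield only the rows whose sector matches the pattern."""
    for row in source:
        if similar_to(row["sector"], pattern):
            yield row


# Invented sample rows; the field names follow the column list in the listing.
rows = [
    {"ticker_symbol": "AAAA", "sector": "TECHNOLOGY", "price": 101.5, "change": 0.4},
    {"ticker_symbol": "BBBB", "sector": "ENERGY", "price": 88.2, "change": -1.1},
    {"ticker_symbol": "CCCC", "sector": "FINTECH", "price": 55.0, "change": 0.2},
]

selected = [row["ticker_symbol"] for row in continuous_filter(rows)]
print(selected)
```

In this sketch the ENERGY row is dropped, and the TECHNOLOGY and FINTECH rows pass because both contain TECH. A lowercase sector such as "tech" would not match, since the comparison is case-sensitive.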

Multiple-Step Application

This application uses multiple intermediate in-application streams (IN_APP_STREAM_001 and IN_APP_STREAM_002) to process data in multiple steps. The results of a query against one in-application stream feed into another in-application stream.

CREATE OR REPLACE STREAM "IN_APP_STREAM_001" (
    ingest_time   TIMESTAMP,
    ticker_symbol VARCHAR(4),
    sector        VARCHAR(16),
    price         REAL,
    change        REAL);

CREATE OR REPLACE PUMP "PUMP_001" AS
    INSERT INTO "IN_APP_STREAM_001"
        SELECT STREAM APPROXIMATE_ARRIVAL_TIME, ticker_symbol, sector, price, change
        FROM "SOURCE_SQL_STREAM_001";

CREATE OR REPLACE STREAM "IN_APP_STREAM_002" (
    ingest_time   TIMESTAMP,
    ticker_symbol VARCHAR(4),
    sector        VARCHAR(16),
    price         REAL,
    change        REAL);

CREATE OR REPLACE PUMP "PUMP_002" AS
    INSERT INTO "IN_APP_STREAM_002"
        SELECT STREAM ingest_time, ticker_symbol, sector, price, change
        -- Assumed: the listing is cut off at this point. Reading from the
        -- first in-application stream follows from the description above.
        FROM "IN_APP_STREAM_001";
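
The chaining of in-application streams can be pictured as a pipeline in which each pump consumes the output of the previous one. The Python sketch below emulates that idea with generators. It is not Amazon Kinesis Analytics code: the function names mirror the pump names only for readability, the sample row is invented, and the second step is assumed to read from IN_APP_STREAM_001, as the description suggests.

```python
from datetime import datetime, timezone

# Columns copied unchanged from the source stream.
COLUMNS = ("ticker_symbol", "sector", "price", "change")


def pump_001(source):
    """First step: expose each row's arrival time as ingest_time."""
    for row in source:
        out = {"ingest_time": row["APPROXIMATE_ARRIVAL_TIME"]}
        out.update({name: row[name] for name in COLUMNS})
        yield out


def pump_002(in_app_stream_001):
    """Second step: read the first in-application stream (assumed source)
    and forward its rows unchanged into the second one."""
    for row in in_app_stream_001:
        yield dict(row)


# Invented sample row standing in for SOURCE_SQL_STREAM_001.
source_rows = [
    {
        "APPROXIMATE_ARRIVAL_TIME": datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc),
        "ticker_symbol": "AAAA",
        "sector": "TECHNOLOGY",
        "price": 101.5,
        "change": 0.4,
    },
]

# Each step pulls rows from the step before it, one row at a time.
in_app_stream_002 = list(pump_002(pump_001(source_rows)))
print(in_app_stream_002[0])
```

Because generators are lazy, a row flows through both steps before the next row is read, which loosely resembles how a pump continuously moves rows from one in-application stream to the next.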

Pre-Processing Streams

This application retrieves rows of specific types from the in-application input stream and inserts them in separate in-application streams. Once the record types have been filtered, you can perform analytics on a particular in-application stream.

CREATE OR REPLACE STREAM "Order_Stream" (
    "order_id"    integer,
    "order_type"  varchar(10),
    "ticker"      varchar(4),
    "order_price" DOUBLE,
    "record_type" varchar(10));

CREATE OR REPLACE PUMP "Order_Pump" AS
    INSERT INTO "Order_Stream"
        SELECT STREAM "Oid", "Otype", "Oticker", "Oprice", "RecordType"
        FROM "SOURCE_SQL_STREAM_001"
        WHERE "RecordType" = 'Order';

CREATE OR REPLACE STREAM "Trade_Stream" (
    "trade_id"    integer,
    "order_id"    integer,
    "trade_price" DOUBLE,
    "ticker"      varchar(4),
    "record_type" varchar(10));

CREATE OR REPLACE PUMP "Trade_Pump" AS
    INSERT INTO "Trade_Stream"
        SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
        FROM "SOURCE_SQL_STREAM_001"
        WHERE "RecordType" = 'Trade';

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "ticker"      varchar(4),
    "trade_count" integer);

CREATE OR REPLACE PUMP "Output_Pump" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM "ticker", count(*) as trade_count
        FROM "Trade_Stream"
        GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
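
The listing does two things: it routes rows by record type, and it then counts trades per ticker in one-minute groups by truncating ROWTIME to the minute. The Python sketch below emulates both steps on a small, invented batch of records. It is an approximation under stated assumptions: the "ROWTIME" key and the sample values are hypothetical, and a real streaming application emits results as each one-minute window closes rather than after reading a whole batch.

```python
from collections import Counter
from datetime import datetime


def floor_to_minute(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its minute."""
    return ts.replace(second=0, microsecond=0)


def split_by_record_type(source):
    """Route rows into separate lists, one per record type."""
    orders = [row for row in source if row["RecordType"] == "Order"]
    trades = [row for row in source if row["RecordType"] == "Trade"]
    return orders, trades


def trade_counts_per_minute(trades):
    """Count trades per (ticker, minute) group."""
    counts = Counter()
    for trade in trades:
        counts[(trade["Tticker"], floor_to_minute(trade["ROWTIME"]))] += 1
    return counts


# Invented sample records; "ROWTIME" stands in for the stream's row timestamp.
records = [
    {"ROWTIME": datetime(2024, 1, 1, 9, 30, 5), "RecordType": "Order",
     "Oid": 1, "Otype": "buy", "Oticker": "AAAA", "Oprice": 10.0},
    {"ROWTIME": datetime(2024, 1, 1, 9, 30, 20), "RecordType": "Trade",
     "Tid": 100, "Toid": 1, "Tprice": 10.0, "Tticker": "AAAA"},
    {"ROWTIME": datetime(2024, 1, 1, 9, 30, 45), "RecordType": "Trade",
     "Tid": 101, "Toid": 1, "Tprice": 10.1, "Tticker": "AAAA"},
    {"ROWTIME": datetime(2024, 1, 1, 9, 31, 10), "RecordType": "Trade",
     "Tid": 102, "Toid": 1, "Tprice": 10.2, "Tticker": "AAAA"},
    {"ROWTIME": datetime(2024, 1, 1, 9, 31, 15), "RecordType": "Trade",
     "Tid": 103, "Toid": 2, "Tprice": 20.0, "Tticker": "BBBB"},
]

orders, trades = split_by_record_type(records)
counts = trade_counts_per_minute(trades)
for (ticker, minute), trade_count in sorted(counts.items()):
    print(ticker, minute.isoformat(), trade_count)
```

With this sample data, the two AAAA trades at 09:30 fall into one group, while the AAAA and BBBB trades at 09:31 each form their own group, so the sketch produces three output rows.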