Ejemplos de migración a Managed Service para Apache Flink - Guía para desarrolladores de Amazon Kinesis Data Analytics SQL para aplicaciones

Tras considerarlo detenidamente, hemos decidido interrumpir Amazon Kinesis Data Analytics SQL para aplicaciones en dos pasos:

1. A partir del 15 de octubre de 2025, no podrá crear nuevos Kinesis Data Analytics SQL para aplicaciones.

2. Eliminaremos sus aplicaciones a partir del 27 de enero de 2026. No podrá iniciar ni utilizar Amazon Kinesis Data Analytics SQL para aplicaciones. A partir de ese momento, el soporte para Amazon Kinesis Data Analytics dejará SQL de estar disponible. Para obtener más información, consulte Suspensión de Amazon Kinesis Data Analytics SQL for Applications.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplos de migración a Managed Service para Apache Flink

Tras considerarlo detenidamente, hemos tomado la decisión de interrumpir Amazon Kinesis Data Analytics SQL para aplicaciones. Para ayudarlo a planificar y migrar Amazon Kinesis Data Analytics SQL para aplicaciones, suspenderemos la oferta gradualmente a lo largo de 15 meses. Hay que tener en cuenta dos fechas importantes: el 15 de octubre de 2025 y el 27 de enero de 2026.

  1. A partir del 15 de octubre de 2025, no podrá crear nuevos Amazon Kinesis Data Analytics SQL para aplicaciones.

  2. Eliminaremos sus aplicaciones a partir del 27 de enero de 2026. No podrá iniciar ni utilizar Amazon Kinesis Data Analytics SQL para aplicaciones. A partir de ese momento, el soporte para Amazon Kinesis Data Analytics SQL para aplicaciones dejará de estar disponible. Para obtener más información, consulte Suspensión de Amazon Kinesis Data Analytics SQL for Applications.

Le recomendamos que utilice Amazon Managed Service para Apache Flink. Combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones de procesamiento de flujos en cuestión de minutos.

En esta sección se proporcionan ejemplos de código y arquitectura que le ayudarán a trasladar sus cargas de trabajo de Amazon Kinesis Data Analytics SQL para aplicaciones a Managed Service for Apache Flink.

Para obtener más información, consulte también esta entrada de AWS blog: Migración de Amazon Kinesis Data Analytics SQL for Applications a Managed Service para Apache Flink Studio.

Para migrar sus cargas de trabajo a Managed Service para Apache Flink Studio o Managed Service para Apache Flink, en esta sección se proporcionan traducciones de consultas que puede utilizar en casos de uso habituales.

Antes de explorar estos ejemplos, le recomendamos que consulte Uso de un cuaderno de Studio con Managed Service para Apache Flink.

Recreación de Kinesis Data Analytics SQL para consultas en Managed Service para Apache Flink Studio

Las siguientes opciones proporcionan traducciones de consultas de aplicaciones SQL de Kinesis Data Analytics habituales a Managed Service for Apache Flink Studio.

SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "IN_APP_STREAM_001" ( ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "IN_APP_STREAM_001" SELECT STREAM APPROXIMATE_ARRIVAL_TIME, ticker_symbol, sector, price, change FROM "SOURCE_SQL_STREAM_001"; -- Second in-app stream and pump CREATE OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "IN_APP_STREAM_02" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_001"; -- Destination in-app stream and third pump CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_03" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_02";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE, APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_001; CREATE TABLE IN_APP_STREAM_001 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_02; CREATE TABLE IN_APP_STREAM_02 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_02', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_001 SELECT APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM SOURCE_SQL_STREAM_001; Query 3 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_02 SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_001; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_02;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CHANGE_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1; -- ** Trigger Count and Limit ** -- Counts "triggers" or those values that evaluated true against the previous where clause -- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM ( ticker_symbol VARCHAR(4), change REAL, trigger_count INTEGER); CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol, change, trigger_count FROM ( SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING ) ) WHERE trigger_count >= 1;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE, EVENT_TIME AS PROCTIME()) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM; CREATE TABLE TRIGGER_COUNT_STREAM ( TICKER_SYMBOL VARCHAR(4), CHANGE DOUBLE, TRIGGER_COUNT INT) PARTITIONED BY (TICKER_SYMBOL); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1; Query 3 - % flink.ssql(type = update ) SELECT * FROM( SELECT TICKER_SYMBOL, CHANGE, COUNT(*) AS TRIGGER_COUNT FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER_SYMBOL, CHANGE ) WHERE TRIGGER_COUNT > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM"( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001", "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount " FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 ( TICKER_SYMBOL VARCHAR(4), TRADETIME AS PROCTIME(), APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM; CREATE TABLE CALC_COUNT_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'CALC_COUNT_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); Query 2 - % flink.ssql(type = update ) INSERT INTO CALC_COUNT_SQL_STREAM SELECT TICKER, TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME, TICKERCOUNT FROM ( SELECT TICKER_SYMBOL AS TICKER, DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME, COUNT(*) AS TICKERCOUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TRADETIME, INTERVAL '1' MINUTE), DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'), DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'), TICKER_SYMBOL ) ; Query 3 - % flink.ssql(type = update ) SELECT * FROM CALC_COUNT_SQL_STREAM; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT TICKER, TRADETIME, SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT FROM CALC_COUNT_SQL_STREAM WINDOW W1 AS ( PARTITION BY TICKER ORDER BY TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING ) ; Query 5 - % flink.ssql(type = update ) SELECT * FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", SUBSTRING("referrer", 12, ( POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4 ) ) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, substring(referrer, 12, 6) as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME()) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, REGEXP_REPLACE(referrer, 'http', 'https') as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( sector VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM T.SECTOR, T.REC.COLUMN1, T.REC.COLUMN2 FROM ( SELECT STREAM SECTOR, REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC FROM SOURCE_SQL_STREAM_001 ) AS T;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( CHANGE DOUBLE, PRICE DOUBLE, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16)) PARTITIONED BY (SECTOR) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT SECTOR, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 FROM DESTINATION_SQL_STREAM ) WHERE MATCH1 IS NOT NULL AND MATCH2 IS NOT NULL;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND, TICKER VARCHAR(4), TICKER_COUNT INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json' Query 2 - % flink.ssql(type = update ) SELECT EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '60' second), EVENT_TIME, TICKER;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '60' SECOND);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( ticker VARCHAR(4), price DOUBLE, event_time VARCHAR(32), processing_time AS PROCTIME()) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ticker, min(price) AS MIN_PRICE, max(price) AS MAX_PRICE FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(processing_time, INTERVAL '60' second), ticker;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM"TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001". "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT TICKER, COUNT(*) as MOST_FREQUENT_VALUES, ROW_NUMBER() OVER (PARTITION BY TICKER ORDER BY TICKER DESC) AS row_num FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER ) WHERE row_num <= 5;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ITEM, ITEM_COUNT FROM TABLE(TOP_K_ITEMS_TUMBLING(CURSOR( SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
Managed Service for Apache Flink Studio
%flinkssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), PRICE DOUBLE) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); %flink.ssql(type=update) SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW ORDER BY ITEM_COUNT DESC) as rownum FROM ( select AGG_WINDOW, ITEM, ITEM_COUNT from ( select TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW, ITEM, count(*) as ITEM_COUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TS, INTERVAL '60' SECONDS), ITEM ) ) ) where rownum <= 3
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), column2 VARCHAR(16), column3 VARCHAR(16), column4 VARCHAR(16), column5 VARCHAR(16), column6 VARCHAR(16), column7 VARCHAR(16)); CREATE OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM l.r.COLUMN1, l.r.COLUMN2, l.r.COLUMN3, l.r.COLUMN4, l.r.COLUMN5, l.r.COLUMN6, l.r.COLUMN7 FROM ( SELECT STREAM W3C_LOG_PARSE("log", 'COMMON') FROM "SOURCE_SQL_STREAM_001" ) AS l(r);
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5), SPLIT_INDEX(log, ' ', 6) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM ( SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001" ) as t;
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5) ) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, priceFROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(12), CHANGE INT, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - CREATE TABLE CompanyName ( Ticker VARCHAR(4), Company VARCHAR(4)) WITH ( 'connector' = 'filesystem', 'path' = 's3://kda-demo-sample/TickerReference.csv', 'format' = 'csv' ); Query 3 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, c.Company, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM LEFT JOIN CompanyName as c ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
SQL-based Kinesis Data Analytics application
SELECT STREAM ticker_symbol, sector, change, ( price / 0 ) as ProblemColumnFROM "SOURCE_SQL_STREAM_001" WHERE sector SIMILAR TO '%TECH%';
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()], result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 except : return - 1 st_env.register_function("DivideByZero", DivideByZero) Query 3 - % flink.ssql(type = update ) SELECT CURRENT_TIMESTAMP AS ERROR_TIME, * FROM ( SELECT TICKER_SYMBOL, SECTOR, CHANGE, DivideByZero(PRICE) as ErrorColumn FROM DESTINATION_SQL_STREAM WHERE SECTOR SIMILAR TO '%TECH%' ) AS ERROR_STREAM;

Si quiere trasladar cargas de trabajo que utilizan Random Cut Forest de Kinesis Analytics SQL para a Managed Service para Apache Flink, en AWS esta entrada de blog se muestra cómo utilizar Managed Service for Apache Flink para ejecutar un algoritmo RCF en línea para la detección de anomalías.

Consulte Converting- KDASQL -KDAStudio/para ver un tutorial completo.

En el siguiente ejercicio, cambiará su flujo de datos para usar Amazon Managed Service para Apache Flink Studio. Esto también implicará cambiar de Amazon Kinesis Data Firehose a Amazon Kinesis Data Streams.

En primer lugar, compartimos una SQL arquitectura típicaKDA, antes de mostrar cómo puede sustituirla mediante Amazon Managed Service para Apache Flink Studio y Amazon Kinesis Data Streams. Como alternativa, puede lanzar la AWS CloudFormation plantilla aquí:

Amazon Kinesis Data Analytics y Amazon SQL Kinesis Data Firehose

Este es el flujo de arquitectura de Amazon Kinesis Data SQL Analytics:

Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.

En primer lugar, examinamos la configuración de una Amazon Kinesis Data Analytics y una Amazon SQL Kinesis Data Firehose antiguas. El caso de uso es un mercado bursátil en el que los datos de negociación, incluidos el precio y el precio de las acciones, se transmiten desde fuentes externas a los sistemas Amazon Kinesis. Amazon Kinesis Data Analytics SQL for utiliza el flujo de entrada para ejecutar consultas en ventana, como Tumbling Windowmax, para determinar el volumen de operaciones y min el precio de negociación en un período average de un minuto para cada cotización bursátil. 

Amazon Kinesis Data AnalyticsSQL: está configurado para ingerir datos de la Amazon Kinesis Data Firehose. API Tras el procesamiento, Amazon Kinesis Data Analytics envía SQL los datos procesados a otra Amazon Kinesis Data Firehose, que luego guarda la salida en un bucket de Amazon S3.

En este caso, utiliza Amazon Kinesis Data Generator. Amazon Kinesis Data Generator le permite enviar datos de prueba a sus flujos de entrega de Amazon Kinesis Data Streams o Amazon Kinesis Data Firehose. Para empezar, siga las instrucciones que aparecen aquí. Utilice la AWS CloudFormation plantilla que aparece aquí en lugar de la que se proporciona en las instrucciones:.

Una vez que ejecute la AWS CloudFormation plantilla, la sección de resultados proporcionará la URL del generador de datos de Amazon Kinesis. Inicie sesión en el portal con el ID y la contraseña de Cognito que configuró aquí. Seleccione la región y el nombre del flujo de destino. Para ver el estado actual, elija los flujos de Amazon Kinesis Data Firehose Delivery. Para ver el estado nuevo, elija los flujos de Amazon Kinesis Data Firehose. Puede crear varias plantillas, en función de sus requisitos, y probarlas con el botón Probar plantilla antes de enviarlas al flujo de destino.

A continuación, se presenta un ejemplo de carga útil con Amazon Kinesis Data Generator. El generador de datos se dirige a la entrada de Amazon Kinesis Firehose Streams para transmitir los datos de forma continua. El SDK cliente de Amazon Kinesis también puede enviar datos de otros productores. 

2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763, "AMZN",3352023-02-17 09:28:07.763, "GOOGL",1852023-02-17 09:28:07.763, "AAPL",11162023-02-17 09:28:07.763, "GOOGL",1582

JSONSe utiliza lo siguiente para generar una serie aleatoria de fecha y hora de negociación, cotización bursátil y precio bursátil:

date.now(YYYY-MM-DD HH:mm:ss.SSS), "random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])", random.number(2000)

Una vez que seleccione Enviar datos, el generador empezará a enviar datos simulados.

Los sistemas externos transmiten los datos a Amazon Kinesis Data Firehose. Con Amazon Kinesis Data Analytics SQL for Applications, puede analizar los datos de streaming de forma estándar. SQL El servicio le permite crear y ejecutar SQL código en fuentes de streaming para realizar análisis de series temporales, alimentar cuadros de mando en tiempo real y crear métricas en tiempo real. Amazon Kinesis Data Analytics SQL for Applications podría crear una transmisión SQL de destino a partir de consultas en la transmisión de entrada y enviar la transmisión de destino a otra Amazon Kinesis Data Firehose. El Amazon Kinesis Data Firehose de destino podría enviar los datos analíticos a Amazon S3 como estado final.

El código heredado de Amazon Kinesis Data Analytics SQL se basa en una extensión SQL de Standard.

Utilice la siguiente consulta en Amazon Kinesis Data Analytics SQL -. Primero debe crear un flujo de destino para el resultado de la consulta. A continuaciónPUMP, utilizaría un objeto de repositorio de Amazon Kinesis Data Analytics (una extensión SQL del estándar) que proporciona una funcionalidad de consulta en INSERT INTO stream SELECT ... FROM ejecución continua, lo que permite introducir los resultados de una consulta de forma continua en una transmisión con nombre. 

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP, INGEST_TIME TIMESTAMP, TICKER VARCHAR(16), VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME", "ticker", COUNT(*) AS VOLUME, AVG("tradePrice") AS AVG_PRICE, MIN("tradePrice") AS MIN_PRICE, MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001" GROUP BY "ticker", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

La anterior SQL utiliza dos intervalos de tiempo, tradeTimestamp que provienen de la carga útil de la transmisión entrante y también ROWTIME.tradeTimestamp se denominan Event Time o. client-side time Suele ser conveniente utilizar estos momentos en análisis, ya que es el momento en el que se produjo un evento. No obstante, muchas fuentes de eventos como, por ejemplo, clientes de teléfonos móviles y web, no tienen relojes de confianza, lo que puede provocar tiempos inexactos. Además, los problemas de conectividad pueden hacer que los registros aparezcan en la secuencia y no lo en el mismo orden los eventos. 

Las secuencias en la aplicación incluyen una columna especial llamada ROWTIME. Almacena una marca temporal cuando Amazon Kinesis Data Analytics inserta una fila en la primera secuencia en la aplicación. ROWTIME refleja la marca temporal en la que Amazon Kinesis Data Analytics insertó un registro en la primera secuencia en la aplicación después de leer desde el origen de streaming. Este valor ROWTIME se mantiene en toda su aplicación. 

SQLDetermina el número de ticker como volume minmax, y el average precio en un intervalo de 60 segundos. 

Utilizar cada uno de estos tiempos en las consultas en ventana basadas en el tiempo tiene ventajas y desventajas. Le recomendamos que elija uno o varios de estos tiempos, y una estrategia para abordar las posibles desventajas en función de su caso de uso. 

Recomendamos una estrategia de dos ventanas que utilice dos ventanas basadas en el tiempo: una ROWTIME y una para los otros tiempos, como el tiempo de evento.

  • Utilice ROWTIME como la primera ventana, que controla la frecuencia con la que la consulta emite los resultados, tal y como se muestra en el siguiente ejemplo. No se utiliza como tiempo lógico.

  • Utilice uno de los otros tiempos que es el tiempo lógico que desea asociar a su análisis. Este tiempo representa cuándo se produjo el evento. En el siguiente ejemplo, el objetivo de análisis es agrupar los registros y devolver un recuento por cada símbolo.

Amazon Managed Service para Apache Flink Studio 

En la arquitectura actualizada, se sustituye Amazon Kinesis Data Firehose por Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics SQL for Applications se sustituye por Amazon Managed Service para Apache Flink Studio. El código de Apache Flink se ejecuta de forma interactiva en un cuaderno Apache Zeppelin. Amazon Managed Service para Apache Flink Studio envía los datos de comercio agregado a un bucket de Amazon S3 para su almacenamiento. Los pasos se muestran a continuación:

Este es el flujo de arquitectura de Amazon Managed Service para Apache Flink:

Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.

Cree de un flujo de datos de Kinesis

Para crear un flujo de datos con la consola
  1. Inicie sesión en la consola de Kinesis AWS Management Console y ábrala en https://console.aws.amazon.com /kinesis.

  2. En la barra de navegación, expanda el selector de regiones y seleccione una región.

  3. Elija Create data stream (Crear flujo de datos).

  4. En la página Crear flujo de Kinesis, escriba un nombre para su flujo de datos y, a continuación, elija el modo de capacidad Bajo demanda predeterminado.

    Con el modo Bajo demanda, puede seleccionar Crear flujo de Kinesis para crear su flujo de datos.

    En la página Flujos de Kinesis, el valor Estado del flujo es Creándose mientras se crea. Cuando el flujo está listo para usarse, el valor Estado cambia a Activo.

  5. Elija el nombre del flujo. La página Detalles del flujo muestra un resumen de la configuración del flujo, junto con información de monitoreo.

  6. En el generador de datos de Amazon Kinesis, cambie la transmisión o entrega por la nueva transmisión de Amazon Kinesis Data Streams: _ _. TRADE SOURCE STREAM

    JSONy la carga útil será la misma que utilizó para Amazon Kinesis Data Analytics-. SQL Utilice el generador de datos de Amazon Kinesis para generar algunos ejemplos de datos de carga útil de negociación y segmente el flujo de STREAM datos TRADE_ SOURCE _ para este ejercicio:

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. AWS Management Console Vaya a Managed Service for Apache Flink y, a continuación, seleccione Crear aplicación.

  8. En el panel de navegación izquierdo, elija Bloc de notas de Studio y, a continuación, seleccione Crear bloc de notas de Studio.

  9. Escriba el nombre del bloc de notas de Studio.

  10. En AWS Glue database, proporcione una base de datos AWS Glue existente que defina los metadatos de sus fuentes y destinos. Si no tiene una AWS Glue base de datos, elija Crear y haga lo siguiente:

    1. En la consola AWS Glue, selecciona Bases de datos en Catálogo de datos en el menú de la izquierda.

    2. Elija Crear base de datos.

    3. En la página Crear base de datos, ingrese el nombre de la base de datos. En la sección Ubicación - opcional, elija Examinar Amazon S3 y seleccione el bucket de Amazon S3. Si aún no tiene configurado un bucket de Amazon S3, puede omitir este paso y volver a él más tarde.

    4. (Opcional). Ingrese la descripción de la base de datos.

    5. Elija Create database (Creación de base de datos).

  11. Elija Crear bloc de notas.

  12. Una vez creado el bloc de notas, seleccione Ejecutar.

  13. Una vez que el cuaderno se haya iniciado correctamente, abra un cuaderno Zeppelin seleccionando Abrir en Apache Zeppelin.

  14. En la página del bloc de notas de Zeppelin, selecciona Crear nueva nota y asígnale un nombre. MarketDataFeed

El SQL código de Flink se explica a continuación, pero primero así es como se ve la pantalla de un cuaderno Zeppelin. Cada ventana del bloc de notas es un bloque de códigos independiente y se pueden ejecutar de una en una.

Código de Amazon Managed Service para Apache Flink Studio

Amazon Managed Service para Apache Flink utiliza Zeppelin Notebooks para ejecutar el código. En este ejemplo, la asignación se realiza a código ssql basado en Apache Flink 1.13. El código del cuaderno Zeppelin se muestra debajo de un bloque a la vez. 

Antes de ejecutar cualquier código en su bloc de notas Zeppelin, debe ejecutar los comandos de configuración de Flink. Si necesita cambiar algún ajuste de configuración después de ejecutar el código (ssql, Python o Scala), tendrá que detener y reiniciar el cuaderno. En este ejemplo, tendrá que establecer puntos de control. Se requieren puntos de control para poder transmitir datos a un archivo en Amazon S3. Esto permite que los datos que se transmiten a Amazon S3 se vacíen en un archivo. La siguiente afirmación establece el intervalo en 5000 milisegundos. 

%flink.conf execution.checkpointing.interval 5000

%flink.conf indica que este bloque son declaraciones de configuración. Para obtener más información sobre la configuración de Flink, incluidos los puntos de control, consulte Puntos de control de Apache Flink

La tabla de entrada para la fuente Amazon Kinesis Data Streams se crea con el código ssql de Flink que aparece a continuación. Tenga en cuenta que el campo TRADE_TIME almacena la fecha y la hora creadas por el generador de datos.

%flink.ssql DROP TABLE IF EXISTS TRADE_SOURCE_STREAM; CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, TRADE_TIME TIMESTAMP(3), WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE, STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM', 'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');

Puede ver el flujo de entrada con esta declaración:

%flink.ssql(type=update)-- testing the source stream select * from TRADE_SOURCE_STREAM;

Antes de enviar los datos agregados a Amazon S3, puede verlos directamente en Amazon Managed Service para Apache Flink Studio con una consulta de selección en una ventana desplegable. Esto agrega los datos de negociación en intervalos de tiempo de un minuto. Tenga en cuenta que la sentencia %flink.ssql debe tener una designación (type=update):

%flink.ssql(type=update) select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW, TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

A continuación, podrá crear una tabla para el destino en Amazon S3. Tiene que utilizar una marca de agua. Una marca de agua es una métrica de progreso que indica un momento en el que está seguro de que no se producirán más eventos retrasados. El motivo de la marca de agua es tener en cuenta las llegadas tardías. El intervalo ‘5’ Second permite que las operaciones entren en Amazon Kinesis Data Streams con 5 segundos de retraso y que se sigan incluyendo si tienen una marca de tiempo dentro de la ventana. Para obtener más información, consulte Generating Watermarks.   

%flink.ssql(type=update) DROP TABLE IF EXISTS TRADE_DESTINATION_S3; CREATE TABLE TRADE_DESTINATION_S3 ( TRADE_WINDOW_START TIMESTAMP(3), WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND, TICKER STRING,  VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE) WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');

Esta declaración inserta los datos en TRADE_DESTINATION_S3. TUMPLE_ROWTIME es la marca de tiempo del límite superior inclusivo de la ventana de saltos.

%flink.ssql(type=update) insert into TRADE_DESTINATION_S3 select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE), TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Deje que su estado de cuenta se ejecute durante 10 a 20 minutos para acumular algunos datos en Amazon S3. A continuación, aborte su instrucción.

Esto cierra el archivo en Amazon S3 para que se pueda ver.

Este es el aspecto del contenido:

Financial data table showing stock prices and volumes for tech companies on March 1, 2023.

Puede usar la plantilla de AWS CloudFormation para crear la infraestructura.

AWS CloudFormation creará los siguientes recursos en tu cuenta: AWS

  • Amazon Kinesis Data Streams

  • Amazon Managed Service para Apache Flink Studio

  • AWS Glue base de datos

  • Bucket de Amazon S3

  • IAMfunciones y políticas de Amazon Managed Service para que Apache Flink Studio acceda a los recursos adecuados

Importe el bloc de notas y cambie el nombre del bucket de Amazon S3 por el nuevo bucket de Amazon S3 creado por AWS CloudFormation.

SQL code snippet creating a table with timestamp, ticker, volume, and price fields.
Ver más

Estos son algunos recursos adicionales que puede utilizar para obtener más información sobre el uso de Managed Service para Apache Flink Studio:

El propósito del patrón es demostrar cómo aprovechar las libretas Zeppelin de Kinesis Data Analytics-Studio para procesar datos UDFs en la transmisión de Kinesis. Managed Service para Apache Flink Studio utiliza Apache Flink para proporcionar capacidades analíticas avanzadas, que incluyen semántica de procesamiento de una sola vez, ventanas temporales de eventos, extensibilidad mediante funciones definidas por el usuario e integraciones de clientes, compatibilidad con lenguajes imperativos, estado de aplicación duradero, escalado horizontal, soporte para múltiples orígenes de datos, integraciones extensibles y más. Son fundamentales para garantizar la precisión, la integridad, la coherencia y la fiabilidad del procesamiento de las transmisiones de datos y no están disponibles con Amazon Kinesis Data Analytics for. SQL

En este ejemplo de aplicación, demostraremos cómo utilizar UDFs el bloc de notas KDA -Studio Zeppelin para procesar datos en la transmisión de Kinesis. Los cuadernos de Studio para Kinesis Data Analytics le permiten consultar flujos de datos de forma interactiva en tiempo real y crear y ejecutar fácilmente aplicaciones de procesamiento de flujos mediante estándaresSQL, Python y Scala. Con unos pocos clics AWS Management Console, puede abrir un bloc de notas sin servidor para consultar flujos de datos y obtener resultados en cuestión de segundos. Para obtener más información, consulte Uso de un bloc de notas de Studio con Kinesis Data Analytics para Apache Flink.

Funciones Lambda utilizadas para el procesamiento previo y posterior de datos en aplicaciones: KDA SQL

Data flow diagram showing SQL App processing between source and destination streams.

Funciones definidas por el usuario para el procesamiento previo y posterior de los datos utilizando los cuadernos -Studio Zeppelin KDA

Flink Studio Zeppelin Notebook workflow with in-memory tables and user-defined functions.

Funciones UDFs definidas por el usuario ()

Para reutilizar la lógica empresarial habitual en un operador, puede resultar útil hacer referencia a una función definida por el usuario para transformar el flujo de datos. Esto se puede hacer desde el bloc de notas Managed Service para Apache Flink Studio o como un archivo jar de aplicación con referencia externa. El uso de funciones definidas por el usuario puede simplificar las transformaciones o los enriquecimientos de datos que se podrían realizar a través del flujo de datos.

En su bloc de notas, hará referencia a un sencillo contenedor de aplicaciones Java que tiene la funcionalidad de anonimizar números de teléfono personales. También puedes escribir Python o Scala UDFs para usarlos en el cuaderno. Elegimos una aplicación Java jar para resaltar la funcionalidad de importar una aplicación jar a un bloc de notas de Pyflink.

Configuración del entorno

Para seguir esta guía e interactuar con sus datos de flujo, utilizará un script AWS CloudFormation para lanzar los siguientes recursos:

  • Flujo de datos de origen y destino de Kinesis

  • Base de datos Glue

  • Rol IAM

  • Aplicación Managed Service para Apache Flink Studio

  • Función de Lambda para iniciar la aplicación Managed Service para Apache Flink Studio

  • Rol de Lambda para ejecutar la anterior función de Lambda

  • Recurso personalizado para invocar la función de Lambda

Descarga la AWS CloudFormation plantilla aquí.

Crea la AWS CloudFormation pila
  1. Ve a AWS Management Console y elige CloudFormationen la lista de servicios.

  2. En la CloudFormationpágina, selecciona Pilas y, a continuación, selecciona Crear pila con nuevos recursos (estándar).

  3. En la página Crear pila, elija Cargar un archivo de plantilla y, a continuación, elija el kda-flink-udf.yml que haya descargado anteriormente. Elija el archivo y después elija Siguiente.

  4. Asigne un nombre a la plantilla, como kinesis-UDF de modo que sea fácil de recordar, y actualice los parámetros de entrada, como flujo de entrada, si desea un nombre diferente. Elija Next (Siguiente).

  5. En la página Configurar opciones de pila, añada Etiquetas si lo desea y, a continuación, seleccione Siguiente.

  6. En la página de revisión, marca las casillas que permiten la creación de IAM recursos y, a continuación, selecciona Enviar.

El lanzamiento de la AWS CloudFormation pila puede tardar entre 10 y 15 minutos, en función de la región en la que lo hagas. Cuando vea el estado CREATE_COMPLETE de toda la pila, estará listo para continuar.

Uso del bloc de notas de Managed Service para Apache Flink Studio

Los cuadernos de Studio para Kinesis Data Analytics le permiten consultar flujos de datos de forma interactiva en tiempo real y crear y ejecutar fácilmente aplicaciones de procesamiento de flujos mediante estándaresSQL, Python y Scala. Con unos pocos clics AWS Management Console, puede lanzar un bloc de notas sin servidor para consultar flujos de datos y obtener resultados en cuestión de segundos.

Un bloc de notas es un entorno de desarrollo basado en la web. Con los blocs de notas, obtiene una experiencia de desarrollo interactiva sencilla combinada con las capacidades avanzadas de procesamiento de flujos de datos que proporciona Apache Flink. Los cuadernos de Studio utilizan la tecnología Apache Zeppelin y utilizan Apache Flink como motor de procesamiento de flujos. Los blocs de notas de Studio combinan estas tecnologías a la perfección para que los desarrolladores con todas las habilidades puedan acceder a los análisis avanzados de los flujos de datos.

Apache Zeppelin proporciona a sus blocs de notas de Studio un conjunto completo de herramientas de análisis, entre las que se incluyen las siguientes:

  • Visualización de datos

  • Exportación de datos a un archivo CSV

  • Control del formato de salida para facilitar el análisis

Uso del bloc de notas
  1. Vaya a Amazon Kinesis AWS Management Console y elija Amazon Kinesis en la lista de servicios.

  2. En la página de navegación de la izquierda, elija Aplicaciones de análisis y, a continuación, elija blocs de notas de Studio.

  3. Compruebe que el KinesisDataAnalyticsStudioportátil esté funcionando.

  4. Elija el bloc de notas y, a continuación, elija Abrir en Apache Zeppelin.

  5. Descargue el archivo de Data Producer Zeppelin Notebook que utilizará para leer y cargar datos en Kinesis Stream.

  6. Importe el bloc de notas Zeppelin Data Producer. Asegúrese de modificar la entrada STREAM_NAME y REGION en el código del bloc de notas. El nombre del flujo de entrada se encuentra en la salida de la pila AWS CloudFormation.

  7. Ejecute el bloc de notas Data Producer pulsando el botón Ejecutar este párrafo para insertar datos de muestra en la entrada de Kinesis Data Stream.

  8. Mientras se cargan los datos de muestra, descargue MaskPhoneNumber-Interactive Notebook, que leerá los datos de entrada, anonimizará los números de teléfono del flujo de entrada y almacenará los datos anónimos en el flujo de salida.

  9. Importe el bloc de notas Zeppelin MaskPhoneNumber-interactive.

  10. Ejecute cada párrafo del bloc de notas.

    1. En el párrafo 1, se importa una función definida por el usuario para anonimizar los números de teléfono.

      %flink(parallelism=1) import com.mycompany.app.MaskPhoneNumber stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. En el siguiente párrafo, creará una tabla en memoria para leer los datos del flujo de entrada. Asegúrese de que el nombre de la transmisión y la región sean correctos. AWS

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews; CREATE TABLE customer_reviews ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phone VARCHAR ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json');
    3. Compruebe si los datos están cargados en la tabla en memoria.

      %flink.ssql(type=update) select * from customer_reviews
    4. Invoque la función definida por el usuario para anonimizar el número de teléfono.

      %flink.ssql(type=update) select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    5. Ahora que los números de teléfono están enmascarados, cree una vista con un número enmascarado.

      %flink.ssql(type=update) DROP VIEW IF EXISTS sentiments_view; CREATE VIEW sentiments_view AS select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    6. Compruebe los datos.

      %flink.ssql(type=update) select * from sentiments_view
    7. Cree una tabla en memoria para la salida de Kinesis Stream. Asegúrese de que el nombre de la transmisión y AWS la región sean correctos.

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews_stream_table; CREATE TABLE customer_reviews_stream_table ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phoneNumber varchar ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleOutputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json');
    8. Inserte registros actualizados en el flujo de Kinesis de destino.

      %flink.ssql(type=update) INSERT INTO customer_reviews_stream_table SELECT customer_id, product, review, phoneNumber FROM sentiments_view
    9. Vea y verifique los datos del flujo de Kinesis de destino.

      %flink.ssql(type=update) select * from customer_reviews_stream_table

Promoción de un bloc de notas como aplicación

Ahora que ha probado el código de su bloc de notas de forma interactiva, implementará el código como una aplicación de flujo con un estado duradero. Primero tendrá que modificar la configuración de la aplicación para especificar una ubicación para su código en Amazon S3.

  1. En AWS Management Console, elija su bloc de notas y, en Implementar como configuración de aplicación (opcional), elija Editar.

  2. En Destino del código en Amazon S3, elija el bucket de Amazon S3 que crearon los AWS CloudFormation scripts. El proceso puede demorar unos minutos.

  3. No podrá promocionar la nota tal como está. Si lo intenta, se producirá un error ya que no se admiten las instrucciones Select. Para evitar este problema, descargue el cuaderno MaskPhoneNumber-Streaming Zeppelin.

  4. Importe el bloc de notas Zeppelin MaskPhoneNumber-streaming.

  5. Abre la nota y selecciona Acciones para. KinesisDataAnalyticsStudio

  6. Elija Build MaskPhoneNumber -Streaming y exporte a S3. Asegúrese de cambiar el nombre de la aplicación y de no incluir caracteres especiales.

  7. Seleccione Crear y exportar. La configuración de la aplicación de flujo tardará unos minutos.

  8. Una vez que se complete la compilación, elija Implementar mediante la consola de AWS .

  9. En la página siguiente, revise la configuración y asegúrese de elegir el IAM rol correcto. A continuación, seleccione Crear aplicación de streaming.

  10. Después de unos minutos, verá un mensaje que indica que la aplicación de flujo se creó correctamente.

Para obtener más información sobre la implementación de aplicaciones con un estado y límites duraderos, consulte Implementación como una aplicación con un estado duradero.

Limpieza

Si lo desea, ahora puede desinstalar la pila AWS CloudFormation. Esto eliminará todos los servicios que configuró anteriormente.