Para proyectos nuevos, le recomendamos que utilice el nuevo Kinesis Data Analytics Studio en lugar de Kinesis Data Analytics for SQL Applications. Kinesis Data Analytics Studio combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones sofisticadas de procesamiento de transmisiones en cuestión de minutos.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Ejemplo: recuperación de los valores más frecuentes (TOP_K_ITEMS_TUMBLING)
Este ejemplo de Amazon Kinesis Data Analytics demuestra cómo utilizar laTOP_K_ITEMS_TUMBLING
función para recuperar los valores que aparecen con más frecuencia en una ventana que se abre. Para obtener más información, consulte la TOP_K_ITEMS_TUMBLING
función en la referencia SQL de Amazon Kinesis Data Analytics.
La función TOP_K_ITEMS_TUMBLING
es útil cuando se agregan decenas o cientos de miles de claves y se desea reducir el uso de recursos. La función produce el mismo resultado que la agregación con cláusulas GROUP BY
y ORDER BY
.
En este ejemplo, escribe los siguientes registros en una transmisión de datos de Amazon Kinesis:
{"TICKER": "TBV"} {"TICKER": "INTC"} {"TICKER": "MSFT"} {"TICKER": "AMZN"} ...
A continuación, creación de una aplicación de Kinesis Data Analytics en elAWS Management Console, con el flujo de datos de Kinesis como el origen de streaming. El proceso de detección lee los registros de muestra en el origen de streaming e infiere un esquema en la aplicación con una columna (TICKER
), tal como se muestra a continuación.

Utilice el código de aplicación con la función TOP_K_VALUES_TUMBLING
para crear una agregación en ventana de los datos. A continuación, inserte los datos resultantes en otra secuencia en la aplicación, tal y como se muestra en la siguiente captura de pantalla:

En el siguiente procedimiento, se crea una aplicación de Kinesis Data Analytics que recupera los valores que aparecen con más frecuencia en el flujo de entrada.
Temas
Paso 1: creación de un flujo de datos de Kinesis
Cree una transmisión de datos de Amazon Kinesis y rellene los registros de la siguiente manera:
Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis
. -
Elija Data Streams (Secuencias de datos) en el panel de navegación.
-
Elija Create Kinesis Stream (Crear secuencia de Kinesis) y, a continuación, cree una secuencia con un fragmento. Para obtener más información, consulte Creación de streaming en la Guía para desarrolladores de Amazon Kinesis Data Streams.
-
Para escribir registros en un flujo de datos de Kinesis en un entorno de producción, recomendamos utilizar Kinesis Client Library o la API de Kinesis Data Streams. Para simplificar, en este ejemplo se utiliza el siguiente script Python para generar registros. Ejecute el código para rellenar los registros de ticker de muestra. Este código simple escribe continuamente un registro de ticker aleatorio en el flujo. Deje el script ejecutándose para poder generar el esquema de la aplicación en un paso posterior.
import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
Paso 2: creación de la aplicación de Kinesis Data Analytics
Creación de una aplicación de Kinesis Data Analytics de la siguiente manera:
Abra la consola de Kinesis Data Analytics en https://console.aws.amazon.com/kinesisanalytics
. -
Elija Create application (Crear aplicación), escriba el nombre de la aplicación y elija Create application (Crear aplicación).
-
En la página de detalles de la aplicación, elija Connect streaming data (Conectar datos de streaming) para conectarse al origen.
-
En la página Connect to source (Conectarse al origen), haga lo siguiente:
-
Elija la secuencia que ha creado en la sección anterior.
-
Elija Discover Schema (Detectar esquema). Espere a que la consola muestre el esquema inferido y los registros de muestra utilizados para inferir en el esquema de la secuencia en la aplicación que ha creado. El esquema inferido tiene una columna.
-
Elija Save schema and update stream samples. Después de que la consola guarde el esquema, elija Exit (Salir).
-
Elija Save and continue.
-
-
En la página de detalles de la aplicación, elija Go to SQL editor (Ir al editor de SQL). Para iniciar la aplicación, elija Yes, start application (Sí, iniciar la aplicación) en el cuadro de diálogo que aparece.
-
En el editor de SQL, escriba el código de la aplicación y verifique los resultados como se indica a continuación:
-
Copie el siguiente código de la aplicación y péguelo en el editor:
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM ( "TICKER" VARCHAR(4), "MOST_FREQUENT_VALUES" BIGINT ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM TABLE (TOP_K_ITEMS_TUMBLING( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'TICKER', -- name of column in single quotes 5, -- number of the most frequently occurring values 60 -- tumbling window size in seconds ) );
-
Elija Save and run SQL.
En la pestaña Real-time analytics (Análisis en tiempo real), puede ver todas las secuencias en la aplicación creadas por esta y comprobar los datos.
-