Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

Visualizza esempi di query per analizzare i dati in un notebook Studio

Modalità Focus
Visualizza esempi di query per analizzare i dati in un notebook Studio - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Per informazioni sulle impostazioni delle query SQL di Apache Flink, consulta Flink sui notebook Zeppelin per l'analisi dei dati interattiva.

Per visualizzare la tua applicazione nel pannello di controllo di Apache Flink, scegli PROCESSO FLINK nella pagina Nota Zeppelin della tua applicazione.

Per ulteriori informazioni sulle query a finestra, consulta Finestre nella documentazione di Apache Flink.

Per altri esempi di query Streaming SQL di Apache Flink, consulta Query nella documentazione di Apache Flink.

Crea tabelle con Amazon MSK/Apache Kafka

Puoi utilizzare il connettore Amazon MSK Flink con il servizio gestito per Apache Flink Studio per autenticare la connessione con l'autenticazione Plaintext, SSL o IAM. Crea tabelle utilizzando proprietà specifiche in base alle tue esigenze.

-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );

Puoi combinarle con altre proprietà in Connettore SQL Apache Kafka.

Crea tabelle con Kinesis

Nell'esempio seguente, viene creata una tabella usando Kinesis:

CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );

Per ulteriori informazioni sulle altre proprietà utilizzabili, consulta Connettore SQL per il flusso di dati Amazon Kinesis.

Interroga una finestra che si apre

La seguente query Streaming SQL di Flink seleziona dalla tabella ZeppelinTopic il prezzo più alto in ogni finestra a cascata di cinque secondi:

%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)

Interroga una finestra scorrevole

La seguente query Streaming SQL di Apache Flink seleziona il prezzo più alto in ogni finestra scorrevole di cinque secondi dalla tabella ZeppelinTopic:

%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)

Usa SQL interattivo

Questo esempio converte il massimo dell'ora evento e del tempo di elaborazione e la somma dei valori della tabella chiave-valore. Assicurati di avere lo script di generazione dei dati di esempio dal Usa Scala per generare dati di esempio in esecuzione. Per provare altre query SQL, ad esempio il filtraggio e i join, nel notebook Studio, consulta Queries nella documentazione di Apache Flink.

%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;

Usa il connettore BlackHole SQL

Il connettore BlackHole SQL non richiede la creazione di un flusso di dati Kinesis o di un cluster Amazon MSK per testare le query. Per informazioni sul connettore BlackHole SQL, consulta BlackHole SQL Connector nella documentazione di Apache Flink. In questo esempio, il catalogo predefinito è un catalogo in memoria.

%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7

Usa Scala per generare dati di esempio

Questo esempio utilizza Scala per generare dati di esempio. È possibile utilizzare questi dati di esempio per testare varie query. Utilizza l'istruzione di creazione tabella per creare la tabella chiave-valore.

import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`

Usa Scala interattiva

Questa è la traduzione in Scala di Usa SQL interattivo. Per altri esempi di Scala, consulta API Table nella documentazione di Apache Flink.

%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

Usa Python interattivo

Questa è la traduzione in Python di Usa SQL interattivo. Per altri esempi di Python, consulta API Table nella documentazione di Apache Flink.

%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

Usa una combinazione di Python, SQL e Scala interattivi

Puoi usare qualsiasi combinazione di SQL, Python e Scala nel tuo notebook per l'analisi interattiva. In un notebook Studio che intendi implementare come applicazione con stato durevole, puoi utilizzare una combinazione di SQL e Scala. Questo esempio mostra le sezioni che vengono ignorate e quelle che vengono implementate nell'applicazione con stato durevole.

%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")

Usa un flusso di dati Kinesis tra account

Per utilizzare un flusso di dati Kinesis che si trova in un account diverso da quello su cui è installato il notebook Studio, crea un ruolo di esecuzione del servizio nell'account su cui è in esecuzione il notebook Studio e una politica di attendibilità dei ruoli nell'account su cui è in esecuzione il flusso di dati. Utilizza aws.credentials.provider, aws.credentials.role.arn e aws.credentials.role.sessionName nel connettore Kinesis nell'istruzione DDL di creazione tabella per creare una tabella in base al flusso di dati.

Utilizza il seguente ruolo di esecuzione del servizio per l'account notebook Studio.

{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }

Utilizza la policy AmazonKinesisFullAccess e la seguente policy di attendibilità dei ruoli per l'account del flusso di dati.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<accountID>:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }

Usa il paragrafo seguente per l'istruzione di creazione tabella.

%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )
PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.