Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Le seguenti query di esempio mostrano come analizzare i dati utilizzando le query a finestra in un notebook di Studio.
Per informazioni sulle impostazioni delle query SQL di Apache Flink, consulta Flink sui notebook Zeppelin per l'analisi dei dati interattiva
Per visualizzare la tua applicazione nel pannello di controllo di Apache Flink, scegli PROCESSO FLINK nella pagina Nota Zeppelin della tua applicazione.
Per ulteriori informazioni sulle query a finestra, consulta Finestre
Per altri esempi di query Streaming SQL di Apache Flink, consulta Query
Crea tabelle con Amazon MSK/Apache Kafka
Puoi utilizzare il connettore Amazon MSK Flink con il servizio gestito per Apache Flink Studio per autenticare la connessione con l'autenticazione Plaintext, SSL o IAM. Crea tabelle utilizzando proprietà specifiche in base alle tue esigenze.
-- Plaintext connection
CREATE TABLE your_table (
`column1` STRING,
`column2` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'your_topic',
'properties.bootstrap.servers' = '<bootstrap servers>',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
-- SSL connection
CREATE TABLE your_table (
`column1` STRING,
`column2` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'your_topic',
'properties.bootstrap.servers' = '<bootstrap servers>',
'properties.security.protocol' = 'SSL',
'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
'properties.ssl.truststore.password' = 'changeit',
'properties.group.id' = 'myGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
-- IAM connection (or for MSK Serverless)
CREATE TABLE your_table (
`column1` STRING,
`column2` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'your_topic',
'properties.bootstrap.servers' = '<bootstrap servers>',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'AWS_MSK_IAM',
'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
'properties.group.id' = 'myGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
Puoi combinarle con altre proprietà in Connettore SQL Apache Kafka
Crea tabelle con Kinesis
Nell'esempio seguente, viene creata una tabella usando Kinesis:
CREATE TABLE KinesisTable (
`column1` BIGINT,
`column2` BIGINT,
`column3` BIGINT,
`column4` STRING,
`ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
'connector' = 'kinesis',
'stream' = 'test_stream',
'aws.region' = '<region>',
'scan.stream.initpos' = 'LATEST',
'format' = 'csv'
);
Per ulteriori informazioni sulle altre proprietà utilizzabili, consulta Connettore SQL per il flusso di dati Amazon Kinesis
Interroga una finestra che si apre
La seguente query Streaming SQL di Flink seleziona dalla tabella ZeppelinTopic
il prezzo più alto in ogni finestra a cascata di cinque secondi:
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
Interroga una finestra scorrevole
La seguente query Streaming SQL di Apache Flink seleziona il prezzo più alto in ogni finestra scorrevole di cinque secondi dalla tabella ZeppelinTopic
:
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
Usa SQL interattivo
Questo esempio converte il massimo dell'ora evento e del tempo di elaborazione e la somma dei valori della tabella chiave-valore. Assicurati di avere lo script di generazione dei dati di esempio dal Usa Scala per generare dati di esempio in esecuzione. Per provare altre query SQL, ad esempio il filtraggio e i join, nel notebook Studio, consulta Queries
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)
-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
MAX(`et`) as `et`,
MAX(`pt`) as `pt`,
SUM(`value`) as `sum`
FROM
`key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)
-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
`key`,
SUM(`value`) as `sum`
FROM
`key-values`
GROUP BY
TUMBLE(`et`, INTERVAL '1' SECONDS),
`key`;
Usa il connettore BlackHole SQL
Il connettore BlackHole SQL non richiede la creazione di un flusso di dati Kinesis o di un cluster Amazon MSK per testare le query. Per informazioni sul connettore BlackHole SQL, consulta BlackHole SQL Connector nella documentazione di Apache Flink
%flink.ssql
CREATE TABLE default_catalog.default_database.blackhole_table (
`key` BIGINT,
`value` BIGINT,
`et` TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
)
%flink.ssql(parallelism=1)
INSERT INTO `test-target`
SELECT
`key`,
`value`,
`et`
FROM
`test-source`
WHERE
`key` > 3
%flink.ssql(parallelism=2)
INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
`key`,
`value`,
`et`
FROM
`test-target`
WHERE
`key` > 7
Usa Scala per generare dati di esempio
Questo esempio utilizza Scala per generare dati di esempio. È possibile utilizzare questi dati di esempio per testare varie query. Utilizza l'istruzione di creazione tabella per creare la tabella chiave-valore.
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream
import java.sql.Timestamp
// ad-hoc convenience methods to be defined on Table
implicit class TableOps[T](table: DataStream[T]) {
def asView(name: String): DataStream[T] = {
if (stenv.listTemporaryViews.contains(name)) {
stenv.dropTemporaryView("`" + name + "`")
}
stenv.createTemporaryView("`" + name + "`", table)
return table;
}
}
%flink(parallelism=4)
val stream = senv
.addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
.map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
.asView("key-values-data-generator")
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
`_1` as `key`,
`_2` as `value`,
`_3` as `et`
FROM
`key-values-data-generator`
Usa Scala interattiva
Questa è la traduzione in Scala di Usa SQL interattivo. Per altri esempi di Scala, consulta API Table
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
def asView(name: String): Table = {
if (stenv.listTemporaryViews.contains(name)) {
stenv.dropTemporaryView(name)
}
stenv.createTemporaryView(name, table)
return table;
}
}
%flink(parallelism=4)
// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
.from("`key-values`")
.select(
$"et".max().as("et"),
$"pt".max().as("pt"),
$"value".sum().as("sum")
).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)
-- An interactive query prints the query01 output.
SELECT * FROM query01
%flink(parallelism=4)
// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
.from("`key-values`")
.window(Tumble over 1.seconds on $"et" as $"w")
.groupBy($"w", $"key")
.select(
$"w".start.as("window"),
$"key",
$"value".sum().as("sum")
).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)
-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
Usa Python interattivo
Questa è la traduzione in Python di Usa SQL interattivo. Per altri esempi di Python, consulta API Table
%flink.pyflink
from pyflink.table.table import Table
def as_view(table, name):
if (name in st_env.list_temporary_views()):
st_env.drop_temporary_view(name)
st_env.create_temporary_view(name, table)
return table
Table.as_view = as_view
%flink.pyflink(parallelism=16)
# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
.from_path("`keyvalues`") \
.select(", ".join([
"max(et) as et",
"max(pt) as pt",
"sum(value) as sum"
])) \
.as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)
-- An interactive query prints the query01 output.
SELECT * FROM query01
%flink.pyflink(parallelism=16)
# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
.from_path("`key-values`") \
.window(Tumble.over("1.seconds").on("et").alias("w")) \
.group_by("w, key") \
.select(", ".join([
"w.start as window",
"key",
"sum(value) as sum"
])) \
.as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)
-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
Usa una combinazione di Python, SQL e Scala interattivi
Puoi usare qualsiasi combinazione di SQL, Python e Scala nel tuo notebook per l'analisi interattiva. In un notebook Studio che intendi implementare come applicazione con stato durevole, puoi utilizzare una combinazione di SQL e Scala. Questo esempio mostra le sezioni che vengono ignorate e quelle che vengono implementate nell'applicazione con stato durevole.
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
`key` BIGINT NOT NULL,
`value` BIGINT NOT NULL,
`et` TIMESTAMP(3) NOT NULL,
`pt` AS PROCTIME(),
WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'kda-notebook-example-test-source-stream',
'aws.region' = 'eu-west-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
`key` BIGINT NOT NULL,
`value` BIGINT NOT NULL,
`et` TIMESTAMP(3) NOT NULL,
`pt` AS PROCTIME(),
WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'kda-notebook-example-test-target-stream',
'aws.region' = 'eu-west-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
%flink()
// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
def asView(name: String): Table = {
if (stenv.listTemporaryViews.contains(name)) {
stenv.dropTemporaryView(name)
}
stenv.createTemporaryView(name, table)
return table;
}
}
%flink(parallelism=1)
val table = stenv
.from("`default_catalog`.`default_database`.`my-test-source`")
.select($"key", $"value", $"et")
.filter($"key" > 10)
.asView("query01")
%flink.ssql(parallelism=1)
-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)
-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
%flink
// tell me the meaning of life (ignored when deployed as application!)
print("42!")
Usa un flusso di dati Kinesis tra account
Per utilizzare un flusso di dati Kinesis che si trova in un account diverso da quello su cui è installato il notebook Studio, crea un ruolo di esecuzione del servizio nell'account su cui è in esecuzione il notebook Studio e una politica di attendibilità dei ruoli nell'account su cui è in esecuzione il flusso di dati. Utilizza aws.credentials.provider
, aws.credentials.role.arn
e aws.credentials.role.sessionName
nel connettore Kinesis nell'istruzione DDL di creazione tabella per creare una tabella in base al flusso di dati.
Utilizza il seguente ruolo di esecuzione del servizio per l'account notebook Studio.
{
"Sid": "AllowNotebookToAssumeRole",
"Effect": "Allow",
"Action": "sts:AssumeRole"
"Resource": "*"
}
Utilizza la policy AmazonKinesisFullAccess
e la seguente policy di attendibilità dei ruoli per l'account del flusso di dati.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
<accountID>
:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }
Usa il paragrafo seguente per l'istruzione di creazione tabella.
%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::
<accountID>
:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )