Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Lihat contoh kueri untuk menganalisis data di buku catatan Studio
Kueri contoh berikut menunjukkan cara menganalisis data menggunakan kueri jendela di notebook Studio.
- Buat tabel dengan MSK Amazon/Apache Kafka
- Buat tabel dengan Kinesis
- Kueri jendela yang jatuh
- Kueri jendela geser
- Gunakan interaktif SQL
- Gunakan BlackHole SQL konektornya
- Gunakan Scala untuk menghasilkan data sampel
- Gunakan Scala interaktif
- Gunakan Python interaktif
- Gunakan kombinasi Python interaktif,SQL, dan Scala
- Gunakan aliran data Kinesis lintas akun
Untuk informasi tentang pengaturan SQL kueri Apache Flink, lihat Flink di Notebook Zeppelin
Untuk melihat aplikasi Anda di dasbor Apache Flink, pilih FLINKJOBdi halaman Catatan Zeppelin aplikasi Anda.
Untuk informasi selengkapnya tentang kueri jendela, lihat Windows
Untuk lebih banyak contoh kueri Apache Flink Streaming, lihat SQL Kueri
Buat tabel dengan MSK Amazon/Apache Kafka
Anda dapat menggunakan konektor Amazon MSK Flink dengan Managed Service for Apache Flink Studio untuk mengautentikasi koneksi Anda dengan Plaintext,, atau otentikasi. SSL IAM Buat tabel Anda menggunakan properti spesifik sesuai kebutuhan Anda.
-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );
Anda dapat menggabungkan ini dengan properti lain di Apache Kafka SQL
Buat tabel dengan Kinesis
Dalam contoh berikut, Anda membuat tabel menggunakan Kinesis:
CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );
Untuk informasi selengkapnya tentang properti lain yang dapat Anda gunakan, lihat Konektor Amazon Kinesis SQL Data Streams
Kueri jendela yang jatuh
SQLKueri Streaming Flink berikut memilih harga tertinggi di setiap jendela jatuh lima detik dari tabel: ZeppelinTopic
%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
Kueri jendela geser
SQLKueri Apache Flink Streaming berikut memilih harga tertinggi di setiap jendela geser lima detik dari tabel: ZeppelinTopic
%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
Gunakan interaktif SQL
Contoh ini mencetak maks. waktu peristiwa dan waktu pemrosesan serta jumlah nilai dari tabel nilai kunci. Pastikan Anda memiliki skrip pembuatan data sampel dari Gunakan Scala untuk menghasilkan data sampel yang berjalan. Untuk mencoba SQL kueri lain seperti memfilter dan bergabung di buku catatan Studio Anda, lihat dokumentasi Apache Flink: Kueri
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;
Gunakan BlackHole SQL konektornya
BlackHole SQLKonektor tidak mengharuskan Anda membuat aliran data Kinesis atau MSK klaster Amazon untuk menguji kueri Anda. Untuk informasi tentang BlackHole SQL konektor, lihat BlackHole SQLKonektor
%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7
Gunakan Scala untuk menghasilkan data sampel
Contoh ini menggunakan Scala untuk menghasilkan data sampel. Anda dapat menggunakan data sampel ini untuk menguji berbagai kueri. Gunakan pernyataan buat tabel untuk membuat tabel nilai kunci.
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`
Gunakan Scala interaktif
Ini adalah terjemahan Scala dari Gunakan interaktif SQL. Untuk contoh Scala lainnya, lihat Tabel API
%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`
Gunakan Python interaktif
Ini adalah terjemahan Python dari Gunakan interaktif SQL. Untuk contoh Python lainnya, lihat Tabel API dalam dokumentasi
%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`
Gunakan kombinasi Python interaktif,SQL, dan Scala
Anda dapat menggunakan kombinasiSQL, Python, dan Scala di buku catatan Anda untuk analisis interaktif. Di notebook Studio yang Anda rencanakan untuk digunakan sebagai aplikasi dengan status tahan lama, Anda dapat menggunakan kombinasi SQL dan Scala. Contoh ini menunjukkan bagian yang diabaikan dan bagian yang dapat digunakan dalam aplikasi dengan status tahan lama.
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")
Gunakan aliran data Kinesis lintas akun
Untuk menggunakan Kinesis data stream yang ada di akun selain akun yang memiliki notebook Studio, buat peran eksekusi layanan di akun tempat notebook Studio Anda berjalan dan kebijakan kepercayaan peran di akun yang memiliki aliran data. Gunakanaws.credentials.provider
,aws.credentials.role.arn
, dan aws.credentials.role.sessionName
di konektor Kinesis dalam DDL pernyataan buat tabel Anda untuk membuat tabel terhadap aliran data.
Gunakan peran eksekusi layanan berikut untuk akun notebook Studio.
{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }
Gunakan kebijakan AmazonKinesisFullAccess
dan kebijakan kepercayaan peran berikut untuk akun aliran data.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
<accountID>
:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }
Gunakan paragraf berikut untuk membuat pernyataan tabel.
%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::
<accountID>
:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )