예 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Flink SQL 쿼리 설정에 대한 자세한 내용은 대화형 데이터 분석을 위한 Zeppelin 노트북의 Flink를 참조하세요.

Apache Flink 대시보드에서 애플리케이션을 보려면 애플리케이션의 Zeppelin Note 페이지에서 FLINK JOB을 선택하세요.

윈도우 쿼리에 대한 자세한 내용은 Apache Flink 설명서윈도우를 참조하세요.

Apache Flink Streaming SQL 쿼리의 더 많은 예는 Apache Flink 설명서쿼리를 참조하세요.

Amazon MSK/Apache Kafka로 테이블 생성

Managed Service for Apache Flink Studio와 Amazon MSK Flink 커넥터를 사용하여 일반 텍스트, SSL 또는 IAM 인증과의 연결을 인증할 수 있습니다. 요구 사항에 따라 특정 속성을 사용하여 테이블을 생성하세요.

-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );

Apache Kafka SQL Connector에서 이러한 속성을 다른 속성과 결합할 수 있습니다.

Kinesis를 사용하여 테이블 생성

다음 예제에서는 Kinesis를 사용하여 테이블을 생성합니다.

CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );

사용할 수 있는 다른 속성에 대한 자세한 내용은 Amazon Kinesis Data Streams SQL Connector를 참조하세요.

텀블링 윈도우

다음 Flink Streaming SQL 쿼리는 ZeppelinTopic 테이블에서 각 5초의 텀블링 윈도우 내에서 가장 높은 가격을 선택합니다.

%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)

슬라이딩 윈도우

다음 Apache Flink Streaming SQL 쿼리는 ZeppelinTopic 표에서 각 5초 슬라이딩 윈도우에서 가장 높은 가격을 선택합니다.

%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)

대화형 SQL

이 예제는 최대 이벤트 시간 및 처리 시간과 키 값 테이블의 값의 합계를 인쇄합니다. 데이터 생성기 샘플 데이터 생성 스크립트가 실행 중인지 확인하세요. Studio 노트북에서 필터링 및 조인과 같은 다른 SQL 쿼리를 시도하려면 Apache Flink 설명서: Apache Flink 설명서의 쿼리를 참조하세요.

%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;

BlackHole SQL 커넥터

BlackHole SQL 커넥터는 쿼리를 테스트하기 위해 Kinesis 데이터 스트림 또는 Amazon MSK 클러스터를 생성할 필요가 없습니다. BlackHoleSQL 커넥터에 대한 자세한 내용은 Apache BlackHole Flink 설명서의 SQL 커넥터를 참조하십시오. 이 예제에서 기본 카탈로그는 인메모리 카탈로그입니다.

%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7

데이터 생성기

이 예제에서는 Scala를 사용하여 샘플 데이터를 생성합니다. 이 샘플 데이터를 사용하여 다양한 쿼리를 테스트할 수 있습니다. 테이블 생성 명령문을 사용하여 키 값 테이블을 생성합니다.

import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`

대화형 Scala

대화형 SQL의 스칼라 번역본입니다. 더 많은 스칼라 예제를 보려면 Apache Flink 설명서의 테이블 API를 참조하세요.

%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

대화형 Python

대화형 SQL의 Python 번역은 다음과 같습니다 더 많은 Python 예제를 보려면 Apache Flink 문서의 테이블 API를 참조하세요.

%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

대화형 Python, SQL, 스칼라

대화형 분석을 위해 노트북에서 SQL, Python 및 Scala의 모든 조합을 사용할 수 있습니다. 지속 가능한 상태의 애플리케이션으로 배포할 Studio 노트북에서는 SQL과 Scala를 함께 사용할 수 있습니다. 이 예제에서는 무시되는 섹션과 지속 가능한 상태의 애플리케이션에 배포되는 섹션을 보여줍니다.

%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")

계정 간 Kinesis 데이터 스트림

Studio 노트북이 있는 계정이 아닌 다른 계정에 있는 Kinesis 데이터 스트림을 사용하려면 Studio 노트북이 실행되는 계정에서 서비스 실행 역할을 생성하고 데이터 스트림이 있는 계정에서 역할 신뢰 정책을 생성하세요. 테이블 생성 DDL 명령문의 Kinesis 커넥터에서 aws.credentials.provider, aws.credentials.role.arn, 및 aws.credentials.role.sessionName을(를) 사용하여 데이터 스트림에 대한 테이블을 생성합니다.

Studio 노트북 계정에 다음 서비스 실행 역할을 사용하세요.

{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }

데이터 스트림 계정에 AmazonKinesisFullAccess 정책과 다음 역할 신뢰 정책을 사용하세요.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<accountID>:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }

테이블 생성 명령문에는 다음 단락을 사용하세요.

%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )