View a markdown version of this page

Managed Service for Apache Flink Python 애플리케이션 프로그래밍 - Managed Service for Apache Flink

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Managed Service for Apache Flink Python 애플리케이션 프로그래밍

Apache Flink Python 표 API를 사용하여 Python 애플리케이션용 Managed Service for Apache Flink를 코딩합니다. Apache Flink 엔진은 Python 표 API 명령문(Python가상 머신에서 실행)을 Java 표 API 명령문(Java 가상 머신에서 실행)으로 변환합니다.

Python 표 API를 사용하는 방법은 다음과 같습니다.

  • StreamTableEnvironment에 대한 참조를 생성합니다.

  • StreamTableEnvironment 참조에 대해 쿼리를 실행하여 소스 스트리밍 데이터에서 table 객체를 생성합니다.

  • table 객체에 대해 쿼리를 실행하여 출력 표를 생성합니다.

  • StatementSet을 사용하여 대상에 출력 표를 작성합니다.

Managed Service for Apache Flink에서 Python 표 API를 사용하여 시작하려면 Python으로 Amazon Managed Service for Apache Flink 시작하기 섹션을 참조하십시오.

스트리밍 데이터 읽기 및 쓰기

스트리밍 데이터를 읽고 쓰려면 표 환경에서 SQL 쿼리를 실행합니다.

테이블 생성

다음 코드 예는 SQL 쿼리를 생성하는 사용자 정의 함수를 보여줍니다. SQL 쿼리는 Kinesis 스트림과 상호 작용하는 표를 생성합니다.

def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)

스트리밍 데이터 읽기

다음 코드 예는 표 환경 참조에서 이전 CreateTableSQL 쿼리를 사용하여 데이터를 읽는 방법을 보여줍니다.

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

스트리밍 데이터 쓰기

다음 코드 예는 CreateTable 예의 SQL 쿼리를 사용하여 출력 표 참조를 생성하는 방법과 StatementSet를 사용하여 표과 상호 작용하여 대상 Kinesis 스트림에 데이터를 쓰는 방법을 보여줍니다.

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))

런타임 속성 읽기

애플리케이션 코드를 변경하지 않고도 런타임 속성을 사용하여 애플리케이션을 구성할 수 있습니다.

Java 애플리케이션용 Managed Service for Apache Flink를 사용할 때와 같은 방식으로 애플리케이션의 애플리케이션 속성을 지정합니다. 런타임 속성은 다음과 같은 방법으로 지정할 수 있습니다.

Managed Service for Apache Flink 런타임에서 생성하는 application_properties.json이라는 json 파일을 읽으면 코드에서 애플리케이션 속성을 검색할 수 있습니다.

다음 코드 예는 application_properties.json 파일에서 애플리케이션 속성을 읽는 방법을 보여줍니다.

file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)

다음 사용자 정의 함수 코드 예는 애플리케이션 속성 객체에서 속성 그룹을 읽는 방법을 보여줍니다: 검색합니다:

def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]

다음 코드 예는 이전 예에서 반환하는 속성 그룹에서 INPUT_STREAM_KEY라는 속성을 읽는 방법을 보여 줍니다.

input_stream = input_property_map[INPUT_STREAM_KEY]

애플리케이션의 코드 패키지 생성

Python 애플리케이션을 만든 후에는 코드 파일과 종속성을 zip 파일로 번들로 묶습니다.

zip 파일에는 main 메서드가 있는 Python 스크립트가 포함되어야 하며 선택적으로 다음을 포함할 수 있습니다.

  • 추가 Python 코드 파일

  • JAR 파일의 사용자 정의 Java 코드

  • JAR 파일의 Java 라이브러리

참고

애플리케이션 zip 파일에는 애플리케이션의 모든 종속성이 포함되어야 합니다. 다른 소스의 라이브러리는 애플리케이션에 참조할 수 없습니다.