

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink Python 애플리케이션 프로그래밍
<a name="how-python-programming"></a>

Apache Flink Python 표 API를 사용하여 Python 애플리케이션용 Managed Service for Apache Flink를 코딩합니다. Apache Flink 엔진은 Python 표 API 명령문(Python가상 머신에서 실행)을 Java 표 API 명령문(Java 가상 머신에서 실행)으로 변환합니다.

Python 표 API를 사용하는 방법은 다음과 같습니다.
+ `StreamTableEnvironment`에 대한 참조를 생성합니다.
+ `StreamTableEnvironment` 참조에 대해 쿼리를 실행하여 소스 스트리밍 데이터에서 `table` 객체를 생성합니다.
+ `table` 객체에 대해 쿼리를 실행하여 출력 표를 생성합니다.
+ `StatementSet`을 사용하여 대상에 출력 표를 작성합니다.

Managed Service for Apache Flink에서 Python 표 API를 사용하여 시작하려면 [Python으로 Amazon Managed Service for Apache Flink 시작하기](gs-python.md) 섹션을 참조하십시오.

## 스트리밍 데이터 읽기 및 쓰기
<a name="how-python-programming-readwrite"></a>

스트리밍 데이터를 읽고 쓰려면 표 환경에서 SQL 쿼리를 실행합니다.

### 테이블 생성
<a name="how-python-programming-readwrite-createtable"></a>

다음 코드 예는 SQL 쿼리를 생성하는 사용자 정의 함수를 보여줍니다. SQL 쿼리는 Kinesis 스트림과 상호 작용하는 표를 생성합니다.

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### 스트리밍 데이터 읽기
<a name="how-python-programming-readwrite-read"></a>

다음 코드 예는 표 환경 참조에서 이전 `CreateTable`SQL 쿼리를 사용하여 데이터를 읽는 방법을 보여줍니다.

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### 스트리밍 데이터 쓰기
<a name="how-python-programming-readwrite-write"></a>

다음 코드 예는 `CreateTable` 예의 SQL 쿼리를 사용하여 출력 표 참조를 생성하는 방법과 `StatementSet`를 사용하여 표과 상호 작용하여 대상 Kinesis 스트림에 데이터를 쓰는 방법을 보여줍니다.

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## 런타임 속성 읽기
<a name="how-python-programming-properties"></a>

애플리케이션 코드를 변경하지 않고도 런타임 속성을 사용하여 애플리케이션을 구성할 수 있습니다.

Java 애플리케이션용 Managed Service for Apache Flink를 사용할 때와 같은 방식으로 애플리케이션의 애플리케이션 속성을 지정합니다. 런타임 속성은 다음과 같은 방법으로 지정할 수 있습니다.
+ [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 작업 사용하기
+ [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업 사용하기
+ 콘솔을 사용하여 애플리케이션 구성하기.

Managed Service for Apache Flink 런타임에서 생성하는 `application_properties.json`이라는 json 파일을 읽으면 코드에서 애플리케이션 속성을 검색할 수 있습니다.

다음 코드 예는 `application_properties.json` 파일에서 애플리케이션 속성을 읽는 방법을 보여줍니다.

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

다음 사용자 정의 함수 코드 예는 애플리케이션 속성 객체에서 속성 그룹을 읽는 방법을 보여줍니다: 검색합니다:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

다음 코드 예는 이전 예에서 반환하는 속성 그룹에서 INPUT\_STREAM\_KEY라는 속성을 읽는 방법을 보여 줍니다.

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## 애플리케이션의 코드 패키지 생성
<a name="how-python-programming-package"></a>

Python 애플리케이션을 만든 후에는 코드 파일과 종속성을 zip 파일로 번들로 묶습니다.

zip 파일에는 `main` 메서드가 있는 Python 스크립트가 포함되어야 하며 선택적으로 다음을 포함할 수 있습니다.
+ 추가 Python 코드 파일
+ JAR 파일의 사용자 정의 Java 코드
+ JAR 파일의 Java 라이브러리

**참고**  
애플리케이션 zip 파일에는 애플리케이션의 모든 종속성이 포함되어야 합니다. 다른 소스의 라이브러리는 애플리케이션에 참조할 수 없습니다.