

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 程式設計 Managed Service for Apache Flink Python 應用程式
<a name="how-python-programming"></a>

您可以使用 Apache Flink Python 資料表 API 對 Managed Service for Apache Flink Python 應用程式進行編碼。Apache Flink 引擎將 Python 資料表 API 陳述式 (在 Python 虛擬機中執行) 轉換為 Java 資料表 API 陳述式 (在 Java 虛擬機中執行)。

執行下列動作以使用 Python 資料表 API：
+ 建立對 `StreamTableEnvironment` 的參考。
+ 透過對 `table` 參考執行查詢，從來源串流資料建立 `StreamTableEnvironment` 物件。
+ 對 `table` 物件執行查詢以建立輸出資料表。
+ 使用 `StatementSet` 將輸出資料表寫入目的地。

若要開始在 Managed Service for Apache Flink 中使用 Python 資料表 API，請參閱 [Amazon Managed Service for Apache Flink for Python 入門](gs-python.md)。

## 讀取和寫入串流資料
<a name="how-python-programming-readwrite"></a>

若要讀取和寫入串流資料，請在資料表環境中執行 SQL 查詢。

### 建立資料表
<a name="how-python-programming-readwrite-createtable"></a>

下列程式碼範例示範可建立 SQL 查詢的使用者定義函數。SQL 查詢會建立與 Kinesis 串流互動的資料表：

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### 讀取串流資料
<a name="how-python-programming-readwrite-read"></a>

下列程式碼範例示範如何在資料表環境參考上使用先前的 `CreateTable` SQL 查詢來讀取資料：

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### 寫入串流資料
<a name="how-python-programming-readwrite-write"></a>

下列程式碼範例示範如何使用 `CreateTable` 範例中的 SQL 查詢來建立輸出資料表參考，以及如何使用 `StatementSet` 與資料表互動，以將資料寫入目的地 Kinesis 串流：

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## 讀取執行期屬性
<a name="how-python-programming-properties"></a>

您可以使用執行期屬性來設定應用程式，而無需變更應用程式的程式碼。

您可以指定應用程式的應用程式屬性，指定方式與 Managed Service for Apache Flink Java 應用程式相同。您可採用以下方式來指定執行期屬性：
+ 使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作。
+ 使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作。
+ 使用主控台設定應用程式。

您可以讀取 Managed Service for Apache Flink 執行期所建立的名為 `application_properties.json` 的 json 檔案，藉此擷取程式碼中的應用程式屬性。

下列程式碼範例示範從 `application_properties.json` 檔案讀取應用程式屬性：

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

下列使用者定義函數的程式碼範例示範如何從應用程式屬性物件讀取屬性群組：擷取：

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

下列程式碼範例示範從上一個範例傳回的屬性群組讀取名為 INPUT\_STREAM\_KEY 的屬性：

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## 建立應用程式的程式碼套件
<a name="how-python-programming-package"></a>

建立 Python 應用程式之後，您就可以將程式碼檔案和相依性綁定到 zip 檔案中。

您的 zip 檔案必須包含帶有 `main` 方法的 Python 指令碼，並且可以選擇包含以下內容：
+ 其他 Python 程式碼檔案
+ JAR 檔案中使用者定義的 Java 程式碼
+ JAR 檔案中的 Java 程式庫

**注意**  
應用程式 zip 檔案必須包含應用程式的所有相依性。您無法為應用程式參考其他來源的程式庫。