Managed Service for Apache Flink Python アプリケーションをプログラムする - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink Python アプリケーションをプログラムする

Managed Service for Apache Flink for Python アプリケーションをコーディングするには、Apache Flink Python テーブル を使用しますAPI。Apache Flink エンジンは、Python テーブルAPIステートメント (Python VM で実行) を Java テーブルAPIステートメント (Java VM で実行) に変換します。

Python テーブルは、次のAPI方法で使用します。

  • StreamTableEnvironment へのリファレンスを作成します。

  • StreamTableEnvironment リファレンスに対してクエリを実行して、table ソースストリーミングデータからオブジェクトを作成します。

  • table オブジェクトに対してクエリを実行して出力テーブルを作成します。

  • StatementSet を使用して出力テーブルを宛先に書き込みます。

Managed Service for Apache Flink APIの Python テーブルの使用を開始するには、「」を参照してくださいAmazon Managed Service for Apache Flink for Python の使用を開始する

ストリーミングデータの読み取りと書き込み

ストリーミングデータの読み取りと書き込みを行うには、テーブル環境でSQLクエリを実行します。

テーブルを作成する

次のコード例は、SQLクエリを作成するユーザー定義関数を示しています。SQL クエリは、Kinesis ストリームとやり取りするテーブルを作成します。

def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)

ストリーミングデータの読み取り

次のコード例は、テーブル環境リファレンスで前述のCreateTableSQLクエリを使用してデータを読み取る方法を示しています。

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

ストリーミングデータの書き込み

次のコード例は、CreateTableこの例のSQLクエリを使用して出力テーブルリファレンスを作成する方法と、 を使用してテーブルを操作しStatementSetて送信先 Kinesis ストリームにデータを書き込む方法を示しています。

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))

ランタイムプロパティの読み取り

ランタイムプロパティを使用すると、アプリケーションコードを変更せずにアプリケーションを設定できます。

アプリケーションのアプリケーションプロパティは、Java アプリケーション向けの Apache Flink 用 Managed Service と同じ方法で指定します。ランタイムプロパティは次の方法で指定できます。

コード内でアプリケーション・プロパティを取得するには、Managed Service for Apache Flinkランタイムが作成する application_properties.json と呼ばれるjsonファイルを読み込みます。

以下のサンプルコードは、application_properties.json ファイルからのアプリケーションプロパティの読み取りの例です。

file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)

次のユーザー定義関数のコード例は、アプリケーションプロパティオブジェクト:retrieves からプロパティグループを読み取る方法を示しています。

def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]

次のコード例は、前の例が返すプロパティグループから INPUT_STREAM_KEY というプロパティを読み取る方法を示しています。

input_stream = input_property_map[INPUT_STREAM_KEY]

アプリケーションのコードパッケージを作成する

Python アプリケーションを作成したら、コードファイルと依存関係を zip ファイルにバンドルします。

zip ファイルには main メソッドを含む Python スクリプトが含まれている必要があり、オプションで以下を含めることができます。

  • その他の Python コードファイル

  • JAR ファイル内のユーザー定義 Java コード

  • JAR ファイル内の Java ライブラリ

注記

アプリケーションの ZIP ファイルには、アプリケーションの依存関係がすべて含まれている必要があります。アプリケーションの他のソースからのライブラリを参照することはできません。