Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Managed Service for Apache Flink Python アプリケーションをプログラムする
Managed Service for Apache Flink for Python アプリケーションをコーディングするには、Apache Flink Python テーブル を使用しますAPI。Apache Flink エンジンは、Python テーブルAPIステートメント (Python VM で実行) を Java テーブルAPIステートメント (Java VM で実行) に変換します。
Python テーブルは、次のAPI方法で使用します。
StreamTableEnvironment
へのリファレンスを作成します。StreamTableEnvironment
リファレンスに対してクエリを実行して、table
ソースストリーミングデータからオブジェクトを作成します。table
オブジェクトに対してクエリを実行して出力テーブルを作成します。StatementSet
を使用して出力テーブルを宛先に書き込みます。
Managed Service for Apache Flink APIの Python テーブルの使用を開始するには、「」を参照してくださいAmazon Managed Service for Apache Flink for Python の使用を開始する。
ストリーミングデータの読み取りと書き込み
ストリーミングデータの読み取りと書き込みを行うには、テーブル環境でSQLクエリを実行します。
テーブルを作成する
次のコード例は、SQLクエリを作成するユーザー定義関数を示しています。SQL クエリは、Kinesis ストリームとやり取りするテーブルを作成します。
def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)
ストリーミングデータの読み取り
次のコード例は、テーブル環境リファレンスで前述のCreateTable
SQLクエリを使用してデータを読み取る方法を示しています。
table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
ストリーミングデータの書き込み
次のコード例は、CreateTable
この例のSQLクエリを使用して出力テーブルリファレンスを作成する方法と、 を使用してテーブルを操作しStatementSet
て送信先 Kinesis ストリームにデータを書き込む方法を示しています。
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
ランタイムプロパティの読み取り
ランタイムプロパティを使用すると、アプリケーションコードを変更せずにアプリケーションを設定できます。
アプリケーションのアプリケーションプロパティは、Java アプリケーション向けの Apache Flink 用 Managed Service と同じ方法で指定します。ランタイムプロパティは次の方法で指定できます。
CreateApplication アクションの使用。
UpdateApplication アクションの使用。
コンソールを使ってアプリケーションを設定します。
コード内でアプリケーション・プロパティを取得するには、Managed Service for Apache Flinkランタイムが作成する application_properties.json
と呼ばれるjsonファイルを読み込みます。
以下のサンプルコードは、application_properties.json
ファイルからのアプリケーションプロパティの読み取りの例です。
file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)
次のユーザー定義関数のコード例は、アプリケーションプロパティオブジェクト:retrieves からプロパティグループを読み取る方法を示しています。
def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]
次のコード例は、前の例が返すプロパティグループから INPUT_STREAM_KEY というプロパティを読み取る方法を示しています。
input_stream = input_property_map[INPUT_STREAM_KEY]
アプリケーションのコードパッケージを作成する
Python アプリケーションを作成したら、コードファイルと依存関係を zip ファイルにバンドルします。
zip ファイルには main
メソッドを含む Python スクリプトが含まれている必要があり、オプションで以下を含めることができます。
その他の Python コードファイル
JAR ファイル内のユーザー定義 Java コード
JAR ファイル内の Java ライブラリ
注記
アプリケーションの ZIP ファイルには、アプリケーションの依存関係がすべて含まれている必要があります。アプリケーションの他のソースからのライブラリを参照することはできません。