为 Apache Flink Python 应用程序编程你的托管服务 - Managed Service for Apache Flink

适用于 Apache Flink 的亚马逊托管服务(亚马逊 MSF)以前被称为适用于 Apache Flink 的亚马逊 Kinesis Data Analytics。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Apache Flink Python 应用程序编程你的托管服务

您可以使用 Apache Flink Python 表 API 编写 Python 应用程序的 Managed Service for Apache Flink 代码。Apache Flink 引擎将 Python 表 API 语句(在 Python 虚拟机中运行)转换为 Java 表 API 语句(在 Java 虚拟机中运行)。

您可以通过以下步骤使用 Python Table API:

  • 创建对的引用StreamTableEnvironment

  • 通过对StreamTableEnvironment参考文献执行查询,根据源流数据创建table对象。

  • 对您的table对象执行查询以创建输出表。

  • 使用将输出表写入目的地StatementSet

要开始在 Managed Service for Apache Flink 中使用 Python 表 API,请参阅。开始使用适用于 Python 的 Apache Flink 的亚马逊托管服务

读取和写入流数据

要读取和写入流数据,请在表环境中执行 SQL 查询。

创建表

以下代码示例演示了创建 SQL 查询的用户定义函数。SQL 查询会创建一个与 Kinesis 流交互的表:

def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)

读取流媒体数据

以下代码示例演示了如何使用前面的 CreateTable SQL 查询对表环境引用来读取数据:

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

写入流数据

以下代码示例演示如何使用CreateTable示例中的 SQL 查询来创建输出表引用,以及如何使用与表交互StatementSet以将数据写入目标 Kinesis 流:

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))

读取运行时属性

您可以使用运行时系统属性配置应用程序,而无需更改应用程序代码。

为应用程序指定应用程序属性的方式与使用 Java 应用程序的 Managed Service for Apache Flink 方法相同。您可以使用以下方法指定运行时系统属性:

您可以通过读取 Managed Service for Apache Flink 运行时创建application_properties.json的名为 json 文件来检索代码中的应用程序属性。

以下代码示例演示了如何从application_properties.json文件中读取应用程序属性:

file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)

以下用户定义的函数代码示例演示了如何从应用程序属性对象中读取属性组:检索:

def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]

以下代码示例演示如何从上一个示例返回的属性组中读取名为 INPUT_STREAM_KEY 的属性:

input_stream = input_property_map[INPUT_STREAM_KEY]

创建应用程序的代码包

创建 Python 应用程序后,即可将代码文件和依赖项捆绑到一个 zip 文件中。

您的 zip 文件必须包含带有main方法的 python 脚本,并且可以选择包含以下内容:

  • 其他 Python 代码文件

  • JAR 文件中用户定义的 Java 代码

  • JAR 文件中的 Java 库

注意

您的应用程序 zip 文件必须包含应用程序的所有依赖项。您不能为应用程序引用其他来源的库。