Programación de una aplicación de Managed Service para Apache Flink para Python - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Programación de una aplicación de Managed Service para Apache Flink para Python

Usted codifica su aplicación Managed Service para Apache Flink para Python mediante la API Apache Flink Python Table. El motor Apache Flink traduce las declaraciones de la API de tablas de Python (que se ejecutan en la máquina virtual de Python) en declaraciones de la API de tabla de Java (que se ejecutan en la máquina virtual de Java).

Para utilizar la Python Table API, siga estos pasos:

  • Cree una referencia a StreamTableEnvironment.

  • Cree table objetos a partir de sus datos de transmisión de origen ejecutando consultas en la referencia StreamTableEnvironment.

  • Ejecute consultas en sus objetos table para crear tablas de salida.

  • Escriba sus tablas de salida en sus destinos utilizando un StatementSet.

Para empezar a utilizar la API de tabla de Python en Managed Service para Apache Flink, consulte Introducción a Amazon Managed Service para Apache Flink para Python.

Lectura y escritura de datos de transmisión

Para leer y escribir datos de streaming, ejecute consultas SQL en el entorno de tablas.

Creación de una tabla

El siguiente ejemplo de código muestra una función definida por el usuario que crea una consulta SQL. La consulta SQL crea una tabla que interactúa con un flujo de Kinesis:

def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)

Lectura de datos de transmisión

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL CreateTable anterior en una referencia de entorno de tabla para leer datos:

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

Escritura de datos de transmisión

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL del ejemplo CreateTable para crear una referencia a la tabla de salida y cómo utilizar un StatementSet para interactuar con las tablas y escribir datos en un flujo de Kinesis de destino:

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))

Lectura de propiedades de tiempo de ejecución

Puede usar las propiedades de tiempo de ejecución para configurar su aplicación sin tener que cambiar el código de la aplicación.

Las propiedades de la aplicación se especifican de la misma manera que en el caso de una aplicación de Managed Service para Apache Flink para Java. Puede especificar propiedades del tiempo de ejecución de las siguientes maneras:

Puede recuperar las propiedades de la aplicación en el código leyendo un archivo json llamado application_properties.json, creado por el motor de ejecución Managed Service para Apache Flink.

El siguiente ejemplo de código demuestra las propiedades de aplicación de lectura desde el archivo application_properties.json:

file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)

El siguiente ejemplo de código de función definido por el usuario muestra la lectura de un grupo de propiedades del objeto de propiedades de la aplicación: recupera:

def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]

En el siguiente ejemplo de código, se muestra la lectura de una propiedad denominada INPUT_STREAM_KEY de un grupo de propiedades que se devuelve en el ejemplo anterior:

input_stream = input_property_map[INPUT_STREAM_KEY]

Creación del paquete de códigos de la aplicación

Una vez que haya creado su aplicación Python, agrupe el archivo de código y las dependencias en un archivo zip.

El archivo zip debe contener una secuencia de comandos de Python con un método main y, de forma opcional, puede contener lo siguiente:

  • Archivos de código Python adicionales

  • Código Java definido por el usuario en archivos JAR

  • Bibliotecas Java en archivos JAR

nota

El archivo zip de la aplicación debe contener todas las dependencias de la aplicación. No puede hacer referencia a bibliotecas de otros orígenes para su aplicación.