Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Programación de una aplicación de Managed Service para Apache Flink para Python
Usted codifica su aplicación Managed Service para Apache Flink para Python mediante la API Apache Flink Python Table. El motor Apache Flink traduce las declaraciones de la API de tablas de Python (que se ejecutan en la máquina virtual de Python) en declaraciones de la API de tabla de Java (que se ejecutan en la máquina virtual de Java).
Para utilizar la Python Table API, siga estos pasos:
Cree una referencia a
StreamTableEnvironment.Cree
tableobjetos a partir de sus datos de transmisión de origen ejecutando consultas en la referenciaStreamTableEnvironment.Ejecute consultas en sus objetos
tablepara crear tablas de salida.Escriba sus tablas de salida en sus destinos utilizando un
StatementSet.
Para empezar a utilizar la API de tabla de Python en Managed Service para Apache Flink, consulte Introducción a Amazon Managed Service para Apache Flink para Python.
Lectura y escritura de datos de transmisión
Para leer y escribir datos de streaming, ejecute consultas SQL en el entorno de tablas.
Creación de una tabla
El siguiente ejemplo de código muestra una función definida por el usuario que crea una consulta SQL. La consulta SQL crea una tabla que interactúa con un flujo de Kinesis:
def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)
Lectura de datos de transmisión
El siguiente ejemplo de código muestra cómo utilizar la consulta SQL CreateTable anterior en una referencia de entorno de tabla para leer datos:
table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
Escritura de datos de transmisión
El siguiente ejemplo de código muestra cómo utilizar la consulta SQL del ejemplo CreateTable para crear una referencia a la tabla de salida y cómo utilizar un StatementSet para interactuar con las tablas y escribir datos en un flujo de Kinesis de destino:
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
Lectura de propiedades de tiempo de ejecución
Puede usar las propiedades de tiempo de ejecución para configurar su aplicación sin tener que cambiar el código de la aplicación.
Las propiedades de la aplicación se especifican de la misma manera que en el caso de una aplicación de Managed Service para Apache Flink para Java. Puede especificar propiedades del tiempo de ejecución de las siguientes maneras:
Usando la acción CreateApplication.
Usando la acción UpdateApplication.
Configuración de su aplicación usando la consola
Puede recuperar las propiedades de la aplicación en el código leyendo un archivo json llamado application_properties.json, creado por el motor de ejecución Managed Service para Apache Flink.
El siguiente ejemplo de código demuestra las propiedades de aplicación de lectura desde el archivo application_properties.json:
file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)
El siguiente ejemplo de código de función definido por el usuario muestra la lectura de un grupo de propiedades del objeto de propiedades de la aplicación: recupera:
def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]
En el siguiente ejemplo de código, se muestra la lectura de una propiedad denominada INPUT_STREAM_KEY de un grupo de propiedades que se devuelve en el ejemplo anterior:
input_stream = input_property_map[INPUT_STREAM_KEY]
Creación del paquete de códigos de la aplicación
Una vez que haya creado su aplicación Python, agrupe el archivo de código y las dependencias en un archivo zip.
El archivo zip debe contener una secuencia de comandos de Python con un método main y, de forma opcional, puede contener lo siguiente:
Archivos de código Python adicionales
Código Java definido por el usuario en archivos JAR
Bibliotecas Java en archivos JAR
nota
El archivo zip de la aplicación debe contener todas las dependencias de la aplicación. No puede hacer referencia a bibliotecas de otros orígenes para su aplicación.