

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Programación de una aplicación de Managed Service para Apache Flink para Python
<a name="how-python-programming"></a>

Usted codifica su aplicación Managed Service para Apache Flink para Python mediante la API Apache Flink Python Table. El motor Apache Flink traduce las declaraciones de la API de tablas de Python (que se ejecutan en la máquina virtual de Python) en declaraciones de la API de tabla de Java (que se ejecutan en la máquina virtual de Java). 

Para utilizar la Python Table API, siga estos pasos:
+ Cree una referencia a `StreamTableEnvironment`.
+ Cree `table` objetos a partir de sus datos de transmisión de origen ejecutando consultas en la referencia `StreamTableEnvironment`.
+ Ejecute consultas en sus objetos `table` para crear tablas de salida.
+ Escriba sus tablas de salida en sus destinos utilizando un `StatementSet`.

Para empezar a utilizar la API de tabla de Python en Managed Service para Apache Flink, consulte [Introducción a Amazon Managed Service para Apache Flink para Python](gs-python.md).

## Lectura y escritura de datos de transmisión
<a name="how-python-programming-readwrite"></a>

Para leer y escribir datos de streaming, ejecute consultas SQL en el entorno de tablas.

### Creación de una tabla
<a name="how-python-programming-readwrite-createtable"></a>

El siguiente ejemplo de código muestra una función definida por el usuario que crea una consulta SQL. La consulta SQL crea una tabla que interactúa con un flujo de Kinesis:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Lectura de datos de transmisión
<a name="how-python-programming-readwrite-read"></a>

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL `CreateTable` anterior en una referencia de entorno de tabla para leer datos:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Escritura de datos de transmisión
<a name="how-python-programming-readwrite-write"></a>

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL del ejemplo `CreateTable` para crear una referencia a la tabla de salida y cómo utilizar un `StatementSet` para interactuar con las tablas y escribir datos en un flujo de Kinesis de destino:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Lectura de propiedades de tiempo de ejecución
<a name="how-python-programming-properties"></a>

Puede usar las propiedades de tiempo de ejecución para configurar su aplicación sin tener que cambiar el código de la aplicación.

Las propiedades de la aplicación se especifican de la misma manera que en el caso de una aplicación de Managed Service para Apache Flink para Java. Puede especificar propiedades del tiempo de ejecución de las siguientes maneras:
+ Uso de la acción [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html).
+ Uso de la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.
+ Configuración de su aplicación usando la consola

Puede recuperar las propiedades de la aplicación en el código leyendo un archivo json llamado `application_properties.json`, creado por el motor de ejecución Managed Service para Apache Flink.

El siguiente ejemplo de código demuestra las propiedades de aplicación de lectura desde el archivo `application_properties.json`:

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

El siguiente ejemplo de código de función definido por el usuario muestra la lectura de un grupo de propiedades del objeto de propiedades de la aplicación: recupera:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

En el siguiente ejemplo de código, se muestra la lectura de una propiedad denominada INPUT\_STREAM\_KEY de un grupo de propiedades que se devuelve en el ejemplo anterior:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Creación del paquete de códigos de la aplicación
<a name="how-python-programming-package"></a>

Una vez que haya creado su aplicación Python, agrupe el archivo de código y las dependencias en un archivo zip.

El archivo zip debe contener una secuencia de comandos de Python con un método `main` y, de forma opcional, puede contener lo siguiente:
+ Archivos de código Python adicionales
+ User-defined Código Java en archivos JAR
+ Bibliotecas Java en archivos JAR

**nota**  
El archivo zip de la aplicación debe contener todas las dependencias de la aplicación. No puede hacer referencia a bibliotecas de otros orígenes para su aplicación.