Crear y ejecutar una aplicación de Managed Service para Apache Flink para Python - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Crear y ejecutar una aplicación de Managed Service para Apache Flink para Python

En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Python con una secuencia de Kinesis como origen y receptor.

Crear recursos dependientes

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de Kinesis para entrada y salida.

  • Un bucket de Amazon S3 para almacenar el código y los resultados de la aplicación (ka-app-code-<username>)

Crear dos flujos de Kinesis

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI. Para obtener instrucciones sobre la consola, consulte Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams.

Para crear flujos de datos (AWS CLI)
  1. Para crear el primer flujo (ExampleInputStream), utilice el siguiente comando de la AWS CLI create-stream de Amazon Kinesis.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Crear un bucket de Amazon S3

Puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:

  • ¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo ka-app-code-<username>.

Otros recursos

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/MyApplication.

  • Un flujo de registro llamado kinesis-analytics-log-stream.

Escribir registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.

nota

Esta sección requiere AWS SDK for Python (Boto).

nota

El script de Python en esta sección usa AWS CLI. Debe configurar sus AWS CLI para usar las credenciales de su cuenta y la región predeterminada. Para configurar su AWS CLI, escriba lo siguiente:

aws configure
  1. Cree un archivo denominado stock.py con el siguiente contenido:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Ejecute el script stock.py:

    $ python stock.py

    Mantenga el script en ejecución mientras completa el resto del tutorial.

Crear y examinar el código de Python de streaming de Apache Flink

El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Instalación de Git.

  2. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Vaya al directorio amazon-kinesis-data-analytics-java-examples/python/GettingStarted.

El código de la aplicación se encuentra en el archivo getting_started.py. Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • La aplicación utiliza un origen de tabla de Kinesis para leer del flujo de origen. El siguiente fragmento llama a la función create_table para crear el origen de la tabla de Kinesis:

    table_env.execute_sql( create_table(output_table_name, output_stream, output_region)

    La función create_table utiliza un comando SQL para crear una tabla respaldada por el origen de streaming:

    def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
  • La aplicación crea dos tablas y, a continuación, escribe el contenido de una tabla en la otra.

    # 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
  • La aplicación utiliza el conector Flink, del archivo flink- sql-connector-kinesis _2.12/1.15.2.

Añadir dependencias de terceros a las aplicaciones de Python

Cuando se utilicen paquetes de Python de terceros (como boto3), tendrá que añadir sus dependencias transitivas y las propiedades necesarias para adaptarse a estas dependencias. En un nivel superior, para PyPi las dependencias, puede copiar los archivos y carpetas que se encuentran dentro de la site-packages carpeta de entornos de Python para crear una estructura de directorios como la siguiente:

PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-1.15.2.jar │ │ ... ...

Para añadir boto3 como una dependencia de terceros:

  1. Cree un entorno de Python independiente (conda o similar) en su máquina local con las dependencias necesarias.

  2. Anote la lista inicial de paquetes de la carpeta site_packages de ese entorno.

  3. pip-install todas las dependencias necesarias para su aplicación.

  4. Anote los paquetes que se agregaron a la carpeta site_packages después del paso 3 indicado anteriormente. Estas son las carpetas que debe incluir en su paquete (debajo de la carpeta my_deps), organizadas como se muestra arriba. Esto le permitirá capturar una diferencia de los paquetes entre los pasos 2 y 3 para identificar las dependencias de paquetes adecuadas para su aplicación.

  5. Indique my_deps/ como argumento para la propiedad pyFiles en el grupo de propiedades kinesis.analytics.flink.run.options, tal como se describe a continuación para la propiedad jarfiles. Flink también le permite especificar las dependencias de Python mediante la función add_python_file, pero es importante tener en cuenta que solo necesita especificar una u otra, no ambas.

nota

No es necesario que le ponga un nombre a la carpeta my_deps. Lo importante es registrar las dependencias utilizando pyFiles o, bien, add_python_file. Puede encontrar un ejemplo en Cómo usar boto3 en pyFlink.

Cargar el código de Python de streaming de Apache Flink

En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.

Para cargar el código de la aplicación mediante la consola:
  1. Utilice la aplicación de compresión que prefiera para getting-started.py comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/flink - sql-connector-kinesis _2.12/1.15.2. Dé el nombre myapp.zip al archivo. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración: GettingStarted/getting-started.py.

  2. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3/.

  3. Elija Crear bucket.

  4. Escriba ka-app-code-<username> en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.

  5. En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.

  6. En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.

  7. Elija Crear bucket.

  8. En la consola de Amazon S3, elija el <username>bucket ka-app-code- y elija Upload.

  9. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo myapp.zip que creó en el paso anterior. Elija Siguiente.

  10. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.

Para cargar el código de la aplicación mediante AWS CLI:
nota

No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.

  1. Utilice la aplicación de compresión que prefiera para streaming-file-sink.py comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/flink - sql-connector-kinesis _2.12/1.15.2.

    nota

    No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.

  2. Utilice la aplicación de compresión que prefiera para comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/ /1.15.2. getting-started.py flink-sql-connector-kinesis Dé el nombre myapp.zip al archivo. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración: GettingStarted/getting-started.py.

  3. Ejecute el siguiente comando:

    $ aws s3 --region aws region cp myapp.zip s3://ka-app-code-<username>

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

Crear y ejecutar la aplicación de Managed Service para Apache Flink

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

Crear la aplicación

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink

  2. En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.

  3. En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:

    • En Nombre de la application, escriba MyApplication.

    • En Descripción, escriba My java test app.

    • En Tiempo de ejecución, escriba Apache Flink.

    • Deje la versión como Apache Flink, versión 1.15.2 (versión recomendada).

  4. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM. kinesis-analytics-MyApplication-us-west-2

  5. Elija Crear aplicación.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: kinesisanalytics-MyApplication-us-west-2

Configurar la aplicación

Utilice el siguiente procedimiento para configurar la aplicación.

Para configurar la aplicación
  1. En la MyApplicationpágina, selecciona Configurar.

  2. En la página Configurar aplicación, proporcione la Ubicación del código:

    • Para el bucket de Amazon S3, introduzca ka-app-code-<username>.

    • En Ruta al objeto de Amazon S3, introduzca myapp.zip.

  3. En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-west-2.

  4. En Propiedades, elija Añadir grupo.

  5. Introduzca lo siguiente:

    ID de grupo Clave Valor
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    Seleccione Guardar.

  6. En Propiedades, elija Añadir grupo nuevamente.

  7. Introduzca lo siguiente:

    ID de grupo Clave Valor
    producer.config.0 output.stream.name ExampleOutputStream
    producer.config.0 aws.region us-west-2
    producer.config.0 shard.count 1
  8. En Propiedades, elija Añadir grupo nuevamente. En Nombre de grupo, introduzca kinesis.analytics.flink.run.options. Este grupo de propiedades especiales le indica a su aplicación dónde encontrar sus recursos de código. Para obtener más información, consulte Especificar sus archivos de código.

  9. Introduzca lo siguiente:

    ID de grupo Clave Valor
    kinesis.analytics.flink.run.options python getting-started.py
    kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis-1.15.2.jar
  10. En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.

  11. Para el CloudWatch registro, active la casilla Activar.

  12. Seleccione Actualizar.

nota

Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Editar la política de IAM

Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.

Para editar la política de IAM para añadir los permisos para el bucket de S3
  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-west-2 que la consola creó en su nombre en la sección anterior.

  3. En la página Resumen, elija Editar política. Seleccione la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Ejecutar la aplicación

Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.

Detener la aplicación

Para detener la aplicación, en la MyApplicationpágina, seleccione Detener. Confirme la acción.

Paso siguiente

Eliminación de recursos de AWS