Creación y ejecución de un servicio gestionado para la aplicación Apache Flink para Python - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Creación y ejecución de un servicio gestionado para la aplicación Apache Flink para Python

En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Python con una secuencia de Kinesis como origen y receptor.

Cree recursos dependientes

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de Kinesis para entrada y salida.

  • Un bucket de Amazon S3 para almacenar el código y los resultados de la aplicación (ka-app-code-<username>)

Cree dos transmisiones de Kinesis

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams.

Cómo crear flujos de datos (AWS CLI)
  1. Para crear la primera transmisión (ExampleInputStream), utilice el siguiente comando de Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Crear un bucket de Amazon S3

Puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:

  • ¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo, ka-app-code-<username>.

Otros recursos

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/MyApplication.

  • Un flujo de registro llamado kinesis-analytics-log-stream.

Escriba registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.

nota

Esta sección requiere AWS SDK for Python (Boto).

nota

El script de Python en esta sección usa AWS CLI. Debe configurarlas AWS CLI para usar las credenciales de su cuenta y su región predeterminada. Para configurar la suya AWS CLI, introduzca lo siguiente:

aws configure
  1. Cree un archivo denominado stock.py con el siguiente contenido:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Ejecute el script stock.py:

    $ python stock.py

    Mantenga el script en ejecución mientras completa el resto del tutorial.

Cree y examine el código Python de streaming de Apache Flink

El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git.

  2. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Vaya al directorio amazon-kinesis-data-analytics-java-examples/python/GettingStarted.

El código de la aplicación se encuentra en el archivo getting_started.py. Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • La aplicación utiliza un origen de tabla de Kinesis para leer del flujo de origen. El siguiente fragmento llama a la función create_table para crear el origen de la tabla de Kinesis:

    table_env.execute_sql( create_table(output_table_name, output_stream, output_region)

    La función create_table utiliza un comando SQL para crear una tabla respaldada por el origen de streaming:

    def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
  • La aplicación crea dos tablas y, a continuación, escribe el contenido de una tabla en la otra.

    # 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
  • La aplicación utiliza el conector Flink, del archivo flink- sql-connector-kinesis _2.12/1.15.2.

Incorporación de dependencias de terceros a las aplicaciones de Python

Cuando se utilicen paquetes de Python de terceros (como boto3), tendrá que añadir sus dependencias transitivas y las propiedades necesarias para adaptarse a estas dependencias. En un nivel superior, para PyPi las dependencias, puede copiar los archivos y carpetas que se encuentran dentro de la site-packages carpeta de entornos de Python para crear una estructura de directorios como la siguiente:

PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-4.2.0-1.18.jar │ │ ... ...

Cómo añadir boto3 como una dependencia de terceros:

  1. Cree un entorno de Python independiente (conda o similar) en su máquina local con las dependencias necesarias.

  2. Anote la lista inicial de paquetes de la carpeta site_packages de ese entorno.

  3. pip-install todas las dependencias necesarias para su aplicación.

  4. Anote los paquetes que se agregaron a la carpeta site_packages después del paso 3 indicado anteriormente. Estas son las carpetas que debe incluir en su paquete (debajo de la carpeta my_deps), organizadas como se muestra arriba. Esto le permitirá capturar una diferencia de los paquetes entre los pasos 2 y 3 para identificar las dependencias de paquetes adecuadas para su aplicación.

  5. Indique my_deps/ como argumento para la propiedad pyFiles en el grupo de propiedades kinesis.analytics.flink.run.options, tal como se describe a continuación para la propiedad jarfiles. Flink también le permite especificar las dependencias de Python mediante la función add_python_file, pero es importante tener en cuenta que solo necesita especificar una u otra, no ambas.

nota

No es necesario que le ponga un nombre a la carpeta my_deps. Lo importante es registrar las dependencias utilizando pyFiles o, bien, add_python_file. Puede encontrar un ejemplo en Cómo usar boto3 en pyFlink.

Cargue el código Python de streaming de Apache Flink

En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.

Cómo cargar el código de la aplicación mediante la consola:
  1. Utilice su aplicación de compresión preferida para comprimir los archivos del conector SQL getting-started.py y de Flink. Dé nombre al archivo myapp.zip. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración: GettingStarted/getting-started.py.

  2. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  3. Elija Crear bucket.

  4. Escriba ka-app-code-<username> en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.

  5. En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.

  6. En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.

  7. Elija Crear bucket.

  8. En la consola de Amazon S3, elija el <username>bucket ka-app-code- y, a continuación, seleccione Upload.

  9. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo myapp.zip que creó en el paso anterior. Elija Siguiente.

  10. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.

Cómo cargar el código de la aplicación mediante AWS CLI:
nota

No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.

  1. Utilice la aplicación de compresión que prefiera para comprimir los archivos del conector SQL streaming-file-sink.py y de Flink.

    nota

    No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.

  2. Utilice la aplicación de compresión que prefiera para comprimir los archivos del conector SQL getting-started.py y de Flink. Dé nombre al archivo myapp.zip. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración: GettingStarted/getting-started.py.

  3. Ejecute el siguiente comando:

    $ aws s3 --region aws region cp myapp.zip s3://ka-app-code-<username>

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

Cree y ejecute la aplicación Managed Service for Apache Flink

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

Creación de la aplicación

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink.

  2. En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.

  3. En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:

    • En Nombre de la aplicación, escriba MyApplication.

    • En Descripción, escriba My java test app.

    • En Tiempo de ejecución, escriba Apache Flink.

    • Deje la versión como Apache Flink versión 1.18.

  4. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM. kinesis-analytics-MyApplication-us-west-2

  5. Elija Crear aplicación.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: kinesisanalytics-MyApplication-us-west-2

Configure la aplicación

Utilice el siguiente procedimiento para configurar la aplicación.

Cómo configurar la aplicación
  1. En la MyApplicationpágina, elija Configurar.

  2. En la página Configurar aplicación, proporcione la Ubicación del código:

    • Para el bucket de Amazon S3, introduzca ka-app-code-<username>.

    • En Ruta al objeto de Amazon S3, introduzca myapp.zip.

  3. En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-west-2.

  4. En Propiedades, elija Añadir grupo.

  5. Introduzca lo siguiente:

    ID de grupo Clave Valor
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    Seleccione Guardar.

  6. En Propiedades, elija Añadir grupo nuevamente.

  7. Introduzca lo siguiente:

    ID de grupo Clave Valor
    producer.config.0 output.stream.name ExampleOutputStream
    producer.config.0 aws.region us-west-2
    producer.config.0 shard.count 1
  8. En Propiedades, elija Añadir grupo nuevamente. En Nombre de grupo, introduzca flink.sql.connector.kinesis.options. Este grupo de propiedades especiales le indica a su aplicación dónde encontrar sus recursos de código. Para obtener más información, consulte Especificar los archivos de código.

  9. Introduzca lo siguiente:

    ID de grupo Clave Valor
    kinesis.analytics.flink.run.options python getting-started.py
    kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis-4.2.0-1.18.jar
  10. En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.

  11. Para el CloudWatch registro, active la casilla Activar.

  12. Elija Actualizar.

nota

Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Edite la política de IAM

Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.

Cómo editar la política de IAM para añadir los permisos para el bucket de S3
  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-west-2 que la consola creó en su nombre en la sección anterior.

  3. En la página Resumen, elija Editar política. Seleccione la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Ejecución de la aplicación

Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.

Detenga la aplicación

Para detener la aplicación, en la MyApplicationpágina, seleccione Detener. Confirme la acción.

Siguiente paso

Limpia AWS los recursos