Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Creación y ejecución de un servicio gestionado para la aplicación Apache Flink para Python
En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Python con una secuencia de Kinesis como origen y receptor.
Esta sección contiene los siguientes pasos.
- Cree recursos dependientes
- Escriba registros de muestra en el flujo de entrada
- Cree y examine el código Python de streaming de Apache Flink
- Incorporación de dependencias de terceros a las aplicaciones de Python
- Cargue el código Python de streaming de Apache Flink
- Cree y ejecute la aplicación Managed Service for Apache Flink
- Siguiente paso
Cree recursos dependientes
Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
-
Dos flujos de Kinesis para entrada y salida.
-
Un bucket de Amazon S3 para almacenar el código y los resultados de la aplicación (
ka-app-code-
)<username>
Cree dos transmisiones de Kinesis
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream
y ExampleOutputStream
). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.
Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams.
Cómo crear flujos de datos (AWS CLI)
-
Para crear la primera transmisión (
ExampleInputStream
), utilice el siguiente comando de Amazon Kinesiscreate-stream
AWS CLI .$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Crear un bucket de Amazon S3
Puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:
-
¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo,
ka-app-code-
.<username>
Otros recursos
Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:
-
Un grupo de registro llamado
/AWS/KinesisAnalytics-java/MyApplication
. -
Un flujo de registro llamado
kinesis-analytics-log-stream
.
Escriba registros de muestra en el flujo de entrada
En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.
nota
Esta sección requiere AWS SDK for Python (Boto)
nota
El script de Python en esta sección usa AWS CLI. Debe configurarlas AWS CLI para usar las credenciales de su cuenta y su región predeterminada. Para configurar la suya AWS CLI, introduzca lo siguiente:
aws configure
-
Cree un archivo denominado
stock.py
con el siguiente contenido:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
Ejecute el script
stock.py
:$ python stock.py
Mantenga el script en ejecución mientras completa el resto del tutorial.
Cree y examine el código Python de streaming de Apache Flink
El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git
. Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Vaya al directorio
amazon-kinesis-data-analytics-java-examples/python/GettingStarted
.
El código de la aplicación se encuentra en el archivo getting_started.py
. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
La aplicación utiliza un origen de tabla de Kinesis para leer del flujo de origen. El siguiente fragmento llama a la función
create_table
para crear el origen de la tabla de Kinesis:table_env.execute_sql( create_table(output_table_name, output_stream, output_region)
La función
create_table
utiliza un comando SQL para crear una tabla respaldada por el origen de streaming:def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
La aplicación crea dos tablas y, a continuación, escribe el contenido de una tabla en la otra.
# 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
La aplicación utiliza el conector Flink, del archivo flink- sql-connector-kinesis _2.12/1.15.2
.
Incorporación de dependencias de terceros a las aplicaciones de Python
Cuando se utilicen paquetes de Python de terceros (como boto3site-packages
carpeta de entornos de Python para crear una estructura de directorios como la siguiente:
PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-4.2.0-1.18.jar │ │ ... ...
Cómo añadir boto3 como una dependencia de terceros:
Cree un entorno de Python independiente (conda o similar) en su máquina local con las dependencias necesarias.
Anote la lista inicial de paquetes de la carpeta
site_packages
de ese entorno.pip-install
todas las dependencias necesarias para su aplicación.Anote los paquetes que se agregaron a la carpeta
site_packages
después del paso 3 indicado anteriormente. Estas son las carpetas que debe incluir en su paquete (debajo de la carpetamy_deps
), organizadas como se muestra arriba. Esto le permitirá capturar una diferencia de los paquetes entre los pasos 2 y 3 para identificar las dependencias de paquetes adecuadas para su aplicación.Indique
my_deps/
como argumento para la propiedadpyFiles
en el grupo de propiedadeskinesis.analytics.flink.run.options
, tal como se describe a continuación para la propiedadjarfiles
. Flink también le permite especificar las dependencias de Python mediante la función add_python_file, pero es importante tener en cuenta que solo necesita especificar una u otra, no ambas.
nota
No es necesario que le ponga un nombre a la carpeta my_deps
. Lo importante es registrar las dependencias utilizando pyFiles
o, bien, add_python_file
. Puede encontrar un ejemplo en Cómo usar boto3 en pyFlink
Cargue el código Python de streaming de Apache Flink
En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.
Cómo cargar el código de la aplicación mediante la consola:
Utilice su aplicación de compresión preferida para comprimir los archivos del conector SQL
getting-started.py
y de Flink. Dé nombre al archivo myapp.zip
. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración:GettingStarted/getting-started.py
.Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3
. -
Elija Crear bucket.
-
Escriba
ka-app-code-
en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.<username>
-
En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.
-
En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.
-
Elija Crear bucket.
-
En la consola de Amazon S3, elija el
<username>bucket ka-app-code- y, a continuación, seleccione Upload.
-
En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo
myapp.zip
que creó en el paso anterior. Elija Siguiente. -
No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.
Cómo cargar el código de la aplicación mediante AWS CLI:
nota
No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip
. Esto puede dar lugar a que el código de la aplicación no sea válido.
Utilice la aplicación de compresión que prefiera para comprimir los archivos del conector SQL
streaming-file-sink.py
y de Flink. nota
No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.
Utilice la aplicación de compresión que prefiera para comprimir los archivos del conector SQL
getting-started.py
y de Flink. Dé nombre al archivo myapp.zip
. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración:GettingStarted/getting-started.py
.Ejecute el siguiente comando:
$ aws s3 --region
aws region
cp myapp.zip s3://ka-app-code-<username>
El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.
Cree y ejecute la aplicación Managed Service for Apache Flink
Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.
Creación de la aplicación
Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink.
-
En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.
-
En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:
-
En Nombre de la aplicación, escriba
MyApplication
. -
En Descripción, escriba
My java test app
. -
En Tiempo de ejecución, escriba Apache Flink.
-
Deje la versión como Apache Flink versión 1.18.
-
-
Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM.
kinesis-analytics-MyApplication-us-west-2
-
Elija Crear aplicación.
nota
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rol:
kinesisanalytics-
MyApplication
-us-west-2
Configure la aplicación
Utilice el siguiente procedimiento para configurar la aplicación.
Cómo configurar la aplicación
-
En la MyApplicationpágina, elija Configurar.
-
En la página Configurar aplicación, proporcione la Ubicación del código:
-
Para el bucket de Amazon S3, introduzca
ka-app-code-
.<username>
-
En Ruta al objeto de Amazon S3, introduzca
myapp.zip
.
-
-
En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM
kinesis-analytics-MyApplication-us-west-2
. -
En Propiedades, elija Añadir grupo.
-
Introduzca lo siguiente:
ID de grupo Clave Valor consumer.config.0
input.stream.name
ExampleInputStream
consumer.config.0
aws.region
us-west-2
consumer.config.0
scan.stream.initpos
LATEST
Seleccione Guardar.
En Propiedades, elija Añadir grupo nuevamente.
Introduzca lo siguiente:
ID de grupo Clave Valor producer.config.0
output.stream.name
ExampleOutputStream
producer.config.0
aws.region
us-west-2
producer.config.0
shard.count
1
En Propiedades, elija Añadir grupo nuevamente. En Nombre de grupo, introduzca
flink.sql.connector.kinesis.options
. Este grupo de propiedades especiales le indica a su aplicación dónde encontrar sus recursos de código. Para obtener más información, consulte Especificar los archivos de código.Introduzca lo siguiente:
ID de grupo Clave Valor kinesis.analytics.flink.run.options
python
getting-started.py
kinesis.analytics.flink.run.options
jarfile
flink-sql-connector-kinesis-4.2.0-1.18.jar
-
En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.
-
Para el CloudWatch registro, active la casilla Activar.
-
Elija Actualizar.
nota
Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:
-
Grupo de registro:
/aws/kinesis-analytics/MyApplication
-
Flujo de registro:
kinesis-analytics-log-stream
Edite la política de IAM
Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.
Cómo editar la política de IAM para añadir los permisos para el bucket de S3
Abra la consola de IAM en https://console.aws.amazon.com/iam/
. -
Elija Políticas. Elija la política
kinesis-analytics-service-MyApplication-us-west-2
que la consola creó en su nombre en la sección anterior. -
En la página Resumen, elija Editar política. Seleccione la pestaña JSON.
-
Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (
012345678901
) por el ID de su cuenta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Ejecución de la aplicación
Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.
Detenga la aplicación
Para detener la aplicación, en la MyApplicationpágina, seleccione Detener. Confirme la acción.