Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Crear y ejecutar una aplicación de Managed Service para Apache Flink para Python
En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Python con una secuencia de Kinesis como origen y receptor.
Esta sección contiene los siguientes pasos.
- Crear recursos dependientes
- Escribir registros de muestra en el flujo de entrada
- Crear y examinar el código de Python de streaming de Apache Flink
- Añadir dependencias de terceros a las aplicaciones de Python
- Cargar el código de Python de streaming de Apache Flink
- Crear y ejecutar la aplicación de Managed Service para Apache Flink
- Paso siguiente
Crear recursos dependientes
Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
-
Dos flujos de Kinesis para entrada y salida.
-
Un bucket de Amazon S3 para almacenar el código y los resultados de la aplicación (
ka-app-code-
)<username>
Crear dos flujos de Kinesis
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream
y ExampleOutputStream
). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.
Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI. Para obtener instrucciones sobre la consola, consulte Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams.
Para crear flujos de datos (AWS CLI)
-
Para crear el primer flujo (
ExampleInputStream
), utilice el siguiente comando de la AWS CLIcreate-stream
de Amazon Kinesis.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Crear un bucket de Amazon S3
Puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:
-
¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo
ka-app-code-
.<username>
Otros recursos
Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:
-
Un grupo de registro llamado
/AWS/KinesisAnalytics-java/MyApplication
. -
Un flujo de registro llamado
kinesis-analytics-log-stream
.
Escribir registros de muestra en el flujo de entrada
En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.
nota
Esta sección requiere AWS SDK for Python (Boto)
nota
El script de Python en esta sección usa AWS CLI. Debe configurar sus AWS CLI para usar las credenciales de su cuenta y la región predeterminada. Para configurar su AWS CLI, escriba lo siguiente:
aws configure
-
Cree un archivo denominado
stock.py
con el siguiente contenido:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
Ejecute el script
stock.py
:$ python stock.py
Mantenga el script en ejecución mientras completa el resto del tutorial.
Crear y examinar el código de Python de streaming de Apache Flink
El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Instalación de Git
. Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Vaya al directorio
amazon-kinesis-data-analytics-java-examples/python/GettingStarted
.
El código de la aplicación se encuentra en el archivo getting_started.py
. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
La aplicación utiliza un origen de tabla de Kinesis para leer del flujo de origen. El siguiente fragmento llama a la función
create_table
para crear el origen de la tabla de Kinesis:table_env.execute_sql( create_table(output_table_name, output_stream, output_region)
La función
create_table
utiliza un comando SQL para crear una tabla respaldada por el origen de streaming:def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
La aplicación crea dos tablas y, a continuación, escribe el contenido de una tabla en la otra.
# 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
La aplicación utiliza el conector Flink, del archivo flink- sql-connector-kinesis _2.12/1.15.2
.
Añadir dependencias de terceros a las aplicaciones de Python
Cuando se utilicen paquetes de Python de terceros (como boto3site-packages
carpeta de entornos de Python para crear una estructura de directorios como la siguiente:
PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-1.15.2.jar │ │ ... ...
Para añadir boto3 como una dependencia de terceros:
Cree un entorno de Python independiente (conda o similar) en su máquina local con las dependencias necesarias.
Anote la lista inicial de paquetes de la carpeta
site_packages
de ese entorno.pip-install
todas las dependencias necesarias para su aplicación.Anote los paquetes que se agregaron a la carpeta
site_packages
después del paso 3 indicado anteriormente. Estas son las carpetas que debe incluir en su paquete (debajo de la carpetamy_deps
), organizadas como se muestra arriba. Esto le permitirá capturar una diferencia de los paquetes entre los pasos 2 y 3 para identificar las dependencias de paquetes adecuadas para su aplicación.Indique
my_deps/
como argumento para la propiedadpyFiles
en el grupo de propiedadeskinesis.analytics.flink.run.options
, tal como se describe a continuación para la propiedadjarfiles
. Flink también le permite especificar las dependencias de Python mediante la función add_python_file, pero es importante tener en cuenta que solo necesita especificar una u otra, no ambas.
nota
No es necesario que le ponga un nombre a la carpeta my_deps
. Lo importante es registrar las dependencias utilizando pyFiles
o, bien, add_python_file
. Puede encontrar un ejemplo en Cómo usar boto3 en pyFlink
Cargar el código de Python de streaming de Apache Flink
En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.
Para cargar el código de la aplicación mediante la consola:
Utilice la aplicación de compresión que prefiera para
getting-started.py
comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/flink - sql-connector-kinesis _2.12/1.15.2. Dé el nombre myapp.zip
al archivo. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración:GettingStarted/getting-started.py
.Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3/
. -
Elija Crear bucket.
-
Escriba
ka-app-code-
en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.<username>
-
En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.
-
En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.
-
Elija Crear bucket.
-
En la consola de Amazon S3, elija el
<username>bucket ka-app-code- y elija Upload.
-
En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo
myapp.zip
que creó en el paso anterior. Elija Siguiente. -
No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.
Para cargar el código de la aplicación mediante AWS CLI:
nota
No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip
. Esto puede dar lugar a que el código de la aplicación no sea válido.
Utilice la aplicación de compresión que prefiera para
streaming-file-sink.py
comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/flink - sql-connector-kinesis _2.12/1.15.2. nota
No utilice las características de compresión de Finder (MacOS) o del Explorador de Windows (Windows) para crear el archivo myapp.zip. Esto puede dar lugar a que el código de la aplicación no sea válido.
Utilice la aplicación de compresión que prefiera para comprimir los archivos https://mvnrepository.com/artifact/org.apache.flink/ /1.15.2.
getting-started.py
flink-sql-connector-kinesisDé el nombre myapp.zip
al archivo. Si incluye la carpeta externa en su archivo, debe incluirla en la ruta con el código de su(s) archivo(s) de configuración:GettingStarted/getting-started.py
.Ejecute el siguiente comando:
$ aws s3 --region
aws region
cp myapp.zip s3://ka-app-code-<username>
El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.
Crear y ejecutar la aplicación de Managed Service para Apache Flink
Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.
Crear la aplicación
Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink
-
En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.
-
En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:
-
En Nombre de la application, escriba
MyApplication
. -
En Descripción, escriba
My java test app
. -
En Tiempo de ejecución, escriba Apache Flink.
-
Deje la versión como Apache Flink, versión 1.15.2 (versión recomendada).
-
-
Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM.
kinesis-analytics-MyApplication-us-west-2
-
Elija Crear aplicación.
nota
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rol:
kinesisanalytics-
MyApplication
-us-west-2
Configurar la aplicación
Utilice el siguiente procedimiento para configurar la aplicación.
Para configurar la aplicación
-
En la MyApplicationpágina, selecciona Configurar.
-
En la página Configurar aplicación, proporcione la Ubicación del código:
-
Para el bucket de Amazon S3, introduzca
ka-app-code-
.<username>
-
En Ruta al objeto de Amazon S3, introduzca
myapp.zip
.
-
-
En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM
kinesis-analytics-MyApplication-us-west-2
. -
En Propiedades, elija Añadir grupo.
-
Introduzca lo siguiente:
ID de grupo Clave Valor consumer.config.0
input.stream.name
ExampleInputStream
consumer.config.0
aws.region
us-west-2
consumer.config.0
scan.stream.initpos
LATEST
Seleccione Guardar.
En Propiedades, elija Añadir grupo nuevamente.
Introduzca lo siguiente:
ID de grupo Clave Valor producer.config.0
output.stream.name
ExampleOutputStream
producer.config.0
aws.region
us-west-2
producer.config.0
shard.count
1
En Propiedades, elija Añadir grupo nuevamente. En Nombre de grupo, introduzca
kinesis.analytics.flink.run.options
. Este grupo de propiedades especiales le indica a su aplicación dónde encontrar sus recursos de código. Para obtener más información, consulte Especificar sus archivos de código.Introduzca lo siguiente:
ID de grupo Clave Valor kinesis.analytics.flink.run.options
python
getting-started.py
kinesis.analytics.flink.run.options
jarfile
flink-sql-connector-kinesis-1.15.2.jar
-
En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.
-
Para el CloudWatch registro, active la casilla Activar.
-
Seleccione Actualizar.
nota
Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:
-
Grupo de registro:
/aws/kinesis-analytics/MyApplication
-
Flujo de registro:
kinesis-analytics-log-stream
Editar la política de IAM
Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.
Para editar la política de IAM para añadir los permisos para el bucket de S3
Abra la consola de IAM en https://console.aws.amazon.com/iam/
. -
Elija Políticas. Elija la política
kinesis-analytics-service-MyApplication-us-west-2
que la consola creó en su nombre en la sección anterior. -
En la página Resumen, elija Editar política. Seleccione la pestaña JSON.
-
Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (
012345678901
) por el ID de su cuenta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Ejecutar la aplicación
Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.
Detener la aplicación
Para detener la aplicación, en la MyApplicationpágina, seleccione Detener. Confirme la acción.
Paso siguiente
Eliminación de recursos de AWS