Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Creación de una aplicación con Apache Beam
En este ejercicio, creará una aplicación de Managed Service para Apache Flink que transforma datos usando Apache Beam
nota
Para configurar los requisitos previos necesarios para este ejercicio, primero complete el ejercicio Tutorial: introducción al uso de la API de DataStream en Managed Service para Apache Flink.
Este tema contiene las siguientes secciones:
Creación de recursos dependientes
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
Dos flujos de datos de Kinesis (
ExampleInputStreamyExampleOutputStream)Un bucket de Amazon S3 para almacenar el código de la aplicación (
ka-app-code-)<username>
Se puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:
Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos
ExampleInputStreamyExampleOutputStream.¿Cómo se puede crear un bucket de S3? en la guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo,
ka-app-code-.<username>
Escritura de registros de muestra en el flujo de entrada
En esta sección, se utiliza un script de Python para escribir cadenas asignadas al azar al flujo para que la aplicación realice el procesamiento.
nota
Esta sección requiere AWS SDK for Python (Boto)
-
Cree un archivo denominado
ping.pycon el siguiente contenido:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey") -
Ejecute el script
ping.py:$ python ping.pyMantenga el script en ejecución mientras completa el resto del tutorial.
Descarga y examen del código de la aplicación
El código de la aplicación de Java para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git
. Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.gitVaya al directorio
amazon-kinesis-data-analytics-java-examples/Beam.
El código de la aplicación se encuentra en el archivo BasicBeamStreamingJob.java. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
La aplicación utiliza el Apache Beam ParDo
para procesar los registros entrantes al invocar una función de transformación personalizada llamada PingPongFn.El código para invocar la función
PingPongFnes el siguiente:.apply("Pong transform", ParDo.of(new PingPongFn())Las aplicaciones de Managed Service para Apache Flink que usan Apache Beam requieren los siguientes componentes. Si no incluye estos componentes y versiones en su
pom.xml, la aplicación carga las versiones incorrectas desde las dependencias del entorno y, dado que las versiones no coinciden, la aplicación se bloquea durante el tiempo de ejecución.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>La función de
PingPongFntransformación pasa los datos de entrada al flujo de salida, a menos que los datos de entrada sean ping, en cuyo caso emite la cadena pong\n al flujo de salida.El código de la función de transformación es el siguiente:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Compilar el código de la aplicación
Para compilar la aplicación, haga lo siguiente:
Si aún no lo ha hecho, instale Java y Maven. Para obtener más información, consulte Cumplimiento de los requisitos previos obligatorios en el tutorial de Tutorial: introducción al uso de la API de DataStream en Managed Service para Apache Flink.
Compile la aplicación con el siguiente comando:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8nota
El código fuente proporcionado se basa en bibliotecas de Java 11.
Al compilar la aplicación, se crea el archivo JAR de la aplicación (target/basic-beam-app-1.0.jar).
Cargar el código de Java de streaming de Apache Flink
En esta sección, cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Creación de recursos dependientes.
-
En la consola de Amazon S3, elija el bucket ka-app-code-
<username>y seleccione Subir. -
En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo
basic-beam-app-1.0.jarque creó en el paso anterior. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.
El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.
Crear y ejecutar la aplicación de Managed Service para Apache Flink
Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.
Creación de la aplicación
Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.
-
En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.
-
En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:
-
En Nombre de la aplicación, escriba
MyApplication. -
En Tiempo de ejecución, escriba Apache Flink.
nota
Apache Beam no es compatible actualmente con la versión 1.19 o posterior de Apache Flink.
Deje el menú desplegable de versión como Apache Flink versión 1.13.
-
-
Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM.
kinesis-analytics-MyApplication-us-west-2 -
Elija Crear aplicación.
nota
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:
-
Política:
kinesis-analytics-service-MyApplication-us-west-2 -
Rol: :
kinesis-analytics-MyApplication-us-west-2
Modificar la política de IAM
Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.
Abra la consola de IAM en https://console.aws.amazon.com/iam/
. -
Elija Políticas. Elija la política
kinesis-analytics-service-MyApplication-us-west-2que la consola creó en su nombre en la sección anterior. -
En la página Resumen, elija Editar política. Seleccione la pestaña JSON.
-
Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (
012345678901) por el ID de su cuenta.
Configurar la aplicación
-
En la página MyApplication, elija Configurar.
-
En la página Configurar aplicación, proporcione la Ubicación del código:
-
Para el bucket de Amazon S3, introduzca
ka-app-code-.<username> -
En Ruta al objeto de Amazon S3, introduzca
basic-beam-app-1.0.jar.
-
-
En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM
kinesis-analytics-MyApplication-us-west-2. -
Introduzca lo siguiente:
ID de grupo Clave Valor BeamApplicationPropertiesInputStreamNameExampleInputStreamBeamApplicationPropertiesOutputStreamNameExampleOutputStreamBeamApplicationPropertiesAwsRegionus-west-2 -
En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.
-
En Registro de CloudWatch, seleccione la casilla de verificación Habilitar.
-
Elija Actualizar.
nota
Cuando se elige habilitar registros de CloudWatch, Managed Service para Apache Flink crea un grupo de registro y un flujo de registro para usted. Los nombres de estos recursos son los siguientes:
-
Grupo de registro:
/aws/kinesis-analytics/MyApplication -
Flujo de registro:
kinesis-analytics-log-stream
Este flujo de registro se utiliza para supervisar la aplicación. No es el mismo flujo de registro que utiliza la aplicación para enviar los resultados.
Ejecución de la aplicación
Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.
Se puede comprobar las métricas de Managed Service para Apache Flink en la consola de CloudWatch para verificar que la aplicación funciona.
Limpieza de recursos de AWS
En esta sección se incluyen los procedimientos para limpiar los recursos de AWS creados en el tutorial Ventana de salto de tamaño constante.
Este tema contiene las siguientes secciones:
Eliminación de su aplicación de Managed Service para Apache Flink
Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.
En el panel de Managed Service para Apache Flink, elija MyApplication.
En la página de la aplicación, seleccione Eliminar y, a continuación, confirme la eliminación.
Eliminación de sus flujos de datos de Kinesis
Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis
. En el panel de Kinesis Data Streams, elija ExampleInputStream.
En la página ExampleInputStream, elija Eliminar flujo de Kinesis y, a continuación, confirme la eliminación.
En la página Flujos de Kinesis, elija ExampleOutputStream, elija Acciones, elija Eliminar y, a continuación, confirme la eliminación.
Eliminación del objeto y el bucket de Amazon S3
Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3
. Elija el bucket ka-app-code-
<username>.Elija Eliminar y luego ingrese el nombre del bucket para confirmar la eliminación.
Eliminación de sus recursos de IAM
Abra la consola de IAM en https://console.aws.amazon.com/iam/
. En la barra de navegación, seleccione Políticas.
En el control de filtros, introduzca kinesis.
Elija la política kinesis-analytics-service-MyApplication-us-west-2.
Seleccione Acciones de política y, a continuación, Eliminar.
En la barra de navegación, seleccione Roles.
Elija el rol kinesis-analytics-MyApplication-us-west-2.
Elija Eliminar rol y, a continuación, confirme la eliminación.
Eliminación de sus recursos de CloudWatch
Abra la consola de CloudWatch en https://console.aws.amazon.com/cloudwatch/
. En la barra de navegación, elija Registros.
Elija el grupo de registro /aws/kinesis-analytics/MyApplication.
Elija Eliminar grupo de registro y, a continuación, confirme la eliminación.
Pasos a seguir a continuación
Ya que ha creado y ejecutado una aplicación básica de Managed Service para Apache Flink que transforma los datos usando Apache Beam, consulte la siguiente aplicación para encontrar un ejemplo de una solución más avanzada de Managed Service para Apache Flink.
Taller sobre streaming de Managed Service para Apache Flink
: en este taller, analizamos un ejemplo integral que combina aspectos de transmisión por lotes y streaming en una canalización uniforme de Apache Beam.