Creación de una aplicación con Apache Beam - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Creación de una aplicación con Apache Beam

En este ejercicio, creará una aplicación de Managed Service para Apache Flink que transforma datos usando Apache Beam. Apache Beam es un modelo de programación para procesar datos de streaming. Para obtener más información sobre el uso de Apache Beam con Managed Service para Apache Flink, consulte Creación de aplicaciones de Managed Service para Apache Flink con Apache Beam.

nota

Para configurar los requisitos previos necesarios para este ejercicio, primero complete el ejercicio Tutorial: introducción al uso de la API de DataStream en Managed Service para Apache Flink.

Creación de recursos dependientes

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream)

  • Un bucket de Amazon S3 para almacenar el código de la aplicación (ka-app-code-<username>)

Se puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:

  • Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos ExampleInputStream y ExampleOutputStream.

  • ¿Cómo se puede crear un bucket de S3? en la guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo, ka-app-code-<username>.

Escritura de registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir cadenas asignadas al azar al flujo para que la aplicación realice el procesamiento.

nota

Esta sección requiere AWS SDK for Python (Boto).

  1. Cree un archivo denominado ping.py con el siguiente contenido:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Ejecute el script ping.py:

    $ python ping.py

    Mantenga el script en ejecución mientras completa el resto del tutorial.

Descarga y examen del código de la aplicación

El código de la aplicación de Java para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git.

  2. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Vaya al directorio amazon-kinesis-data-analytics-java-examples/Beam.

El código de la aplicación se encuentra en el archivo BasicBeamStreamingJob.java. Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • La aplicación utiliza el Apache Beam ParDo para procesar los registros entrantes al invocar una función de transformación personalizada llamada PingPongFn.

    El código para invocar la función PingPongFn es el siguiente:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Las aplicaciones de Managed Service para Apache Flink que usan Apache Beam requieren los siguientes componentes. Si no incluye estos componentes y versiones en su pom.xml, la aplicación carga las versiones incorrectas desde las dependencias del entorno y, dado que las versiones no coinciden, la aplicación se bloquea durante el tiempo de ejecución.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • La función de PingPongFn transformación pasa los datos de entrada al flujo de salida, a menos que los datos de entrada sean ping, en cuyo caso emite la cadena pong\n al flujo de salida.

    El código de la función de transformación es el siguiente:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compilar el código de la aplicación

Para compilar la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale Java y Maven. Para obtener más información, consulte Cumplimiento de los requisitos previos obligatorios en el tutorial de Tutorial: introducción al uso de la API de DataStream en Managed Service para Apache Flink.

  2. Compile la aplicación con el siguiente comando:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    nota

    El código fuente proporcionado se basa en bibliotecas de Java 11.

Al compilar la aplicación, se crea el archivo JAR de la aplicación (target/basic-beam-app-1.0.jar).

Cargar el código de Java de streaming de Apache Flink

En esta sección, cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Creación de recursos dependientes.

  1. En la consola de Amazon S3, elija el bucket ka-app-code-<username> y seleccione Subir.

  2. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo basic-beam-app-1.0.jar que creó en el paso anterior.

  3. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

Crear y ejecutar la aplicación de Managed Service para Apache Flink

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

Creación de la aplicación

  1. Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.

  2. En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.

  3. En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:

    • En Nombre de la aplicación, escriba MyApplication.

    • En Tiempo de ejecución, escriba Apache Flink.

      nota

      Apache Beam no es compatible actualmente con la versión 1.19 o posterior de Apache Flink.

    • Deje el menú desplegable de versión como Apache Flink versión 1.13.

  4. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM. kinesis-analytics-MyApplication-us-west-2

  5. Elija Crear aplicación.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: : kinesis-analytics-MyApplication-us-west-2

Modificar la política de IAM

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-west-2 que la consola creó en su nombre en la sección anterior.

  3. En la página Resumen, elija Editar política. Seleccione la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configurar la aplicación

  1. En la página MyApplication, elija Configurar.

  2. En la página Configurar aplicación, proporcione la Ubicación del código:

    • Para el bucket de Amazon S3, introduzca ka-app-code-<username>.

    • En Ruta al objeto de Amazon S3, introduzca basic-beam-app-1.0.jar.

  3. En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-west-2.

  4. Introduzca lo siguiente:

    ID de grupo Clave Valor
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.

  6. En Registro de CloudWatch, seleccione la casilla de verificación Habilitar.

  7. Elija Actualizar.

nota

Cuando se elige habilitar registros de CloudWatch, Managed Service para Apache Flink crea un grupo de registro y un flujo de registro para usted. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Este flujo de registro se utiliza para supervisar la aplicación. No es el mismo flujo de registro que utiliza la aplicación para enviar los resultados.

Ejecución de la aplicación

Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.

Se puede comprobar las métricas de Managed Service para Apache Flink en la consola de CloudWatch para verificar que la aplicación funciona.

Limpieza de recursos de AWS

En esta sección se incluyen los procedimientos para limpiar los recursos de AWS creados en el tutorial Ventana de salto de tamaño constante.

Eliminación de su aplicación de Managed Service para Apache Flink

  1. Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.

  2. En el panel de Managed Service para Apache Flink, elija MyApplication.

  3. En la página de la aplicación, seleccione Eliminar y, a continuación, confirme la eliminación.

Eliminación de sus flujos de datos de Kinesis

  1. Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. En el panel de Kinesis Data Streams, elija ExampleInputStream.

  3. En la página ExampleInputStream, elija Eliminar flujo de Kinesis y, a continuación, confirme la eliminación.

  4. En la página Flujos de Kinesis, elija ExampleOutputStream, elija Acciones, elija Eliminar y, a continuación, confirme la eliminación.

Eliminación del objeto y el bucket de Amazon S3

  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  2. Elija el bucket ka-app-code-<username>.

  3. Elija Eliminar y luego ingrese el nombre del bucket para confirmar la eliminación.

Eliminación de sus recursos de IAM

  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. En la barra de navegación, seleccione Políticas.

  3. En el control de filtros, introduzca kinesis.

  4. Elija la política kinesis-analytics-service-MyApplication-us-west-2.

  5. Seleccione Acciones de política y, a continuación, Eliminar.

  6. En la barra de navegación, seleccione Roles.

  7. Elija el rol kinesis-analytics-MyApplication-us-west-2.

  8. Elija Eliminar rol y, a continuación, confirme la eliminación.

Eliminación de sus recursos de CloudWatch

  1. Abra la consola de CloudWatch en https://console.aws.amazon.com/cloudwatch/.

  2. En la barra de navegación, elija Registros.

  3. Elija el grupo de registro /aws/kinesis-analytics/MyApplication.

  4. Elija Eliminar grupo de registro y, a continuación, confirme la eliminación.

Pasos a seguir a continuación

Ya que ha creado y ejecutado una aplicación básica de Managed Service para Apache Flink que transforma los datos usando Apache Beam, consulte la siguiente aplicación para encontrar un ejemplo de una solución más avanzada de Managed Service para Apache Flink.