Creación de una aplicación con Apache Beam - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Creación de una aplicación con Apache Beam

En este ejercicio, creará una aplicación de Managed Service para Apache Flink que transforma datos usando Apache Beam. Apache Beam es un modelo de programación para procesar datos de streaming. Para obtener más información sobre el uso de Apache Beam con Managed Service para Apache Flink, consulte Uso de Apache Beam.

nota

Para configurar los requisitos previos necesarios para este ejercicio, primero complete el ejercicio Primeros pasos (DataStream API).

Cree recursos dependientes

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream)

  • Un bucket de Amazon S3 para almacenar el código de la aplicación (ka-app-code-<username>)

Puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:

  • Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos ExampleInputStream y ExampleOutputStream.

  • ¿Cómo se puede crear un bucket de S3? en la guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo, ka-app-code-<username>.

Escriba registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir cadenas asignadas al azar al flujo para que la aplicación realice el procesamiento.

nota

Esta sección requiere AWS SDK for Python (Boto).

  1. Cree un archivo denominado ping.py con el siguiente contenido:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Ejecute el script ping.py:

    $ python ping.py

    Mantenga el script en ejecución mientras completa el resto del tutorial.

Descargue y examine el código de la aplicación

El código de la aplicación Java de este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git.

  2. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Vaya al directorio amazon-kinesis-data-analytics-java-examples/Beam.

El código de la aplicación se encuentra en el archivo BasicBeamStreamingJob.java. Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • La aplicación utiliza Apache Beam ParDopara procesar los registros entrantes mediante la invocación de una función de transformación personalizada llamadaPingPongFn.

    El código para invocar la función PingPongFn es el siguiente:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Las aplicaciones de Managed Service para Apache Flink que usan Apache Beam requieren los siguientes componentes. Si no incluye estos componentes y versiones en su pom.xml, la aplicación carga las versiones incorrectas desde las dependencias del entorno y, dado que las versiones no coinciden, la aplicación se bloquea durante el tiempo de ejecución.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • La función de PingPongFn transformación pasa los datos de entrada al flujo de salida, a menos que los datos de entrada sean ping, en cuyo caso emite la cadena pong\n al flujo de salida.

    El código de la función de transformación es el siguiente:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compila el código de la aplicación

Para compilar la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale Java y Maven. Para obtener más información, consulte Requisitos previos en el tutorial de Primeros pasos (DataStream API).

  2. Compile la aplicación con el siguiente comando:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    nota

    El código fuente proporcionado se basa en bibliotecas de Java 11.

Al compilar la aplicación, se crea el archivo JAR de la aplicación (target/basic-beam-app-1.0.jar).

Cargue el código Java de streaming de Apache Flink

En esta sección, cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Cree recursos dependientes.

  1. En la consola de Amazon S3, elija el <username>bucket ka-app-code- y elija Upload.

  2. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo basic-beam-app-1.0.jar que creó en el paso anterior.

  3. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

Cree y ejecute la aplicación Managed Service for Apache Flink

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

Creación de la aplicación

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink.

  2. En el panel de Managed Service para Apache Flink, seleccione Crear aplicación de análisis.

  3. En la página Managed Service para Apache Flink: crear aplicación, proporcione los detalles de la aplicación de la siguiente manera:

    • En Nombre de la aplicación, escriba MyApplication.

    • En Tiempo de ejecución, escriba Apache Flink.

      nota

      Apache Beam no es compatible actualmente con la versión 1.19 o posterior de Apache Flink.

    • Seleccione Apache Flink versión 1.15 en el menú desplegable de versiones.

  4. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM. kinesis-analytics-MyApplication-us-west-2

  5. Elija Crear aplicación.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: kinesis-analytics-MyApplication-us-west-2

Edite la política de IAM

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-west-2 que la consola creó en su nombre en la sección anterior.

  3. En la página Resumen, elija Editar política. Seleccione la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configure la aplicación

  1. En la MyApplicationpágina, elija Configurar.

  2. En la página Configurar aplicación, proporcione la Ubicación del código:

    • Para el bucket de Amazon S3, introduzca ka-app-code-<username>.

    • En Ruta al objeto de Amazon S3, introduzca basic-beam-app-1.0.jar.

  3. En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-west-2.

  4. Introduzca lo siguiente:

    ID de grupo Clave Valor
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.

  6. Para el CloudWatch registro, active la casilla Activar.

  7. Elija Actualizar.

nota

Si decide habilitar el CloudWatch registro, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros automáticamente. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Este flujo de registro se utiliza para supervisar la aplicación. No es el mismo flujo de registro que utiliza la aplicación para enviar los resultados.

Ejecución de la aplicación

Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.

Puede comprobar las métricas del servicio gestionado para Apache Flink en la CloudWatch consola para comprobar que la aplicación funciona.

Limpie los recursos AWS

Esta sección incluye procedimientos para limpiar los AWS recursos creados en el tutorial Tumbling Window.

Elimine su aplicación Managed Service for Apache Flink

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/flink.

  2. en el panel Servicio gestionado para Apache Flink, elija. MyApplication

  3. En la página de la aplicación, seleccione Eliminar y, a continuación, confirme la eliminación.

Elimine sus transmisiones de datos de Kinesis

  1. Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. En el panel Kinesis Data Streams, ExampleInputStreamelija.

  3. En la ExampleInputStreampágina, elija Eliminar Kinesis Stream y, a continuación, confirme la eliminación.

  4. En la página de transmisiones de Kinesis, elija, elija Acciones ExampleOutputStream, elija Eliminar y, a continuación, confirme la eliminación.

Elimine el objeto y el bucket de Amazon S3

  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  2. Elija el cubo ka-app-code -. <username>

  3. Elija Eliminar y luego ingrese el nombre del bucket para confirmar la eliminación.

Elimine sus recursos de IAM

  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. En la barra de navegación, seleccione Políticas.

  3. En el control de filtros, introduzca kinesis.

  4. Elija la política kinesis-analytics-service- MyApplication -us-west-2.

  5. Seleccione Acciones de política y, a continuación, Eliminar.

  6. En la barra de navegación, seleccione Roles.

  7. Elija el rol kinesis-analytics- MyApplication -us-west-2.

  8. Elija Eliminar rol y, a continuación, confirme la eliminación.

CloudWatch Elimine sus recursos

  1. Abre la CloudWatch consola en https://console.aws.amazon.com/cloudwatch/.

  2. En la barra de navegación, elija Registros.

  3. Elija el grupo de registros MyApplication/aws/kinesis-analytics/.

  4. Elija Eliminar grupo de registro y, a continuación, confirme la eliminación.

Siguientes pasos

Ya que ha creado y ejecutado una aplicación básica de Managed Service para Apache Flink que transforma los datos usando Apache Beam, consulte la siguiente aplicación para encontrar un ejemplo de una solución más avanzada de Managed Service para Apache Flink.