Paso 3: Crear y ejecutar una aplicación de Managed Service para Apache Flink para Flink - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Paso 3: Crear y ejecutar una aplicación de Managed Service para Apache Flink para Flink

En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Flink con flujos de datos como origen y destino.

Crear dos Amazon Kinesis Data Streams

Antes de crear una aplicación de Managed Service para Apache Flink para Flink en este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones de la consola, consulte Creación y actualización de secuencias de datos.

Cómo crear flujos de datos (AWS CLI)
  1. Para crear la primera transmisión (ExampleInputStream), utilice el siguiente comando de Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Escritura de registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.

nota

Esta sección requiere AWS SDK for Python (Boto).

  1. Cree un archivo denominado stock.py con el siguiente contenido:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. Más adelante en el tutorial, se ejecuta el script stock.py para enviar datos a la aplicación.

    $ python stock.py

Descarga y consulta del código de Java de streaming de Apache Flink

El código de la aplicación Java para este ejemplo está disponible en. GitHub Para descargar el código de la aplicación, haga lo siguiente:

  1. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. Vaya al directorio GettingStarted.

El código de la aplicación se encuentra en los archivos CustomSinkStreamingJob.java y CloudWatchLogSink.java. Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • La aplicación utiliza un origen de Kinesis para leer del flujo de origen. El siguiente fragmento crea el receptor de Kinesis:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

Compilación del código de la aplicación

En esta sección, se utiliza el compilador Apache Maven para crear el código de Java para la aplicación. Para obtener más información sobre la instalación de Apache Maven y el Java Development Kit (JDK), consulte Requisitos previos para completar los ejercicios.

La aplicación de Java requiere los siguientes componentes:

  • Un archivo Project Object Model (pom.xml). Este archivo contiene información sobre la configuración y las dependencias de la aplicación, incluidas las bibliotecas de aplicaciones de Managed Service para Apache Flink para Flink.

  • Un método main que contiene la lógica de la aplicación.

nota

Para utilizar el conector de Kinesis para la siguiente aplicación, es necesario descargar el código fuente del conector y compilarlo tal y como se describe en la documentación de Apache Flink.

Para crear y compilar el código de la aplicación
  1. Cree una aplicación de Java/Maven en su entorno de desarrollo. Para obtener más información acerca de cómo crear una aplicación, consulte la documentación de su entorno de desarrollo:

  2. Utilice el siguiente código para un archivo llamado StreamingJob.java.

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    Tenga en cuenta lo siguiente en relación con el ejemplo de código anterior:

    • Este archivo contiene el método main que define la funcionalidad de la aplicación.

    • La aplicación crea conectores de origen y recepción para obtener acceso a recursos externos usando un objeto StreamExecutionEnvironment.

    • La aplicación crea conectores de origen y recepción mediante propiedades estáticas. Para utilizar propiedades dinámicas de la aplicación, utilice los métodos createSourceFromApplicationProperties y createSinkFromApplicationProperties para crear los conectores. Estos métodos leen las propiedades de la aplicación para configurar los conectores.

  3. Para utilizar el código de la aplicación, compile y empaquete el código en un archivo JAR. Puede compilar y empaquetar el código de una de las dos formas siguientes:

    • Utilice la herramienta de línea de comandos de Maven. Cree su archivo JAR ejecutando el siguiente comando en el directorio que contiene el archivo pom.xml:

      mvn package
    • Use el entorno de desarrollo. Consulte la documentación de su entorno de desarrollo para obtener más información.

    Puede cargar el paquete como un archivo JAR o puede comprimir el paquete y cargarlo como un archivo ZIP. Si crea la aplicación con AWS CLI, especifique el tipo de contenido del código (JAR o ZIP).

  4. Si hay errores al compilar, verifique que la variable de entorno JAVA_HOME se ha configurado correctamente.

Si la aplicación se compila correctamente, se crea el siguiente archivo:

target/java-getting-started-1.0.jar

Carga del código de Java de streaming de Apache Flink

En esta sección, creará un bucket de Amazon Simple Storage Service (Amazon S3) y cargará el código de la aplicación.

Cómo cargar el código de la aplicación
  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  2. Elija Crear bucket.

  3. Escriba ka-app-code-<username> en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.

  4. En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.

  5. En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.

  6. Elija Crear bucket.

  7. En la consola de Amazon S3, elija el <username>bucket ka-app-code- y, a continuación, seleccione Upload.

  8. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo java-getting-started-1.0.jar que creó en el paso anterior. Elija Siguiente.

  9. En el paso Establecer permisos, deje los ajustes tal y como están. Elija Siguiente.

  10. En el paso Establecer propiedades, deje los ajustes tal y como están. Seleccione Cargar.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

Crear y ejecutar la aplicación de Managed Service para Apache Flink

Puede crear y ejecutar una aplicación de Managed Service para Apache Flink para Flink mediante la consola o la AWS CLI.

nota

Cuando crea la aplicación mediante la consola, sus recursos AWS Identity and Access Management (de IAM) y de Amazon CloudWatch Logs se crean automáticamente. Cuando crea la aplicación con AWS CLI, crea estos recursos por separado.

Crear y ejecutar la aplicación (consola)

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

Creación de la aplicación

  1. Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. En el panel de Amazon Kinesis, elija Crear aplicación de análisis.

  3. En la página Kinesis Analytics - Crear aplicación, proporcione la siguiente información:

    • En Nombre de la aplicación, escriba MyApplication.

    • En Descripción, escriba My java test app.

    • En Tiempo de ejecución, escriba Apache Flink 1.6.

  4. Para Permisos de acceso, seleccione Crear o actualizar rol de IAM kinesis-analytics-MyApplication-us-west-2.

  5. Elija Crear aplicación.

nota

Cuando crea una aplicación de Managed Service para Apache Flink para Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: kinesis-analytics-MyApplication-us-west-2

Modificación de la política de IAM

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-west-2 que la consola creó en su nombre en la sección anterior.

  3. En la página Resumen, elija Editar política. Seleccione la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configuración de la aplicación

  1. En la MyApplicationpágina, elija Configurar.

  2. En la página Configurar aplicación, proporcione la Ubicación del código:

    • Para el bucket de Amazon S3, introduzca ka-app-code-<username>.

    • En Ruta al objeto de Amazon S3, introduzca java-getting-started-1.0.jar.

  3. En Acceso a los recursos de la aplicación, en Permisos de acceso, seleccione Crear o actualizar rol de IAM kinesis-analytics-MyApplication-us-west-2.

  4. En Propiedades, en ID de grupo, escriba ProducerConfigProperties.

  5. Escriba las siguientes propiedades y valores de la aplicación:

    Clave Valor
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. En Monitorización, asegúrese de que el Nivel de métricas de monitorización se ha establecido en Aplicación.

  7. Para el CloudWatch registro, active la casilla Activar.

  8. Elija Actualizar.

nota

Si decide habilitar el CloudWatch registro, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros automáticamente. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Ejecución de la aplicación

  1. En la MyApplicationpágina, seleccione Ejecutar. Confirme la acción.

  2. Cuando la aplicación se está ejecutando, actualice la página. La consola muestra el Gráfico de la aplicación.

Detención de la aplicación

En la MyApplicationpágina, selecciona Detener. Confirme la acción.

Actualizar la aplicación

Mediante la consola, puede actualizar la configuración de la aplicación, tal como sus propiedades, ajustes de monitorización y la ubicación o el nombre de archivo JAR de la aplicación. También puede volver a cargar el JAR de la aplicación del bucket de Amazon S3 si necesita actualizar el código de la aplicación.

En la MyApplicationpágina, elija Configurar. Actualice la configuración de la aplicación y elija Actualizar.

Creación y ejecución de la aplicación (AWS CLI)

En esta sección, se utiliza AWS CLI para crear y ejecutar la aplicación Managed Service for Apache Flink. Managed Service for Apache Flink for Flink Applications utiliza el kinesisanalyticsv2 AWS CLI comando para crear aplicaciones Managed Service for Apache Flink e interactuar con ellas.

Crear una política de permisos

En primer lugar, debe crear una política de permisos con dos instrucciones: una que concede permisos para la acción read en el flujo de origen y otra que concede permisos para las acciones write en el flujo de recepción. A continuación, asocie la política a un rol de IAM (que se crea en la siguiente sección). Por lo tanto, cuando Managed Service para Apache Flink asume el rol, el servicio tiene los permisos necesarios para leer desde el flujo de origen y escribir en el flujo de recepción.

Utilice el siguiente código para crear la política de permisos KAReadSourceStreamWriteSinkStream. Reemplace username por el nombre de usuario que se utilizó para crear el bucket de Amazon S3 para almacenar el código de la aplicación. Sustituya el ID de la cuenta en los nombres de recurso de Amazon (ARN) (012345678901) por el ID de su cuenta.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Para step-by-step obtener instrucciones sobre cómo crear una política de permisos, consulte el tutorial: Cómo crear y adjuntar su primera política gestionada por el cliente en la Guía del usuario de IAM.

nota

Para acceder a otros AWS servicios, puede utilizar el AWS SDK for Java. Managed Service para Apache Flink establece automáticamente las credenciales requeridas por el SDK con las del rol de IAM de ejecución del servicio asociada a su aplicación. No hace falta realizar ningún otro paso.

Creación de un rol de IAM

En esta sección se crea un rol de IAM que la aplicación de Managed Service para Apache Flink para Flink puede adoptar para leer un flujo de origen y escribir en el flujo de destino.

Managed Service para Apache Flink no puede acceder a su flujo sin permisos. Estos permisos se conceden a través del rol de IAM. Cada rol de IAM tiene dos políticas asociadas. La política de confianza concede a Managed Service para Apache Flink permiso para asumir el rol, y la política de permisos determina lo que Managed Service para Apache Flink puede hacer después de asumir el rol.

Usted deberá asociar la política de permisos que ha creado en la sección anterior a este rol.

Cómo crear un rol de IAM
  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. En el panel de navegación, elija Roles, Crear rol.

  3. En Seleccionar tipo de entidad de confianza, elija Servicio de AWS . En Elegir el servicio que usará este rol, elija Kinesis. En Seleccionar su caso de uso, elija Kinesis Analytics.

    Elija Siguiente: permisos.

  4. En la página Asociar políticas de permisos, elija Siguiente: Revisión. Asociará políticas de permisos después de crear el rol.

  5. En la página Crear rol, escriba KA-stream-rw-role como Nombre de rol. Elija Crear rol.

    Ahora ha creado un nuevo rol de IAM llamado KA-stream-rw-role. A continuación, actualice las políticas de confianza y permisos del rol.

  6. Asocie la política de permisos al rol.

    nota

    Para este ejercicio, Managed Service para Apache Flink asume este rol tanto para leer datos de un flujo de datos de Kinesis (origen) como para escribir la salida en otro flujo de datos de Kinesis. Asocie la política que ha creado en el paso anterior, Crear una política de permisos.

    1. En la página Resumen, elija la pestaña Permisos.

    2. Seleccione Asociar políticas.

    3. En el campo de búsqueda, escriba KAReadSourceStreamWriteSinkStream (la política que ha creado en la sección anterior).

    4. Elija la ReadInputStreamWriteOutputStream política KA y elija Adjuntar política.

Ahora ha creado el rol de ejecución de servicio que utiliza la aplicación para obtener acceso a los recursos. Anote el ARN del nuevo rol.

Para step-by-step obtener instrucciones sobre cómo crear un rol, consulte Creación de un rol de IAM (consola) en la Guía del usuario de IAM.

Crear la aplicación de Managed Service para Apache Flink

  1. Guarde el siguiente código JSON en un archivo denominado create_request.json. Cambie el ARN del rol de ejemplo por el ARN del rol que ha creado antes. Reemplace el sufijo del ARN del bucket (username) por el sufijo que eligió en la sección anterior. Reemplace el ID de la cuenta de ejemplo (012345678901) del rol de ejecución del servicio por el ID de su cuenta.

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Ejecute la acción CreateApplication con la solicitud anterior para crear la aplicación:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

Se ha creado la aplicación. Puede iniciar la aplicación en el siguiente paso.

Inicio de la aplicación

En esta sección, se utiliza la acción StartApplication para iniciar la aplicación.

Cómo iniciar la aplicación
  1. Guarde el siguiente código JSON en un archivo denominado start_request.json.

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. Ejecute la acción StartApplication con la solicitud anterior para iniciar la aplicación:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

Ya se debe estar ejecutando la aplicación. Puedes comprobar las métricas de Managed Service for Apache Flink en la CloudWatch consola de Amazon para comprobar que la aplicación funciona.

Detención de la aplicación

En esta sección, se utiliza la acción StopApplication para detener la aplicación.

Cómo detener la aplicación
  1. Guarde el siguiente código JSON en un archivo denominado stop_request.json.

    {"ApplicationName": "test" }
  2. Ejecute la acción StopApplication con la siguiente solicitud para detener la aplicación:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

La aplicación se habrá detenido.