Creación de un cuaderno de Studio con Amazon MSK - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Creación de un cuaderno de Studio con Amazon MSK

En este tutorial se describe cómo crear un cuaderno de Studio que utilice un clúster de Amazon MSK como fuente.

Configuración

Para este tutorial, necesita un clúster de Amazon MSK que permita el acceso a texto sin formato. Si aún no ha configurado un clúster de Amazon MSK, siga el tutorial Cómo empezar a utilizar Amazon MSK para crear una Amazon VPC, un clúster de Amazon MSK, un tema y una instancia de cliente de Amazon EC2.

Al seguir el tutorial, haga lo siguiente:

Añada una puerta de enlace NAT a su VPC

Si ha creado un clúster de Amazon MSK siguiendo el tutorial Cómo empezar a utilizar Amazon MSK, o si su Amazon VPC existente aún no tiene una puerta de enlace NAT para sus subredes privadas, debe añadir una puerta de enlace NAT a su Amazon VPC. En el siguiente diagrama se muestra la arquitectura.

Para crear una puerta de enlace NAT para su Amazon VPC, haga lo siguiente:

  1. Abra la consola de Amazon VPC en https://console.aws.amazon.com/vpc/.

  2. En la barra de navegación izquierda, elija puertas de enlace NAT.

  3. En la página puertas de enlace NAT, seleccione Crear puerta de enlace NAT.

  4. En la página Crear puerta de enlace NAT, especifique los valores siguientes:

    Nombre: opcional ZeppelinGateway
    Subred AWS KafkaTutorialSubnet1
    ID de asignación de IP elástica Elija una IP elástica disponible. Si no hay ninguna IP elástica disponible, elija Asignar IP elástica y, a continuación, elija la IP elástica que cree la consola.

    Elija Create a NAT Gateway (Crear una puerta de enlace NAT).

  5. En la de navegación izquierda, elija Tablas de ruteo.

  6. Elija Create Route Table (Crear tabla de ruteo).

  7. En la página Crear tabla de enrutamiento, proporcione la siguiente información:

    • Name tag: ZeppelinRouteTable

    • VPC: elija su VPC (p. ej., VPC).AWS KafkaTutorial

    Seleccione Crear.

  8. En la lista de tablas de rutas, elija. ZeppelinRouteTable Elija la pestaña Rutas y, a continuación, Editar rutas.

  9. En la pestaña Editar rutas, elija Añadir rutas.

  10. En Para Destino, escriba 0.0.0.0/0. Para Target, elija NAT Gateway, ZeppelinGateway. Elija Guardar rutas. Elija Close.

  11. En la página de tablas de rutas, con la ZeppelinRouteTableopción seleccionada, elija la pestaña Asociaciones de subredes. Elija Editar asociaciones de subredes.

  12. En la página Editar asociaciones de subredes, elija AWS KafkaTutorialSubnet2 y AWS KafkaTutorialSubnet 3. Seleccione Guardar.

Cree una AWS Glue conexión y una tabla

Su cuaderno de Studio utiliza una base de datos AWS Glue para los metadatos sobre su origen de datos de Amazon MSK. En esta sección, crea una AWS Glue conexión que describe cómo acceder a su clúster de Amazon MSK y una AWS Glue tabla que describe cómo presentar los datos de su fuente de datos a clientes como su bloc de notas Studio.

Creación de una conexión
  1. Inicie sesión AWS Management Console y abra la AWS Glue consola en https://console.aws.amazon.com/glue/.

  2. Si aún no tiene una AWS Glue base de datos, elija Bases de datos en la barra de navegación izquierda. Elija Agregar una base de datos. En la ventana Añadir base de datos, introduzca default en el nombre de la base de datos. Seleccione Crear.

  3. En la barra de navegación de la izquierda, seleccione Conexiones. Elija Añadir conexión.

  4. En la ventana Añadir conexión, introduzca los siguientes valores:

    • En Nombre de conexión, ingrese ZeppelinConnection.

    • En Tipo de conexión, elija Kafka.

    • En las URL del servidor de arranque de Kafka, proporcione la cadena del agente de arranque del clúster. Puede obtener los agentes de arranque desde la consola MSK o ingresando el siguiente comando de la CLI:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Desactive la casilla de verificación Exigir conexión SSL.

    Elija Siguiente.

  5. En la página VPC, especifique los valores siguientes:

    • Para VPC, elija el nombre de su VPC (por ejemplo, VPC). AWS KafkaTutorial

    • Para Subnet, elija 2.AWS KafkaTutorialSubnet

    • Para los grupos de seguridad, elija todos los grupos disponibles.

    Elija Siguiente.

  6. En la página Propiedades de la conexión o Acceso a la conexión, seleccione Finalizar.

Crear una tabla
nota

Puede crear la tabla manualmente tal y como se describe en los pasos siguientes, o puede usar el código conector de creación de tablas para Managed Service para Apache Flink en su cuaderno en Apache Zeppelin para crear la tabla mediante una instrucción DDL. A continuación, puede comprobar AWS Glue que la tabla se ha creado correctamente.

  1. En la barra de navegación izquierda, seleccione Tablas. En la página Tablas, seleccione Añadir tablas y Añadir tabla manualmente.

  2. En la página Configurar las propiedades de la tabla, introduzca stock como Nombre de la tabla. Asegúrese de seleccionar la base de datos que creó anteriormente. Elija Siguiente.

  3. En la página Añadir almacén de datos, elija Kafka. Para el nombre del tema, introduce el nombre del tema (por ejemplo AWS KafkaTutorialTopic). En Conexión, elija ZeppelinConnection.

  4. En la página de Clasificación, seleccione JSON. Elija Siguiente.

  5. En la página Definir un esquema, elija Añadir columna para añadir una. Añada columnas con las siguientes propiedades:

    Nombre de la columna Tipo de datos
    ticker string
    price double

    Elija Siguiente.

  6. En la página siguiente, verifique su configuración y seleccione Finalizar.

  7. Elija la tabla recién creada de la lista de tablas.

  8. Elija Editar tabla y añada una propiedad con la clave managed-flink.proctime y el valor proctime.

  9. Seleccione Aplicar.

Creación de un cuaderno de Studio con Amazon MSK

Ahora que ha creado los recursos que utiliza su aplicación, cree su cuaderno de Studio.

Puede crear su aplicación mediante el AWS Management Console o el AWS CLI.
nota

También puede crear un cuaderno de Studio desde la consola Amazon MSK seleccionando un clúster existente y, a continuación, seleccionando Procesar datos en tiempo real.

Cree un cuaderno de Studio con el AWS Management Console

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Studio. Seleccione Crear cuaderno de Studio.

    nota

    Para crear un cuaderno de Studio desde las consolas Amazon MSK o Kinesis Data Streams, seleccione el clúster Amazon MSK o el flujo de datos de Kinesis de entrada y, a continuación, elija Procesar datos en tiempo real.

  3. En la página Crear cuaderno de Studio, proporcione la siguiente información:

    • Introduzca MyNotebook como Nombre del cuaderno de Studio.

    • Elija el valor predeterminado para la Base de datos de Glue de AWS .

    Seleccione Crear cuaderno de Studio.

  4. En la MyNotebookpágina, seleccione la pestaña Configuración. En la sección Redes, elija Editar.

  5. En la MyNotebook página Editar red para, selecciona la configuración de VPC basada en el clúster de Amazon MSK. Elija su clúster de Amazon MSK para el Clúster de Amazon MSK. Elija Guardar cambios.

  6. En la MyNotebookpágina, selecciona Ejecutar. Espere a que el Estado muestre En ejecución.

Cree un bloc de notas de Studio con AWS CLI

Para crear tu bloc de notas de Studio mediante el AWS CLI, haz lo siguiente:

  1. Verifique que disponga de la siguiente información. Necesita estos valores para crear su aplicación.

    • Su ID de cuenta de .

    • Los ID de la subred y el ID de grupo de seguridad de Amazon VPC que contiene su clúster de Amazon MSK.

  2. Cree un archivo denominado create.json con el siguiente contenido. Reemplace los valores de marcador de posición con su información.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Para crear su aplicación, ejecute el siguiente comando:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Una vez completado el comando, debería ver un resultado similar al siguiente, con los detalles de su nuevo cuaderno de Studio:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Para iniciar su aplicación, ejecute el siguiente comando. Sustituya los valores de muestra por su ID de la cuenta.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Envío de datos a su clúster de Amazon MSK

En esta sección, ejecuta un script de Python en su cliente Amazon EC2 para enviar datos a su origen de datos de Amazon MSK.

  1. Conecte con su cliente Amazon EC2.

  2. Ejecute los siguientes comandos para instalar la versión 3 de Python, Pip y el paquete Kafka para Python, y confirme las acciones:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configure AWS CLI en su máquina cliente introduciendo el siguiente comando:

    aws configure

    Proporcione las credenciales de su cuenta y us-east-1 para region.

  4. Cree un archivo denominado stock.py con el siguiente contenido. Sustituya el valor de muestra por la cadena Bootstrap Brokers de su clúster de Amazon MSK y actualice el nombre del tema si su tema no es: AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Ejecute el script con el siguiente comando:

    $ python3 stock.py
  6. Deje el script en ejecución mientras completa la siguiente sección.

Prueba de su cuaderno de Studio

En esta sección, utilizará su cuaderno de Studio para consultar datos de su clúster de Amazon MSK.

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Cuaderno de Studio. Elige. MyNotebook

  3. En la MyNotebookpágina, elija Abrir en Apache Zeppelin.

    La interfaz de Apache Zeppelin se abre en una pestaña nueva.

  4. En la página ¡Bienvenido a Zeppelin!, elija la nueva nota de Zeppelin.

  5. En la página Zeppelin Note, introduzca la siguiente consulta en una nota nueva:

    %flink.ssql(type=update) select * from stock

    Seleccione el icono de reproducción.

    La aplicación muestra los datos del clúster de Amazon MSK.

Para abrir el Panel de control de Apache Flink de su aplicación y ver los aspectos operativos, elija TRABAJO DE FLINK. Para obtener más información sobre el Panel de control de Flink, consulte Apache Flink Dashboard en la Guía para desarrolladores de Managed Service para Apache Flink.

Para ver más ejemplos de consultas SQL de Flink Streaming, consulte Queries en la documentación de Apache Flink.