Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Creación de un cuaderno de Studio con Amazon MSK
En este tutorial se describe cómo crear un cuaderno de Studio que utilice un clúster de Amazon MSK como fuente.
Este tutorial contiene las siguientes secciones:
Configuración de un clúster de Amazon EKS
Para este tutorial, necesita un clúster de Amazon MSK que permita el acceso a texto sin formato. Si aún no ha configurado un clúster de Amazon MSK, siga el tutorial Cómo empezar a utilizar Amazon MSK para crear una Amazon VPC, un clúster de Amazon MSK, un tema y una instancia de cliente de Amazon EC2.
Al seguir el tutorial, haga lo siguiente:
En el paso 3: cree un clúster de Amazon MSK, en el paso 4, cambie el
ClientBrokervalor deTLSaPLAINTEXT.
Agregación de una puerta de enlace NAT a su VPC
Si ha creado un clúster de Amazon MSK siguiendo el tutorial Cómo empezar a utilizar Amazon MSK, o si su Amazon VPC existente aún no tiene una puerta de enlace NAT para sus subredes privadas, debe añadir una puerta de enlace NAT a su Amazon VPC. En el siguiente diagrama se muestra la arquitectura.
Para crear una puerta de enlace NAT para su Amazon VPC, haga lo siguiente:
Abra la consola de Amazon VPC en https://console.aws.amazon.com/vpc/
. En la barra de navegación izquierda, elija puertas de enlace NAT.
En la página puertas de enlace NAT, seleccione Crear puerta de enlace NAT.
En la página Crear puerta de enlace NAT, especifique los valores siguientes:
Nombre: optional ZeppelinGatewaySubred AWSKafkaTutorialSubnet1 Identificación de asignación de dirección IP elástica Choose an available Elastic IP. If there are no Elastic IPs available, choose Asignación de dirección IP elástica, and then choose the Elasic IP that the console creates. Elija Create a NAT Gateway (Crear una puerta de enlace NAT).
En la de navegación izquierda, elija Tablas de ruteo.
Elija Create Route Table (Crear tabla de ruteo).
En la página Crear tabla de enrutamiento, proporcione la siguiente información:
Name tag:
ZeppelinRouteTableVPC: elija su VPC (por ejemplo, AWSKafkaTutorialVPC).
Seleccione Crear.
En la lista de tablas de enrutamiento, elija ZeppelinRouteTable. Elija la pestaña Rutas y, a continuación, Editar rutas.
En la pestaña Editar rutas, elija Añadir rutas.
En Para Destino, escriba
0.0.0.0/0. Para Destino, elija puerta de enlace NAT, ZeppelinGateway. Elija Guardar rutas. Seleccione Cerrar.En la página de Tabla de enrutamiento, con ZeppelinRouteTable seleccionada, elija la pestaña Asociaciones de subredes. Elija Editar asociaciones de subredes.
En la página Editar asociaciones de subredes, elija AWSKafkaTutorialSubnet2 y AWSKafkaTutorialSubnet3. Seleccione Save.
Creación de una conexión y tabla de AWS Glue
Su cuaderno de Studio utiliza una base de datos AWS Glue para los metadatos sobre su origen de datos de Amazon MSK. En esta sección, crea una AWS Glue conexión que describe cómo acceder a su clúster de Amazon MSK y una tabla AWS Glue que describe cómo presentar los datos de su origen de datos a clientes como su cuaderno de Studio.
Creación de una conexión
Inicie sesión en la Consola de administración de AWS y abra la consola de AWS Glue en https://console.aws.amazon.com/glue/
. Si aún no tiene una base de datos AWS Glue, elija Bases de datos en la barra de navegación izquierda. Elija Agregar una base de datos. En la ventana Añadir base de datos, introduzca
defaulten el nombre de la base de datos. Seleccione Crear.En la barra de navegación de la izquierda, seleccione Conexiones. Elija Añadir conexión.
En la ventana Añadir conexión, introduzca los siguientes valores:
En Nombre de conexión, ingrese
ZeppelinConnection.En Tipo de conexión, elija Kafka.
En las URL del servidor de arranque de Kafka, proporcione la cadena del agente de arranque del clúster. Se puede obtener los agentes de arranque desde la consola MSK o ingresando el siguiente comando de la CLI:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arnClusterArnDesactive la casilla de verificación Exigir conexión SSL.
Elija Siguiente.
En la página VPC, especifique los valores siguientes:
Para VPC, elija el nombre de su VPC (por ejemplo, AWSKafkaTutorialVPC).
En Subred, elija AWSKafkaTutorialSubnet2.
Para los grupos de seguridad, elija todos los grupos disponibles.
Elija Siguiente.
En la página Propiedades de la conexión o Acceso a la conexión, seleccione Finalizar.
Crear una tabla
nota
Se puede crear la tabla manualmente tal y como se describe en los pasos siguientes, o puede usar el código conector de creación de tablas para Managed Service para Apache Flink en su cuaderno en Apache Zeppelin para crear la tabla mediante una instrucción DDL. A continuación, puede comprobar AWS Glue para asegurarse de que la tabla se creó correctamente.
En la barra de navegación izquierda, seleccione Tablas. En la página Tablas, seleccione Añadir tablas y Añadir tabla manualmente.
En la página Configurar las propiedades de la tabla, introduzca
stockcomo Nombre de la tabla. Asegúrese de seleccionar la base de datos que creó anteriormente. Elija Siguiente.En la página Añadir almacén de datos, elija Kafka. Para el Nombre del tema, introduzca el nombre del tema (por ejemplo, AWSKafkaTutorialTopic). Para Conexión, elija ZeppelinConnection.
En la página de Clasificación, seleccione JSON. Elija Siguiente.
En la página Definir un esquema, elija Añadir columna para añadir una. Añada columnas con las siguientes propiedades:
Nombre de la columna Tipo de datos tickercadenapreciodoubleElija Siguiente.
En la página siguiente, verifique su configuración y seleccione Finalizar.
-
Elija la tabla recién creada de la lista de tablas.
-
Seleccione Editar tabla y agregue las siguientes propiedades:
-
clave:
managed-flink.proctime, valor:proctime -
clave:
flink.properties.group.id, valor:test-consumer-group -
clave:
flink.properties.auto.offset.reset, valor:latest -
clave:
classification, valor:json
Sin estos pares clave/valor, se produce un error en el cuaderno Flink.
-
-
Seleccione Aplicar.
Creación de un cuaderno de Studio con Amazon MSK
Ahora que ha creado los recursos que utiliza su aplicación, cree su cuaderno de Studio.
Se puede crear su aplicación mediante Consola de administración de AWS o AWS CLI.
nota
También puede crear un cuaderno de Studio desde la consola Amazon MSK seleccionando un clúster existente y, a continuación, seleccionando Procesar datos en tiempo real.
Creación de un cuaderno de Studio con la Consola de administración de AWS
Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Studio. Seleccione Crear cuaderno de Studio.
nota
Para crear un cuaderno de Studio desde las consolas Amazon MSK o Kinesis Data Streams, seleccione el clúster Amazon MSK o el flujo de datos de Kinesis de entrada y, a continuación, elija Procesar datos en tiempo real.
En la página Crear cuaderno de Studio, proporcione la siguiente información:
-
Introduzca
MyNotebookcomo Nombre del cuaderno de Studio. Elija el valor predeterminado para la Base de datos de Glue de AWS.
Seleccione Crear cuaderno de Studio.
-
En la página MyNotebook, seleccione la pestaña Configuración. En la sección Redes, elija Editar.
En la página Editar redes para MyNotebook, elija la configuración de VPC basada en el clúster de Amazon MSK. Elija su clúster de Amazon MSK para el Clúster de Amazon MSK. Seleccione Save changes (Guardar cambios).
En la página MyNotebook, seleccione Ejecutar. Espere a que el Estado muestre En ejecución.
Creación de un cuaderno de Studio con la AWS CLI
Para crear su cuaderno de Studio usando la AWS CLI, haga lo siguiente:
Verifique que disponga de la siguiente información. Necesita estos valores para crear su aplicación.
Su ID de cuenta de .
Los ID de la subred y el ID de grupo de seguridad de Amazon VPC que contiene su clúster de Amazon MSK.
Cree un archivo denominado
create.jsoncon el siguiente contenido. Reemplace los valores de marcador de posición con su información.{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }Para crear su aplicación, ejecute el siguiente comando:
aws kinesisanalyticsv2 create-application --cli-input-json file://create.jsonUna vez completado el comando, debería ver un resultado similar al siguiente, con los detalles de su nuevo cuaderno de Studio:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...Para iniciar su aplicación, ejecute el siguiente comando. Sustituya los valores de muestra por su ID de la cuenta.
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
Envío de datos a su clúster de Amazon MSK
En esta sección, ejecuta un script de Python en su cliente Amazon EC2 para enviar datos a su origen de datos de Amazon MSK.
Conecte con su cliente Amazon EC2.
Ejecute los siguientes comandos para instalar la versión 3 de Python, Pip y el paquete Kafka para Python, y confirme las acciones:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-pythonConfigure el AWS CLI en su equipo cliente con el siguiente comando:
aws configureProporcione las credenciales de su cuenta y
us-east-1pararegion.Cree un archivo denominado
stock.pycon el siguiente contenido. Sustituya el valor del ejemplo por la cadena de agentes de arranque de su clúster de Amazon MSK y actualice el nombre del tema si este no es AWSKafkaTutorialTopic:from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())Ejecute el script con el siguiente comando:
$ python3 stock.pyDeje el script en ejecución mientras completa la siguiente sección.
Prueba de su cuaderno de Studio
En esta sección, utilizará su cuaderno de Studio para consultar datos de su clúster de Amazon MSK.
Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Cuaderno de Studio. Elija MyNotebook.
En la página MyNotebook, seleccione Abrir en Apache Zeppelin.
La interfaz de Apache Zeppelin se abre en una pestaña nueva.
En la página ¡Bienvenido a Zeppelin!, elija la nueva nota de Zeppelin.
En la página Zeppelin Note, introduzca la siguiente consulta en una nota nueva:
%flink.ssql(type=update) select * from stockSeleccione el icono de reproducción.
La aplicación muestra los datos del clúster de Amazon MSK.
Para abrir el Panel de control de Apache Flink de su aplicación y ver los aspectos operativos, elija TRABAJO DE FLINK. Para obtener más información sobre el Panel de control de Flink, consulte Apache Flink Dashboard en la Guía para desarrolladores de Managed Service para Apache Flink.
Para ver más ejemplos de consultas SQL de Flink Streaming, consulte Queries