Création d’un bloc-notes Studio avec Amazon MSK - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Création d’un bloc-notes Studio avec Amazon MSK

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un cluster Amazon MSK comme source.

Configuration

Pour ce didacticiel, vous avez besoin d’un cluster Amazon MSK qui autorise l’accès en texte brut. Si vous n’avez pas encore configuré de cluster Amazon MSK, suivez le didacticiel Mise en route avec Amazon MSK pour créer un VPC Amazon, un cluster Amazon MSK, une rubrique et une instance client Amazon EC2.

Lorsque vous suivez le didacticiel, procédez comme suit :

Ajoutez une passerelle NAT à votre VPC

Si vous avez créé un cluster Amazon MSK en suivant le didacticiel Mise en route avec Amazon MSK, ou si votre VPC Amazon existant ne possède pas encore de passerelle NAT pour ses sous-réseaux privés, vous devez ajouter une passerelle NAT à votre VPC Amazon. Le schéma suivant illustre l’architecture.

Pour créer une passerelle NAT pour votre Amazon VPC, procédez comme suit :

  1. Ouvrez la console Amazon VPC à l’adresse https://console.aws.amazon.com/vpc/.

  2. Dans la barre de navigation de gauche, choisissez Passerelles NAT.

  3. Sur la page Passerelles NAT, choisissez Créer une passerelle NAT.

  4. Sur la page Créer une passerelle NAT, renseignez les valeurs suivantes :

    Nom - facultatif ZeppelinGateway
    Sous-réseau AWS KafkaTutorialSubnet1
    ID d'allocation IP élastique Choisissez une adresse IP élastique disponible. Si aucune adresse IP élastique n'est disponible, choisissez Allouer une adresse IP élastique, puis choisissez l'adresse IP élastique créée par la console.

    Choisissez Créer une passerelle NAT.

  5. Dans le volet de navigation de gauche, choisissez Tables de routage.

  6. Choisissez Créer une table de routage.

  7. Sur la page Créer une table de routage, fournissez les informations suivantes :

    • Balise de nom : ZeppelinRouteTable

    • VPC : Choisissez votre VPC (par exemple, VPC).AWS KafkaTutorial

    Choisissez Créer.

  8. Dans la liste des tables de routage, choisissez ZeppelinRouteTable. Choisissez l’onglet Routes, puis Modifier les routes.

  9. Sur la page Modifier les routes, choisissez Ajouter une route.

  10. Dans le Pour Destination, saisissez 0.0.0.0/0. Pour Target, choisissez NAT Gateway, ZeppelinGateway. Choisissez Enregistrer les routes. Choisissez Fermer.

  11. Sur la page Tables de routage, lorsque cette option est ZeppelinRouteTablesélectionnée, choisissez l'onglet Associations de sous-réseaux. Choisissez Modifier les associations de sous-réseaux.

  12. Sur la page Modifier les associations de sous-réseaux, choisissez AWS KafkaTutorialSubnet2 et AWS KafkaTutorialSubnet3. Choisissez Enregistrer.

Création d'une AWS Glue connexion et d'une table

Votre bloc-notes Studio utilise une base de données AWS Glue pour les métadonnées relatives à votre source de données Amazon MSK. Dans cette section, vous créez une AWS Glue connexion qui décrit comment accéder à votre cluster Amazon MSK et un AWS Glue tableau qui décrit comment présenter les données de votre source de données à des clients tels que votre bloc-notes Studio.

Créer une connexion
  1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/glue/.

  2. Si vous n'avez pas encore de AWS Glue base de données, choisissez Bases de données dans la barre de navigation de gauche. Choisissez Ajouter une base de données. Dans la fenêtre Ajouter une base de données, saisissez default comme nom de la base de données. Choisissez Créer.

  3. Dans le menu de navigation de gauche, sélectionnez Connexions. Choisissez Ajouter une connexion.

  4. Dans la fenêtre Ajouter une connexion, indiquez les valeurs suivantes :

    • Pour Nom de connexion, saisissez ZeppelinConnection.

    • Pour Type de connexion, choisissez Kafka.

    • Pour les URL du serveur d’amorçage Kafka, fournissez la chaîne d’agent d’amorçage de votre cluster. Vous pouvez obtenir les agents d’amorçage depuis la console MSK ou en saisissant la commande CLI suivante :

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Décochez la case Exiger une connexion SSL.

    Choisissez Suivant.

  5. Sur la page VPC, renseignez les valeurs suivantes :

    • Pour le VPC, choisissez le nom de votre VPC (par exemple, VPC.) AWS KafkaTutorial

    • Pour Sous-réseau, choisissez AWS KafkaTutorialSubnet2.

    • Pour Groupes de sécurité, sélectionnez tous les groupes disponibles.

    Choisissez Suivant.

  6. Sur la page Propriétés de la connexion/Accès à la connexion, choisissez Terminer.

Création d’une table
Note

Vous pouvez soit créer la table manuellement comme décrit dans les étapes suivantes, soit utiliser le code du connecteur de création de table pour le service géré pour Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction DDL. Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

  1. Dans la barre de navigation de gauche, choisissez Tables. Sur la page Tables, choisissez Ajouter des tables, Ajouter une table manuellement.

  2. Sur la page Configurer les propriétés de votre table, saisissez stock pour Nom de la table. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez Suivant.

  3. Sur la page Ajouter un magasin de données, choisissez Kafka. Pour le nom du sujet, entrez le nom de votre sujet (par exemple AWS KafkaTutorialTopic). Pour Connection, choisissez ZeppelinConnection.

  4. Sur la page Classification, choisissez JSON. Choisissez Suivant.

  5. Sur la page Définir un schéma, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :

    Nom de la colonne Type de données
    ticker string
    price double

    Choisissez Suivant.

  6. Sur la page suivante, vérifiez vos paramètres, puis choisissez Terminer.

  7. Choisissez la table que vous venez de créer dans la liste des tables.

  8. Choisissez Modifier la table et ajoutez une propriété avec la clé managed-flink.proctime et la valeur proctime.

  9. Choisissez Appliquer.

Créer un bloc-notes Studio avec Amazon MSK

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio.

Vous pouvez créer votre application à l'aide du AWS Management Console ou du AWS CLI.
Note

Vous pouvez également créer un bloc-notes Studio à partir de la console Amazon MSK en choisissant un cluster existant, puis en choisissant Traiter les données en temps réel.

Créez un bloc-notes Studio à l'aide du AWS Management Console

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Sur la page Applications de service géré pour Apache Flink, choisissez l’onglet Studio. Choisissez Créer un bloc-notes Studio.

    Note

    Pour créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams, sélectionnez votre cluster Amazon MSK ou votre flux de données Kinesis d’entrée, puis choisissez Traiter les données en temps réel.

  3. Sur la page Créer un bloc-notes Studio, fournissez les informations suivantes :

    • Pour Nom du bloc-notes Studio, saisissez MyNotebook.

    • Pour Base de données AWS Glue, choisissez Par défaut.

    Choisissez Créer un bloc-notes Studio.

  4. Sur la MyNotebookpage, choisissez l'onglet Configuration. Dans la section Mise en réseau, choisissez Modifier.

  5. Dans la MyNotebook page Modifier le réseau pour, choisissez la configuration VPC basée sur le cluster Amazon MSK. Choisissez votre cluster Amazon MSK pour Cluster Amazon MSK. Sélectionnez Enregistrer les modifications.

  6. Sur la MyNotebookpage, choisissez Exécuter. Attendez que État indique En cours d’exécution.

Créez un bloc-notes Studio à l'aide du AWS CLI

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

  1. Assurez-vous de disposer des informations suivantes. Vous avez besoin de ces valeurs pour créer votre application.

    • Votre ID de compte.

    • ID de sous-réseau et l’ID du groupe de sécurité pour le VPC Amazon qui contient votre cluster Amazon MSK.

  2. Créez un fichier nommé create.json avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Pour créer votre application, exécutez la commande suivante.

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Lorsque la commande est terminée, vous devriez obtenir une sortie similaire à celle qui suit, montrant les détails de votre nouveau bloc-notes Studio :

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Envoyer des données à votre cluster Amazon MSK

Dans cette section, vous allez exécuter un script Python dans votre client Amazon EC2 pour envoyer des données à votre source de données Amazon MSK.

  1. Connectez-vous à votre client Amazon EC2.

  2. Exécutez les commandes suivantes pour installer Python version 3, Pip et le package Kafka pour Python, puis confirmez les actions :

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configurez le AWS CLI sur votre machine cliente en saisissant la commande suivante :

    aws configure

    Fournissez les informations d’identification de votre compte, et us-east-1 pour la region.

  4. Créez un fichier nommé stock.py avec le contenu suivant. Remplacez la valeur de l'échantillon par la chaîne Bootstrap Brokers de votre cluster Amazon MSK et mettez à jour le nom du sujet si celui-ci n'est pas : AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Exécutez le script avec la commande suivante :

    $ python3 stock.py
  6. Laissez le script s’exécuter pendant que vous complétez la section suivante.

Tester votre bloc-notes Studio

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre cluster Amazon MSK.

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Sur la page Applications de service géré pour Apache Flink, choisissez l’onglet Bloc-notes Studio. Choisissez MyNotebook.

  3. Sur la MyNotebookpage, choisissez Ouvrir dans Apache Zeppelin.

    L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

  4. Sur la page Bienvenue sur Zeppelin !, choisissez Nouvelle note Zeppelin.

  5. Sur la page Note Zeppelin, entrez la requête suivante dans une nouvelle note :

    %flink.ssql(type=update) select * from stock

    Choisissez l’icône d’exécution.

    L’application affiche les données du cluster Amazon MSK.

Pour ouvrir le tableau de bord Apache Flink de votre application afin de visualiser les aspects opérationnels, choisissez FLINK JOB. Pour plus d’informations sur le tableau de bord Flink, consultez Tableau de bord Apache Flink dans le guide du développeur du service géré pour Apache Flink.

Pour d’autres exemples de requêtes SQL Flink Streaming, consultez la section Queries de la documentation Apache Flink.