Creazione di un notebook Studio con Amazon MSK - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Creazione di un notebook Studio con Amazon MSK

Questo tutorial descrive come creare un notebook Studio che utilizza un cluster Amazon MSK come origine.

Installazione

Per questo tutorial, è necessario un cluster Amazon MSK che consenta l'accesso al testo in chiaro. Se non disponi già di un cluster Amazon MSK, segui il tutorial Nozioni di base per l'uso di Amazon MSK per creare un Amazon VPC, un cluster Amazon MSK, un argomento e un'istanza client Amazon EC2.

Seguendo il tutorial, completa le seguenti operazioni:

Aggiungi un gateway NAT al tuo VPC

Se hai creato un cluster Amazon MSK seguendo il tutorial Nozioni di base per l'uso di Amazon MSK o se il tuo Amazon VPC esistente non dispone già di un gateway NAT per le sue sottoreti private, devi aggiungere un gateway NAT al tuo Amazon VPC. Il diagramma seguente illustra l'architettura generale.

Per creare un gateway NAT per il tuo Amazon VPC, procedi come segue:

  1. Apri la console Amazon VPC all'indirizzo https://console.aws.amazon.com/vpc/.

  2. Scegli Gateway NAT dalla barra di navigazione a sinistra.

  3. Nella pagina Gateway NAT, scegli Crea gateway NAT.

  4. Nella pagina Crea gateway NAT, specifica i seguenti valori:

    Nome: opzionale ZeppelinGateway
    Sottorete AWS KafkaTutorialSubnet1
    ID di allocazione IP elastico Scegli un IP elastico disponibile. Se non ci sono IP elastici disponibili, scegli Allocate Elastic IP, quindi scegli l'IP Elasic creato dalla console.

    Scegli Crea un gateway NAT.

  5. Nella barra di navigazione a sinistra, seleziona Tabelle di routing.

  6. Seleziona Crea tabella di routing.

  7. Nella pagina Crea tabella di routing, fornisci le seguenti informazioni:

    • Tag nome: ZeppelinRouteTable

    • VPC: scegli il tuo VPC (ad esempio VPC).AWS KafkaTutorial

    Scegli Crea.

  8. Nell'elenco delle tabelle dei percorsi, scegli. ZeppelinRouteTable Seleziona la scheda Route, seleziona Modifica route.

  9. Nella scheda Modifica route scegli Aggiungi route.

  10. Per Destinazione, inserisci 0.0.0.0/0. Per Target, scegli NAT Gateway, ZeppelinGateway. Seleziona Salva route. Scegli Chiudi.

  11. Nella pagina Tabelle delle rotte, con l'ZeppelinRouteTableopzione selezionata, scegli la scheda Associazioni di sottoreti. Scegli Modifica associazioni sottorete.

  12. Nella pagina Modifica associazioni di sottoreti, scegli AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet 3. Selezionare Salva.

Crea una AWS Glue connessione e una tabella

Il notebook Studio utilizza un database AWS Glue per i metadati sull'origine dati Amazon MSK. In questa sezione, crei una AWS Glue connessione che descrive come accedere al tuo cluster Amazon MSK e una AWS Glue tabella che descrive come presentare i dati della tua origine dati a client come il tuo notebook Studio.

Creazione di una connessione
  1. Accedi AWS Management Console e apri la AWS Glue console all'indirizzo https://console.aws.amazon.com/glue/.

  2. Se non disponi già di un AWS Glue database, scegli Database dalla barra di navigazione a sinistra. Scegli Aggiungi database. Nella finestra Aggiungi database, inserisci default per Nome database. Scegli Crea.

  3. Scegli Connessioni dalla barra di navigazione a sinistra. Scegli Aggiungi connessione.

  4. Nella finestra Aggiungi connessione, fornisci i seguenti valori:

    • Per Nome connessione, inserisci ZeppelinConnection.

    • Per Tipo di connessione, scegli Kafka.

    • Per URL del server di bootstrap Kafka, fornisci la stringa del broker bootstrap per il tuo cluster. È possibile ottenere i broker di bootstrap dalla console MSK o immettendo il seguente comando CLI:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Deseleziona la casella di controllo Richiedi connessione SSL.

    Seleziona Successivo.

  5. Nella pagina VPC, fornisci i seguenti valori:

    • Per VPC, scegli il nome del tuo VPC (ad esempio VPC). AWS KafkaTutorial

    • Per Subnet, scegli 2.AWS KafkaTutorialSubnet

    • Per Gruppi di sicurezza, scegli tutti i gruppi disponibili.

    Seleziona Successivo.

  6. Nella pagina Proprietà di connessione/Accesso alla connessione, scegli Finisci.

Creazione di una tabella
Nota

È possibile creare manualmente la tabella come descritto nei passaggi seguenti oppure utilizzare il codice del connettore di creazione tabella per il servizio gestito per Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione DDL. È quindi possibile effettuare il check-in AWS Glue per assicurarsi che la tabella sia stata creata correttamente.

  1. Nella barra di navigazione a sinistra, seleziona Tabelle. Nella pagina Tabelle, scegli Aggiungi tabelle > Aggiungi tabella manualmente.

  2. Nella pagina Imposta le proprietà della tabella, inserisci stock per Nome tabella. Assicurati di selezionare il database creato in precedenza. Seleziona Successivo.

  3. Nella pagina Aggiungi un datastore, scegli Kafka. Per il nome dell'argomento, inserisci il nome dell'argomento (ad es. AWS KafkaTutorialTopic). Per Connessione, scegli ZeppelinConnection.

  4. Nella pagina Classificazione, scegli JSON. Seleziona Successivo.

  5. Nella pagina Definisci uno schema, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:

    Nome colonna Tipo di dati
    ticker string
    price double

    Seleziona Successivo.

  6. Nella pagina successiva, verifica le impostazioni e scegli Fine.

  7. Scegli la tabella appena creata dall'elenco delle tabelle.

  8. Scegli Modifica tabella e aggiungi una proprietà con la chiave managed-flink.proctime e il valore proctime.

  9. Scegli Applica.

Crea un notebook Studio con Amazon MSK

Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio.

È possibile creare l'applicazione utilizzando il AWS Management Console o il AWS CLI.
Nota

Un notebook Studio può essere creato anche dalla console Amazon MSK scegliendo un cluster esistente, quindi selezionando Elabora dati in tempo reale.

Crea un taccuino Studio utilizzando il AWS Management Console

  1. Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Studio. Scegli Crea notebook Studio.

    Nota

    Per creare un notebook Studio dalle console Amazon MSK o del flusso di dati Kinesis, seleziona il cluster Amazon MSK o il flusso di dati Kinesis di input, quindi scegli Elabora dati in tempo reale.

  3. Nella pagina Crea notebook Studio, immetti le seguenti informazioni:

    • Inserisci MyNotebook per Nome notebook Studio.

    • Scegli l'impostazione predefinita per il database AWS Glue.

    Scegli Crea notebook Studio.

  4. Nella MyNotebookpagina, scegli la scheda Configurazione. Nella sezione Reti, scegli Modifica.

  5. Nella MyNotebook pagina Modifica rete per, scegli la configurazione VPC basata sul cluster Amazon MSK. Scegli il cluster Amazon MSK per Cluster Amazon MSK. Seleziona Salvataggio delle modifiche.

  6. Nella MyNotebookpagina, scegli Esegui. Attendi che lo stato mostri In esecuzione.

Crea un taccuino Studio utilizzando il AWS CLI

Per creare il tuo taccuino Studio utilizzando il AWS CLI, procedi come segue:

  1. Assicurati di disporre delle informazioni riportate di seguito. Questi valori sono necessari per creare l'applicazione.

    • ID dell'account.

    • Gli ID di sottorete e l'ID del gruppo di sicurezza per Amazon VPC che contiene il cluster Amazon MSK.

  2. Crea un file denominato create.json con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Per creare l'applicazione, esegui il comando riportato di seguito:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Una volta completata l'esecuzione del comando, dovresti visualizzare un output simile al seguente, che mostra i dettagli per il nuovo notebook Studio:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore segnaposto con il tuo ID account.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Invio di dati al cluster Amazon MSK

In questa sezione, esegui uno script Python nel client Amazon EC2 per inviare dati all'origine dati Amazon MSK.

  1. Esegui la connessione al client Amazon EC2.

  2. Esegui i seguenti comandi per installare Python versione 3, Pip e il pacchetto Kafka per Python e conferma le operazioni:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configuralo AWS CLI sul tuo computer client inserendo il seguente comando:

    aws configure

    Fornisci le credenziali del tuo account e us-east-1 per region.

  4. Crea un file denominato stock.py con i seguenti contenuti. Sostituisci il valore di esempio con la stringa Bootstrap Brokers del tuo cluster Amazon MSK e aggiorna il nome dell'argomento se l'argomento non è: AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Esegui lo script con il comando seguente:

    $ python3 stock.py
  6. Lascia lo script in esecuzione mentre completi la sezione seguente.

Test del notebook Studio

In questa sezione, il notebook Studio viene utilizzato per eseguire query sui dati del cluster Amazon MSK.

  1. Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Notebook Studio. Scegli. MyNotebook

  3. Nella MyNotebookpagina, scegli Apri in Apache Zeppelin.

    L'interfaccia di Apache Zeppelin viene aperta in una nuova scheda.

  4. Nella pagina Ti diamo il benvenuto su Zeppelin!, scegli Nuova nota Zeppelin.

  5. Nella pagina Nota Zeppelin, inserisci la seguente query in una nuova nota:

    %flink.ssql(type=update) select * from stock

    Seleziona l'icona dell'esecuzione.

    L'applicazione mostra i dati del cluster Amazon MSK.

Per aprire il pannello di controllo di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli PROCESSO FLINK. Per ulteriori informazioni sul pannello di controllo di Flink, consulta Pannello di controllo di Apache Flink nella Guida per gli sviluppatori del servizio gestito per Apache Flink.

Per altri esempi di query Streaming SQL in Flink, consulta Query nella documentazione di Apache Flink.