Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Creazione di un notebook Studio con Amazon MSK
Questo tutorial descrive come creare un notebook Studio che utilizza un cluster Amazon MSK come origine.
Questo tutorial contiene le sezioni seguenti:
Installazione
Per questo tutorial, è necessario un cluster Amazon MSK che consenta l'accesso al testo in chiaro. Se non disponi già di un cluster Amazon MSK, segui il tutorial Nozioni di base per l'uso di Amazon MSK per creare un Amazon VPC, un cluster Amazon MSK, un argomento e un'istanza client Amazon EC2.
Seguendo il tutorial, completa le seguenti operazioni:
Nella Fase 3: creazione di un cluster Amazon MSK, passaggio 4, modifica il valore
ClientBroker
daTLS
aPLAINTEXT
.
Aggiungi un gateway NAT al tuo VPC
Se hai creato un cluster Amazon MSK seguendo il tutorial Nozioni di base per l'uso di Amazon MSK o se il tuo Amazon VPC esistente non dispone già di un gateway NAT per le sue sottoreti private, devi aggiungere un gateway NAT al tuo Amazon VPC. Il diagramma seguente illustra l'architettura generale.
![](images/vpc_05.png)
Per creare un gateway NAT per il tuo Amazon VPC, procedi come segue:
Apri la console Amazon VPC all'indirizzo https://console.aws.amazon.com/vpc/
. Scegli Gateway NAT dalla barra di navigazione a sinistra.
Nella pagina Gateway NAT, scegli Crea gateway NAT.
Nella pagina Crea gateway NAT, specifica i seguenti valori:
Nome: opzionale ZeppelinGateway
Sottorete AWS KafkaTutorialSubnet1 ID di allocazione IP elastico Scegli un IP elastico disponibile. Se non ci sono IP elastici disponibili, scegli Allocate Elastic IP, quindi scegli l'IP Elasic creato dalla console. Scegli Crea un gateway NAT.
Nella barra di navigazione a sinistra, seleziona Tabelle di routing.
Seleziona Crea tabella di routing.
Nella pagina Crea tabella di routing, fornisci le seguenti informazioni:
Tag nome:
ZeppelinRouteTable
VPC: scegli il tuo VPC (ad esempio VPC).AWS KafkaTutorial
Scegli Crea.
Nell'elenco delle tabelle dei percorsi, scegli. ZeppelinRouteTable Seleziona la scheda Route, seleziona Modifica route.
Nella scheda Modifica route scegli Aggiungi route.
Per Destinazione, inserisci
0.0.0.0/0
. Per Target, scegli NAT Gateway, ZeppelinGateway. Seleziona Salva route. Scegli Chiudi.Nella pagina Tabelle delle rotte, con l'ZeppelinRouteTableopzione selezionata, scegli la scheda Associazioni di sottoreti. Scegli Modifica associazioni sottorete.
Nella pagina Modifica associazioni di sottoreti, scegli AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet 3. Selezionare Salva.
Crea una AWS Glue connessione e una tabella
Il notebook Studio utilizza un database AWS Glue per i metadati sull'origine dati Amazon MSK. In questa sezione, crei una AWS Glue connessione che descrive come accedere al tuo cluster Amazon MSK e una AWS Glue tabella che descrive come presentare i dati della tua origine dati a client come il tuo notebook Studio.
Creazione di una connessione
Accedi AWS Management Console e apri la AWS Glue console all'indirizzo https://console.aws.amazon.com/glue/
. Se non disponi già di un AWS Glue database, scegli Database dalla barra di navigazione a sinistra. Scegli Aggiungi database. Nella finestra Aggiungi database, inserisci
default
per Nome database. Scegli Crea.Scegli Connessioni dalla barra di navigazione a sinistra. Scegli Aggiungi connessione.
Nella finestra Aggiungi connessione, fornisci i seguenti valori:
Per Nome connessione, inserisci
ZeppelinConnection
.Per Tipo di connessione, scegli Kafka.
Per URL del server di bootstrap Kafka, fornisci la stringa del broker bootstrap per il tuo cluster. È possibile ottenere i broker di bootstrap dalla console MSK o immettendo il seguente comando CLI:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
Deseleziona la casella di controllo Richiedi connessione SSL.
Seleziona Successivo.
Nella pagina VPC, fornisci i seguenti valori:
Per VPC, scegli il nome del tuo VPC (ad esempio VPC). AWS KafkaTutorial
Per Subnet, scegli 2.AWS KafkaTutorialSubnet
Per Gruppi di sicurezza, scegli tutti i gruppi disponibili.
Seleziona Successivo.
Nella pagina Proprietà di connessione/Accesso alla connessione, scegli Finisci.
Creazione di una tabella
Nota
È possibile creare manualmente la tabella come descritto nei passaggi seguenti oppure utilizzare il codice del connettore di creazione tabella per il servizio gestito per Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione DDL. È quindi possibile effettuare il check-in AWS Glue per assicurarsi che la tabella sia stata creata correttamente.
Nella barra di navigazione a sinistra, seleziona Tabelle. Nella pagina Tabelle, scegli Aggiungi tabelle > Aggiungi tabella manualmente.
Nella pagina Imposta le proprietà della tabella, inserisci
stock
per Nome tabella. Assicurati di selezionare il database creato in precedenza. Seleziona Successivo.Nella pagina Aggiungi un datastore, scegli Kafka. Per il nome dell'argomento, inserisci il nome dell'argomento (ad es. AWS KafkaTutorialTopic). Per Connessione, scegli ZeppelinConnection.
Nella pagina Classificazione, scegli JSON. Seleziona Successivo.
Nella pagina Definisci uno schema, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:
Nome colonna Tipo di dati ticker
string
price
double
Seleziona Successivo.
Nella pagina successiva, verifica le impostazioni e scegli Fine.
-
Scegli la tabella appena creata dall'elenco delle tabelle.
-
Scegli Modifica tabella e aggiungi una proprietà con la chiave
managed-flink.proctime
e il valoreproctime
. -
Scegli Applica.
Crea un notebook Studio con Amazon MSK
Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio.
È possibile creare l'applicazione utilizzando il AWS Management Console o il AWS CLI.
Nota
Un notebook Studio può essere creato anche dalla console Amazon MSK scegliendo un cluster esistente, quindi selezionando Elabora dati in tempo reale.
Crea un taccuino Studio utilizzando il AWS Management Console
Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Studio. Scegli Crea notebook Studio.
Nota
Per creare un notebook Studio dalle console Amazon MSK o del flusso di dati Kinesis, seleziona il cluster Amazon MSK o il flusso di dati Kinesis di input, quindi scegli Elabora dati in tempo reale.
Nella pagina Crea notebook Studio, immetti le seguenti informazioni:
-
Inserisci
MyNotebook
per Nome notebook Studio. Scegli l'impostazione predefinita per il database AWS Glue.
Scegli Crea notebook Studio.
-
Nella MyNotebookpagina, scegli la scheda Configurazione. Nella sezione Reti, scegli Modifica.
Nella MyNotebook pagina Modifica rete per, scegli la configurazione VPC basata sul cluster Amazon MSK. Scegli il cluster Amazon MSK per Cluster Amazon MSK. Seleziona Salvataggio delle modifiche.
Nella MyNotebookpagina, scegli Esegui. Attendi che lo stato mostri In esecuzione.
Crea un taccuino Studio utilizzando il AWS CLI
Per creare il tuo taccuino Studio utilizzando il AWS CLI, procedi come segue:
Assicurati di disporre delle informazioni riportate di seguito. Questi valori sono necessari per creare l'applicazione.
ID dell'account.
Gli ID di sottorete e l'ID del gruppo di sicurezza per Amazon VPC che contiene il cluster Amazon MSK.
Crea un file denominato
create.json
con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }Per creare l'applicazione, esegui il comando riportato di seguito:
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
Una volta completata l'esecuzione del comando, dovresti visualizzare un output simile al seguente, che mostra i dettagli per il nuovo notebook Studio:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore segnaposto con il tuo ID account.
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
Invio di dati al cluster Amazon MSK
In questa sezione, esegui uno script Python nel client Amazon EC2 per inviare dati all'origine dati Amazon MSK.
Esegui la connessione al client Amazon EC2.
Esegui i seguenti comandi per installare Python versione 3, Pip e il pacchetto Kafka per Python e conferma le operazioni:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
Configuralo AWS CLI sul tuo computer client inserendo il seguente comando:
aws configure
Fornisci le credenziali del tuo account e
us-east-1
perregion
.Crea un file denominato
stock.py
con i seguenti contenuti. Sostituisci il valore di esempio con la stringa Bootstrap Brokers del tuo cluster Amazon MSK e aggiorna il nome dell'argomento se l'argomento non è: AWS KafkaTutorialTopicfrom kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())Esegui lo script con il comando seguente:
$ python3 stock.py
Lascia lo script in esecuzione mentre completi la sezione seguente.
Test del notebook Studio
In questa sezione, il notebook Studio viene utilizzato per eseguire query sui dati del cluster Amazon MSK.
Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Notebook Studio. Scegli. MyNotebook
Nella MyNotebookpagina, scegli Apri in Apache Zeppelin.
L'interfaccia di Apache Zeppelin viene aperta in una nuova scheda.
Nella pagina Ti diamo il benvenuto su Zeppelin!, scegli Nuova nota Zeppelin.
Nella pagina Nota Zeppelin, inserisci la seguente query in una nuova nota:
%flink.ssql(type=update) select * from stock
Seleziona l'icona dell'esecuzione.
L'applicazione mostra i dati del cluster Amazon MSK.
Per aprire il pannello di controllo di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli PROCESSO FLINK. Per ulteriori informazioni sul pannello di controllo di Flink, consulta Pannello di controllo di Apache Flink nella Guida per gli sviluppatori del servizio gestito per Apache Flink.
Per altri esempi di query Streaming SQL in Flink, consulta Query