Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Creare ed eseguire un servizio gestito per l'applicazione Apache Flink for Python
In questa sezione, crei un'applicazione Managed Service for Apache Flink per l'applicazione Python con un flusso Kinesis come sorgente e sink.
Questa sezione contiene le fasi seguenti.
- Crea risorse dipendenti
- Configurazione dell'ambiente di sviluppo locale
- Scarica ed esamina il codice Python per lo streaming di Apache Flink
- Gestisci le dipendenze JAR
- Scrivi record di esempio nel flusso di input
- Esegui l'applicazione localmente
- Osserva i dati di input e output nei flussi Kinesis
- Arresta l'esecuzione locale dell'applicazione
- Package del codice dell'applicazione
- Carica il pacchetto dell'applicazione in un bucket Amazon S3
- Crea e configura l'applicazione Managed Service for Apache Flink
- Approfondimenti
Crea risorse dipendenti
Prima di creare un servizio gestito per Apache Flink per questo esercizio, devi creare le risorse dipendenti seguenti:
-
Due flussi Kinesis per l'input e l'output.
-
Un bucket Amazon S3 per archiviare il codice dell'applicazione.
Nota
Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1. Se si utilizza un'altra regione, è necessario adattare tutti i passaggi di conseguenza.
Crea due stream Kinesis
Prima di creare un'applicazione Managed Service for Apache Flink per questo esercizio, crea due flussi di dati Kinesis (ExampleInputStream
eExampleOutputStream
) nella stessa regione che utilizzerai per distribuire l'applicazione (us-east-1 in questo esempio). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.
Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis.
Per creare i flussi di dati (AWS CLI)
-
Per creare il primo stream (
ExampleInputStream
), usa il seguente comando Amazon Kinesiscreate-stream
AWS CLI .$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
Creazione di un bucket Amazon S3
Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per istruzioni per la creazione di questa risorsa, consulta gli argomenti riportati di seguito:
-
Come si crea un bucket S3? nella Guida per l'utente di Amazon Simple Storage Service. Assegna al bucket Amazon S3 un nome univoco a livello globale, ad esempio aggiungendo il tuo nome di accesso.
Nota
Assicurati di creare il bucket S3 nella regione che usi per questo tutorial (us-east-1).
Altre risorse
Quando crei la tua applicazione, Managed Service for Apache Flink crea le seguenti CloudWatch risorse Amazon se non esistono già:
-
Un gruppo di log chiamato
/AWS/KinesisAnalytics-java/<my-application>
. -
Un flusso di log denominato
kinesis-analytics-log-stream
.
Configurazione dell'ambiente di sviluppo locale
Per lo sviluppo e il debug, puoi eseguire l'applicazione Python Flink sul tuo computer. Puoi avviare l'applicazione dalla riga di comando con python
main.py
o in un Python a tua IDE scelta.
Nota
Sulla macchina di sviluppo, devi avere Python 3.10 o 3.11, Java 11, Apache Maven e Git installati. Ti consigliamo di usare un IDE esempio o Visual Studio Code. PyCharm
Installa la libreria PyFlink
Per sviluppare l'applicazione ed eseguirla localmente, è necessario installare la libreria Flink Python.
-
Crea un ambiente Python autonomo VirtualEnv usando Conda o qualsiasi strumento Python simile.
-
Installa la PyFlink libreria in quell'ambiente. Usa la stessa versione di runtime di Apache Flink che utilizzerai in Amazon Managed Service per Apache Flink. Attualmente, il runtime consigliato è 1.19.1.
$ pip install apache-flink==1.19.1
-
Assicurati che l'ambiente sia attivo quando esegui l'applicazione. Se esegui l'applicazione inIDE, assicurati che IDE stia utilizzando l'ambiente come runtime. Il processo dipende da quello IDE che state utilizzando.
Nota
Devi solo installare la PyFlink libreria. Non è necessario installare un cluster Apache Flink sul computer.
Autentica la tua sessione AWS
L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Usa i seguenti passaggi per autenticare la tua sessione:
-
Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. Configura il AWS Command Line Interface (AWS CLI)
-
Verifica che AWS CLI sia configurato correttamente e che gli utenti dispongano delle autorizzazioni per scrivere nel flusso di dati Kinesis pubblicando il seguente record di test:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Se hai IDE un plugin con cui integrarti AWS, puoi utilizzarlo per passare le credenziali all'applicazione in esecuzione su. IDE Per ulteriori informazioni, vedere AWS Toolkit for PyCharm,AWS Toolkit for
Visual Studio Code AWS e Toolkit for IntelliJ. IDEA
Scarica ed esamina il codice Python per lo streaming di Apache Flink
Il codice dell'applicazione Python per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:
-
Clona il repository remoto con il comando seguente:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Passa alla directory
./python/GettingStarted
.
Esamina i componenti dell'applicazione
Il codice dell'applicazione si trova inmain.py
. Usiamo SQL embedded in Python per definire il flusso dell'applicazione.
Nota
Per un'esperienza di sviluppo ottimizzata, l'applicazione è progettata per funzionare senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo sulla tua macchina. L'applicazione utilizza la variabile di ambiente IS_LOCAL =
true
per rilevare quando è in esecuzione localmente. È necessario impostare la variabile IS_LOCAL = true
di ambiente sulla shell o nella configurazione di esecuzione del fileIDE.
-
L'applicazione configura l'ambiente di esecuzione e legge la configurazione di runtime. Per funzionare sia su Amazon Managed Service for Apache Flink che localmente, l'applicazione controlla la
IS_LOCAL
variabile.-
Di seguito è riportato il comportamento predefinito quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink:
-
Carica le dipendenze impacchettate con l'applicazione. Per ulteriori informazioni, vedere (link)
-
Carica la configurazione dalle proprietà di runtime che definisci nell'applicazione Amazon Managed Service for Apache Flink. Per ulteriori informazioni, consulta (link)
-
-
Quando l'applicazione rileva
IS_LOCAL = true
quando l'applicazione viene eseguita localmente:-
Carica le dipendenze esterne dal progetto.
-
Carica la configurazione dal
application_properties.json
file incluso nel progetto.... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
L'applicazione definisce una tabella di origine con un'
CREATE TABLE
istruzione, utilizzando il KinesisConnector. Questa tabella legge i dati dal flusso Kinesis di input. L'applicazione prende il nome dello stream, la regione e la posizione iniziale dalla configurazione di runtime. table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
L'applicazione definisce anche una tabella sink utilizzando il Kinesis Connector
in questo esempio. Questo racconto invia i dati allo stream Kinesis in uscita. table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
Infine, l'applicazione esegue la tabella sink a SQL partire dalla tabella di origine.
INSERT INTO...
In un'applicazione più complessa, è probabile che siano necessari passaggi aggiuntivi per trasformare i dati prima di scriverli nel sink.table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
È necessario aggiungere un altro passaggio alla fine della
main()
funzione per eseguire l'applicazione localmente:if is_local: table_result.wait()
Senza questa istruzione, l'applicazione termina immediatamente quando viene eseguita localmente. Non devi eseguire questa istruzione quando esegui l'applicazione in Amazon Managed Service for Apache Flink.
Gestisci le dipendenze JAR
Un' PyFlink applicazione richiede in genere uno o più connettori. L'applicazione in questo tutorial utilizza il Kinesis
In questo esempio, mostriamo come usare Apache Maven per recuperare le dipendenze e impacchettare l'applicazione da eseguire su Managed Service for Apache Flink.
Nota
Esistono modi alternativi per recuperare e impacchettare le dipendenze. Questo esempio dimostra un metodo che funziona correttamente con uno o più connettori. Consente inoltre di eseguire l'applicazione localmente, per lo sviluppo e su Managed Service for Apache Flink senza modifiche al codice.
Usa il file pom.xml
Apache Maven utilizza il pom.xml
file per controllare le dipendenze e il pacchetto delle applicazioni.
Tutte JAR le dipendenze sono specificate nel file del pom.xml
blocco. <dependencies>...</dependencies>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
Per trovare l'elemento e la versione del connettore corretti da utilizzare, vedere. Usa i connettori Apache Flink con Managed Service for Apache Flink Assicurati di fare riferimento alla versione di Apache Flink che stai utilizzando. Per questo esempio, utilizziamo il connettore Kinesis. Per Apache Flink 1.19, la versione del connettore è. 4.3.0-1.19
Nota
Se si utilizza Apache Flink 1.19, non è stata rilasciata alcuna versione del connettore specifica per questa versione. Utilizzate i connettori rilasciati per la versione 1.18.
Dipendenze da scaricare e impacchettare
Usa Maven per scaricare le dipendenze definite nel pom.xml
file e impacchettarle per l'applicazione Python Flink.
-
Passa alla directory che contiene il progetto Python Getting Started chiamato.
python/GettingStarted
-
Esegui il comando seguente:
$ mvn package
Maven crea un nuovo file chiamato. ./target/pyflink-dependencies.jar
Quando sviluppate localmente sulla vostra macchina, l'applicazione Python cerca questo file.
Nota
Se dimentichi di eseguire questo comando, quando tenti di eseguire l'applicazione, l'applicazione fallirà e restituirà l'errore: Could not find any factory for identifier «kinesis».
Scrivi record di esempio nel flusso di input
In questa sezione, invierai record di esempio allo stream per l'elaborazione dell'applicazione. Sono disponibili due opzioni per generare dati di esempio, utilizzando uno script Python o il Kinesis
Genera dati di esempio usando uno script Python
Puoi usare uno script Python per inviare record di esempio allo stream.
Nota
Per eseguire questo script Python, devi usare Python 3.x e avere installata la libreria for AWS SDKPython
Per iniziare a inviare i dati di test al flusso di input Kinesis:
-
Scarica lo script
stock.py
Python del generatore di dati dal repository Data generator GitHub. -
Esegui lo script
stock.py
:$ python stock.py
Mantieni lo script in esecuzione mentre completi il resto del tutorial. Ora puoi eseguire la tua applicazione Apache Flink.
Generazione di dati di esempio utilizzando Kinesis Data Generator
In alternativa all'utilizzo dello script Python, puoi usare Kinesis Data Generator
Per configurare ed eseguire Kinesis Data Generator:
-
Segui le istruzioni nella documentazione di Kinesis Data Generator
per configurare l'accesso allo strumento. Eseguirai un AWS CloudFormation modello che imposta un utente e una password. -
Accedi a Kinesis Data Generator tramite il file URL generato dal CloudFormation modello. Puoi trovarlo URL nella scheda Output dopo aver completato il CloudFormation modello.
-
Configura il generatore di dati:
-
Regione: Seleziona la regione che stai utilizzando per questo tutorial: us-east-1
-
Stream/flusso di consegna: seleziona il flusso di input che l'applicazione utilizzerà:
ExampleInputStream
-
Record al secondo: 100
-
Modello di registrazione: copia e incolla il seguente modello:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Prova il modello: scegli Modello di test e verifica che il record generato sia simile al seguente:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Avvia il generatore di dati: scegli Seleziona Invia dati.
Kinesis Data Generator sta ora inviando dati a. ExampleInputStream
Esegui l'applicazione localmente
Puoi testare l'applicazione localmente, eseguendola dalla riga di comando con python main.py
o dal tuoIDE.
Per eseguire l'applicazione localmente, è necessario che sia installata la versione corretta della PyFlink libreria, come descritto nella sezione precedente. Per ulteriori informazioni, vedere (link)
Nota
Prima di continuare, verifica che i flussi di input e output siano disponibili. Per informazioni, consulta Crea due flussi di dati Amazon Kinesis. Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta Autentica la tua sessione AWS.
Importa il progetto Python nel tuo IDE
Per iniziare a lavorare sull'applicazione in usoIDE, è necessario importarla come progetto Python.
Il repository che hai clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della ./python/GettingStarted
sottodirectory nella tuaIDE.
Importa il codice come progetto Python esistente.
Nota
Il processo esatto per importare un nuovo progetto Python varia a seconda del tipo di progetto IDE utilizzato.
Controllate la configurazione locale dell'applicazione
Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel application_properties.json
file nella cartella delle risorse del progetto sotto./src/main/resources
. È possibile modificare questo file per utilizzare diversi nomi o regioni di stream Kinesis.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Esegui la tua applicazione Python localmente
È possibile eseguire l'applicazione localmente, dalla riga di comando come un normale script Python o da. IDE
Per eseguire l'applicazione dalla riga di comando
-
Assicurati che l'ambiente Python standalone come Conda VirtualEnv o dove hai installato la libreria Python Flink sia attualmente attivo.
-
Assicurati di aver eseguito
mvn package
almeno una volta. -
Imposta la variabile di ambiente
IS_LOCAL = true
:$ export IS_LOCAL=true
-
Esegui l'applicazione come un normale script Python.
$python main.py
Per eseguire l'applicazione dall'interno di IDE
-
Configura il tuo IDE per eseguire lo
main.py
script con la seguente configurazione:-
Usa l'ambiente Python autonomo come Conda VirtualEnv o dove hai installato la libreria. PyFlink
-
Usa le AWS credenziali per accedere ai flussi di dati Kinesis di input e output.
-
Imposta
IS_LOCAL = true
.
-
-
Il processo esatto per impostare la configurazione di esecuzione dipende dall'utente IDE e varia.
-
Dopo aver configurato il tuoIDE, esegui lo script Python e usa gli strumenti forniti da te IDE mentre l'applicazione è in esecuzione.
Ispezionate i log delle applicazioni localmente
Quando viene eseguita localmente, l'applicazione non mostra alcun registro nella console, a parte alcune righe stampate e visualizzate all'avvio dell'applicazione. PyFlink scrive i log in un file nella directory in cui è installata la libreria Python Flink. L'applicazione stampa la posizione dei log all'avvio. È inoltre possibile eseguire il comando seguente per trovare i registri:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
Elenca i file nella directory di registrazione. Di solito si trova un solo
.log
file. -
Coda il file mentre l'applicazione è in esecuzione:
tail -f <log-path>/<log-file>.log
.
Osserva i dati di input e output nei flussi Kinesis
Puoi osservare i record inviati al flusso di input da (Python di esempio che genera) o Kinesis Data Generator (link) utilizzando Data Viewer nella console Amazon Kinesis.
Per osservare i record:
Arresta l'esecuzione locale dell'applicazione
Arresta l'applicazione in esecuzione suIDE. IDEDi solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono daIDE.
Package del codice dell'applicazione
In questa sezione, si utilizza Apache Maven per impacchettare il codice dell'applicazione e tutte le dipendenze richieste in un file.zip.
Esegui nuovamente il comando del pacchetto Maven:
$ mvn package
Questo comando genera il file. target/managed-flink-pyflink-getting-started-1.0.0.zip
Carica il pacchetto dell'applicazione in un bucket Amazon S3
In questa sezione, carichi il file.zip creato nella sezione precedente nel bucket Amazon Simple Storage Service (Amazon S3) creato all'inizio di questo tutorial. Se non hai completato questo passaggio, consulta (link).
Per caricare il JAR file di codice dell'applicazione
Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/
-
Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.
-
Scegli Carica.
-
Scegliere Add files (Aggiungi file).
-
Passa al file.zip generato nel passaggio precedente:.
target/managed-flink-pyflink-getting-started-1.0.0.zip
-
Scegli Carica senza modificare altre impostazioni.
Crea e configura l'applicazione Managed Service for Apache Flink
È possibile creare e configurare un'applicazione Managed Service for Apache Flink utilizzando la console o il. AWS CLI Per questo tutorial, useremo la console.
Creazione dell'applicazione
Apri la console Managed Service for Apache Flink su /flink https://console.aws.amazon.com
-
Verifica che sia selezionata la regione corretta: Stati Uniti orientali (Virginia settentrionale) us-east-1.
-
Apri il menu a destra e scegli Applicazioni Apache Flink, quindi Crea applicazione di streaming. In alternativa, scegli Crea applicazione di streaming dalla sezione Guida introduttiva della pagina iniziale.
-
Nella pagina Crea applicazioni di streaming:
-
Per Scegli un metodo per configurare l'applicazione di elaborazione dello stream, scegli Crea da zero.
-
Per la configurazione di Apache Flink, versione Application Flink, scegli Apache Flink 1.19.
-
Per la configurazione dell'applicazione:
-
Per Nome applicazione, immetti
MyApplication
. -
Per Descrizione, inserisci
My Python test app
. -
In Accesso alle risorse dell'applicazione, scegli Create/update IAM role kinesis-analytics- MyApplication -us-east-1 con le politiche richieste.
-
-
Per le impostazioni di Template for applications:
-
Per Modelli, scegli Sviluppo.
-
-
Scegli Crea applicazione di streaming.
-
Nota
Quando crei un servizio gestito per l'applicazione Apache Flink utilizzando la console, hai la possibilità di creare un IAM ruolo e una policy per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste IAM risorse vengono denominate utilizzando il nome e la regione dell'applicazione come segue:
-
Policy:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Ruolo:
kinesisanalytics-
MyApplication
-us-west-2
Amazon Managed Service for Apache Flink era precedentemente noto come Kinesis Data Analytics. Il nome delle risorse generate automaticamente ha il prefisso per garantire la compatibilità con le versioni precedenti. kinesis-analytics
Modifica la politica IAM
Modifica la IAM policy per aggiungere le autorizzazioni per accedere al bucket Amazon S3.
Per modificare la IAM policy per aggiungere le autorizzazioni del bucket S3
Apri la console all'IAMindirizzo. https://console.aws.amazon.com/iam/
-
Seleziona Policy. Scegli la policy
kinesis-analytics-service-MyApplication-us-east-1
creata dalla console nella sezione precedente. -
Scegli Modifica, quindi scegli la JSONscheda.
-
Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (
012345678901
) con l'ID del tuo account.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Scegli Avanti e quindi seleziona Salva modifiche.
Configura l'applicazione
Modifica la configurazione dell'applicazione per impostare l'elemento del codice dell'applicazione.
Per configurare l'applicazione
-
Nella MyApplicationpagina, scegli Configura.
-
Nella sezione Posizione del codice dell'applicazione:
-
Per il bucket Amazon S3, seleziona il bucket creato in precedenza per il codice dell'applicazione. Scegli Sfoglia e seleziona il bucket corretto, quindi scegli Scegli. Non selezionare il nome del bucket.
-
Per Percorso dell'oggetto Amazon S3, inserisci
managed-flink-pyflink-getting-started-1.0.0.zip
-
-
Per le autorizzazioni di accesso, scegli Crea/aggiorna il IAM ruolo
kinesis-analytics-MyApplication-us-east-1
con le politiche richieste. -
Passa alle proprietà di Runtime e mantieni i valori predefiniti per tutte le altre impostazioni.
-
Scegliete Aggiungi nuovo elemento e aggiungete ciascuno dei seguenti parametri:
ID gruppo Chiave Valore InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
Non modificare nessuna delle altre sezioni e scegli Salva modifiche.
Nota
Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:
-
Gruppo di log:
/aws/kinesis-analytics/MyApplication
-
Flusso di log:
kinesis-analytics-log-stream
Esecuzione dell'applicazione.
L'applicazione è ora configurata e pronta per l'esecuzione.
Per eseguire l'applicazione
-
Sulla console per Amazon Managed Service for Apache Flink, scegli La mia applicazione e scegli Esegui.
-
Nella pagina successiva, nella pagina di configurazione del ripristino dell'applicazione, scegli Esegui con l'ultima istantanea, quindi scegli Esegui.
Lo stato nell'applicazione descrive in dettaglio le transizioni da
Ready
Starting
e poi aRunning
quando l'applicazione è stata avviata.
Quando l'applicazione è nello Running
stato, ora puoi aprire la dashboard di Flink.
Per aprire il pannello di controllo
-
Scegli Apri la dashboard di Apache Flink. La dashboard si apre in una nuova pagina.
-
Nell'elenco dei lavori in esecuzione, scegli il singolo lavoro che puoi vedere.
Nota
Se hai impostato le proprietà di Runtime o hai modificato le IAM politiche in modo errato, lo stato dell'applicazione potrebbe cambiare
Running
, ma la dashboard di Flink mostra che il lavoro viene riavviato continuamente. Si tratta di uno scenario di errore comune se l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.Quando ciò accade, controlla la scheda Eccezioni nella dashboard di Flink per vedere la causa del problema.
Osserva le metriche dell'applicazione in esecuzione
Nella MyApplicationpagina, nella sezione Amazon CloudWatch metrics, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione.
Per visualizzare le metriche
-
Accanto al pulsante Aggiorna, seleziona 10 secondi dall'elenco a discesa.
-
Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di uptime aumentare continuamente.
-
La metrica fullrestarts deve essere zero. Se è in aumento, la configurazione potrebbe presentare dei problemi. Per esaminare il problema, consulta la scheda Eccezioni nella dashboard di Flink.
-
La metrica del numero di checkpoint non riusciti deve essere pari a zero in un'applicazione integra.
Nota
Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare una dashboard applicativa personalizzata con qualsiasi metrica nella dashboard. CloudWatch
Osserva i dati di output nei flussi Kinesis
Assicurati di continuare a pubblicare i dati sull'input, usando lo script Python o il Kinesis Data Generator.
È ora possibile osservare l'output dell'applicazione in esecuzione su Managed Service for Apache Flink utilizzando il Data Viewer in https://console.aws.amazon.com/kinesis/
Per visualizzare l'output
Apri la console Kinesis in /kinesis. https://console.aws.amazon.com
-
Verifica che la regione sia la stessa che stai usando per eseguire questo tutorial. Per impostazione predefinita, è US-East-1US East (Virginia settentrionale). Se necessario, modificare la regione.
-
Scegli Data Streams.
-
Seleziona lo stream che desideri osservare. Ai fini di questo tutorial, utilizza
ExampleOutputStream
. -
Scegli la scheda Data viewer.
-
Seleziona uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.
-
Seleziona il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.
Arresta l'applicazione
Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. MyApplication
Per interrompere l'applicazione
-
Dall'elenco a discesa Azione, scegli Stop.
-
Lo stato nell'applicazione descrive in dettaglio le transizioni da
Running
e quindi aReady
quando l'applicazione viene completamente interrotta.Stopping
Nota
Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.