Creare ed eseguire un servizio gestito per l'applicazione Apache Flink for Python - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Creare ed eseguire un servizio gestito per l'applicazione Apache Flink for Python

In questa sezione, crei un'applicazione Managed Service for Apache Flink per l'applicazione Python con un flusso Kinesis come sorgente e sink.

Crea risorse dipendenti

Prima di creare un servizio gestito per Apache Flink per questo esercizio, devi creare le risorse dipendenti seguenti:

  • Due flussi Kinesis per l'input e l'output.

  • Un bucket Amazon S3 per archiviare il codice dell'applicazione.

Nota

Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1. Se si utilizza un'altra regione, è necessario adattare tutti i passaggi di conseguenza.

Crea due stream Kinesis

Prima di creare un'applicazione Managed Service for Apache Flink per questo esercizio, crea due flussi di dati Kinesis (ExampleInputStreameExampleOutputStream) nella stessa regione che utilizzerai per distribuire l'applicazione (us-east-1 in questo esempio). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis.

Per creare i flussi di dati (AWS CLI)
  1. Per creare il primo stream (ExampleInputStream), usa il seguente comando Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Creazione di un bucket Amazon S3

Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per istruzioni per la creazione di questa risorsa, consulta gli argomenti riportati di seguito:

  • Come si crea un bucket S3? nella Guida per l'utente di Amazon Simple Storage Service. Assegna al bucket Amazon S3 un nome univoco a livello globale, ad esempio aggiungendo il tuo nome di accesso.

    Nota

    Assicurati di creare il bucket S3 nella regione che usi per questo tutorial (us-east-1).

Altre risorse

Quando crei la tua applicazione, Managed Service for Apache Flink crea le seguenti CloudWatch risorse Amazon se non esistono già:

  • Un gruppo di log chiamato /AWS/KinesisAnalytics-java/<my-application>.

  • Un flusso di log denominato kinesis-analytics-log-stream.

Configurazione dell'ambiente di sviluppo locale

Per lo sviluppo e il debug, puoi eseguire l'applicazione Python Flink sul tuo computer. Puoi avviare l'applicazione dalla riga di comando con python main.py o in un Python a tua IDE scelta.

Nota

Sulla macchina di sviluppo, devi avere Python 3.10 o 3.11, Java 11, Apache Maven e Git installati. Ti consigliamo di usare un IDE esempio o Visual Studio Code. PyCharm Per verificare che tutti i prerequisiti siano soddisfatti, consulta Soddisfa i prerequisiti per completare gli esercizi prima di procedere.

Per sviluppare l'applicazione ed eseguirla localmente, è necessario installare la libreria Flink Python.

  1. Crea un ambiente Python autonomo VirtualEnv usando Conda o qualsiasi strumento Python simile.

  2. Installa la PyFlink libreria in quell'ambiente. Usa la stessa versione di runtime di Apache Flink che utilizzerai in Amazon Managed Service per Apache Flink. Attualmente, il runtime consigliato è 1.19.1.

    $ pip install apache-flink==1.19.1
  3. Assicurati che l'ambiente sia attivo quando esegui l'applicazione. Se esegui l'applicazione inIDE, assicurati che IDE stia utilizzando l'ambiente come runtime. Il processo dipende da quello IDE che state utilizzando.

    Nota

    Devi solo installare la PyFlink libreria. Non è necessario installare un cluster Apache Flink sul computer.

Autentica la tua sessione AWS

L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Usa i seguenti passaggi per autenticare la tua sessione:

  1. Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. Configura il AWS Command Line Interface (AWS CLI)

  2. Verifica che AWS CLI sia configurato correttamente e che gli utenti dispongano delle autorizzazioni per scrivere nel flusso di dati Kinesis pubblicando il seguente record di test:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Se hai IDE un plugin con cui integrarti AWS, puoi utilizzarlo per passare le credenziali all'applicazione in esecuzione su. IDE Per ulteriori informazioni, vedere AWS Toolkit for PyCharm,AWS Toolkit for Visual Studio Code AWS e Toolkit for IntelliJ. IDEA

Scarica ed esamina il codice Python per lo streaming di Apache Flink

Il codice dell'applicazione Python per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

  1. Clona il repository remoto con il comando seguente:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Passa alla directory ./python/GettingStarted.

Esamina i componenti dell'applicazione

Il codice dell'applicazione si trova inmain.py. Usiamo SQL embedded in Python per definire il flusso dell'applicazione.

Nota

Per un'esperienza di sviluppo ottimizzata, l'applicazione è progettata per funzionare senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo sulla tua macchina. L'applicazione utilizza la variabile di ambiente IS_LOCAL = true per rilevare quando è in esecuzione localmente. È necessario impostare la variabile IS_LOCAL = true di ambiente sulla shell o nella configurazione di esecuzione del fileIDE.

  • L'applicazione configura l'ambiente di esecuzione e legge la configurazione di runtime. Per funzionare sia su Amazon Managed Service for Apache Flink che localmente, l'applicazione controlla la IS_LOCAL variabile.

    • Di seguito è riportato il comportamento predefinito quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink:

      1. Carica le dipendenze impacchettate con l'applicazione. Per ulteriori informazioni, vedere (link)

      2. Carica la configurazione dalle proprietà di runtime che definisci nell'applicazione Amazon Managed Service for Apache Flink. Per ulteriori informazioni, consulta (link)

    • Quando l'applicazione rileva IS_LOCAL = true quando l'applicazione viene eseguita localmente:

      1. Carica le dipendenze esterne dal progetto.

      2. Carica la configurazione dal application_properties.json file incluso nel progetto.

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • L'applicazione definisce una tabella di origine con un'CREATE TABLEistruzione, utilizzando il Kinesis Connector. Questa tabella legge i dati dal flusso Kinesis di input. L'applicazione prende il nome dello stream, la regione e la posizione iniziale dalla configurazione di runtime.

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • L'applicazione definisce anche una tabella sink utilizzando il Kinesis Connector in questo esempio. Questo racconto invia i dati allo stream Kinesis in uscita.

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • Infine, l'applicazione esegue la tabella sink a SQL partire dalla tabella di origine. INSERT INTO... In un'applicazione più complessa, è probabile che siano necessari passaggi aggiuntivi per trasformare i dati prima di scriverli nel sink.

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • È necessario aggiungere un altro passaggio alla fine della main() funzione per eseguire l'applicazione localmente:

    if is_local: table_result.wait()

    Senza questa istruzione, l'applicazione termina immediatamente quando viene eseguita localmente. Non devi eseguire questa istruzione quando esegui l'applicazione in Amazon Managed Service for Apache Flink.

Gestisci le dipendenze JAR

Un' PyFlink applicazione richiede in genere uno o più connettori. L'applicazione in questo tutorial utilizza il Kinesis Connector. Poiché Apache Flink viene eseguito in JavaJVM, i connettori vengono distribuiti come JAR file, indipendentemente dal fatto che l'applicazione venga implementata in Python. È necessario impacchettare queste dipendenze con l'applicazione quando la si distribuisce su Amazon Managed Service for Apache Flink.

In questo esempio, mostriamo come usare Apache Maven per recuperare le dipendenze e impacchettare l'applicazione da eseguire su Managed Service for Apache Flink.

Nota

Esistono modi alternativi per recuperare e impacchettare le dipendenze. Questo esempio dimostra un metodo che funziona correttamente con uno o più connettori. Consente inoltre di eseguire l'applicazione localmente, per lo sviluppo e su Managed Service for Apache Flink senza modifiche al codice.

Usa il file pom.xml

Apache Maven utilizza il pom.xml file per controllare le dipendenze e il pacchetto delle applicazioni.

Tutte JAR le dipendenze sono specificate nel file del pom.xml blocco. <dependencies>...</dependencies>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

Per trovare l'elemento e la versione del connettore corretti da utilizzare, vedere. Usa i connettori Apache Flink con Managed Service for Apache Flink Assicurati di fare riferimento alla versione di Apache Flink che stai utilizzando. Per questo esempio, utilizziamo il connettore Kinesis. Per Apache Flink 1.19, la versione del connettore è. 4.3.0-1.19

Nota

Se si utilizza Apache Flink 1.19, non è stata rilasciata alcuna versione del connettore specifica per questa versione. Utilizzate i connettori rilasciati per la versione 1.18.

Dipendenze da scaricare e impacchettare

Usa Maven per scaricare le dipendenze definite nel pom.xml file e impacchettarle per l'applicazione Python Flink.

  1. Passa alla directory che contiene il progetto Python Getting Started chiamato. python/GettingStarted

  2. Esegui il comando seguente:

$ mvn package

Maven crea un nuovo file chiamato. ./target/pyflink-dependencies.jar Quando sviluppate localmente sulla vostra macchina, l'applicazione Python cerca questo file.

Nota

Se dimentichi di eseguire questo comando, quando tenti di eseguire l'applicazione, l'applicazione fallirà e restituirà l'errore: Could not find any factory for identifier «kinesis».

Scrivi record di esempio nel flusso di input

In questa sezione, invierai record di esempio allo stream per l'elaborazione dell'applicazione. Sono disponibili due opzioni per generare dati di esempio, utilizzando uno script Python o il Kinesis Data Generator.

Genera dati di esempio usando uno script Python

Puoi usare uno script Python per inviare record di esempio allo stream.

Nota

Per eseguire questo script Python, devi usare Python 3.x e avere installata la libreria for AWS SDKPython (Boto).

Per iniziare a inviare i dati di test al flusso di input Kinesis:

  1. Scarica lo script stock.py Python del generatore di dati dal repository Data generator GitHub .

  2. Esegui lo script stock.py:

    $ python stock.py

Mantieni lo script in esecuzione mentre completi il resto del tutorial. Ora puoi eseguire la tua applicazione Apache Flink.

Generazione di dati di esempio utilizzando Kinesis Data Generator

In alternativa all'utilizzo dello script Python, puoi usare Kinesis Data Generator, disponibile anche in una versione ospitata, per inviare dati di esempio casuali allo stream. Kinesis Data Generator viene eseguito nel browser e non è necessario installare nulla sul computer.

Per configurare ed eseguire Kinesis Data Generator:

  1. Segui le istruzioni nella documentazione di Kinesis Data Generator per configurare l'accesso allo strumento. Eseguirai un AWS CloudFormation modello che imposta un utente e una password.

  2. Accedi a Kinesis Data Generator tramite il file URL generato dal CloudFormation modello. Puoi trovarlo URL nella scheda Output dopo aver completato il CloudFormation modello.

  3. Configura il generatore di dati:

    • Regione: Seleziona la regione che stai utilizzando per questo tutorial: us-east-1

    • Stream/flusso di consegna: seleziona il flusso di input che l'applicazione utilizzerà: ExampleInputStream

    • Record al secondo: 100

    • Modello di registrazione: copia e incolla il seguente modello:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Prova il modello: scegli Modello di test e verifica che il record generato sia simile al seguente:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Avvia il generatore di dati: scegli Seleziona Invia dati.

Kinesis Data Generator sta ora inviando dati a. ExampleInputStream

Esegui l'applicazione localmente

Puoi testare l'applicazione localmente, eseguendola dalla riga di comando con python main.py o dal tuoIDE.

Per eseguire l'applicazione localmente, è necessario che sia installata la versione corretta della PyFlink libreria, come descritto nella sezione precedente. Per ulteriori informazioni, vedere (link)

Nota

Prima di continuare, verifica che i flussi di input e output siano disponibili. Per informazioni, consulta Crea due flussi di dati Amazon Kinesis. Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta Autentica la tua sessione AWS.

Importa il progetto Python nel tuo IDE

Per iniziare a lavorare sull'applicazione in usoIDE, è necessario importarla come progetto Python.

Il repository che hai clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della ./python/GettingStarted sottodirectory nella tuaIDE.

Importa il codice come progetto Python esistente.

Nota

Il processo esatto per importare un nuovo progetto Python varia a seconda del tipo di progetto IDE utilizzato.

Controllate la configurazione locale dell'applicazione

Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel application_properties.json file nella cartella delle risorse del progetto sotto./src/main/resources. È possibile modificare questo file per utilizzare diversi nomi o regioni di stream Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Esegui la tua applicazione Python localmente

È possibile eseguire l'applicazione localmente, dalla riga di comando come un normale script Python o da. IDE

Per eseguire l'applicazione dalla riga di comando
  1. Assicurati che l'ambiente Python standalone come Conda VirtualEnv o dove hai installato la libreria Python Flink sia attualmente attivo.

  2. Assicurati di aver eseguito mvn package almeno una volta.

  3. Imposta la variabile di ambiente IS_LOCAL = true:

    $ export IS_LOCAL=true
  4. Esegui l'applicazione come un normale script Python.

    $python main.py
Per eseguire l'applicazione dall'interno di IDE
  1. Configura il tuo IDE per eseguire lo main.py script con la seguente configurazione:

    1. Usa l'ambiente Python autonomo come Conda VirtualEnv o dove hai installato la libreria. PyFlink

    2. Usa le AWS credenziali per accedere ai flussi di dati Kinesis di input e output.

    3. Imposta IS_LOCAL = true.

  2. Il processo esatto per impostare la configurazione di esecuzione dipende dall'utente IDE e varia.

  3. Dopo aver configurato il tuoIDE, esegui lo script Python e usa gli strumenti forniti da te IDE mentre l'applicazione è in esecuzione.

Ispezionate i log delle applicazioni localmente

Quando viene eseguita localmente, l'applicazione non mostra alcun registro nella console, a parte alcune righe stampate e visualizzate all'avvio dell'applicazione. PyFlink scrive i log in un file nella directory in cui è installata la libreria Python Flink. L'applicazione stampa la posizione dei log all'avvio. È inoltre possibile eseguire il comando seguente per trovare i registri:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. Elenca i file nella directory di registrazione. Di solito si trova un solo .log file.

  2. Coda il file mentre l'applicazione è in esecuzione:tail -f <log-path>/<log-file>.log.

Osserva i dati di input e output nei flussi Kinesis

Puoi osservare i record inviati al flusso di input da (Python di esempio che genera) o Kinesis Data Generator (link) utilizzando Data Viewer nella console Amazon Kinesis.

Per osservare i record:

Arresta l'esecuzione locale dell'applicazione

Arresta l'applicazione in esecuzione suIDE. IDEDi solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono daIDE.

Package del codice dell'applicazione

In questa sezione, si utilizza Apache Maven per impacchettare il codice dell'applicazione e tutte le dipendenze richieste in un file.zip.

Esegui nuovamente il comando del pacchetto Maven:

$ mvn package

Questo comando genera il file. target/managed-flink-pyflink-getting-started-1.0.0.zip

Carica il pacchetto dell'applicazione in un bucket Amazon S3

In questa sezione, carichi il file.zip creato nella sezione precedente nel bucket Amazon Simple Storage Service (Amazon S3) creato all'inizio di questo tutorial. Se non hai completato questo passaggio, consulta (link).

Per caricare il JAR file di codice dell'applicazione
  1. Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/

  2. Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.

  3. Scegli Carica.

  4. Scegliere Add files (Aggiungi file).

  5. Passa al file.zip generato nel passaggio precedente:. target/managed-flink-pyflink-getting-started-1.0.0.zip

  6. Scegli Carica senza modificare altre impostazioni.

Crea e configura l'applicazione Managed Service for Apache Flink

È possibile creare e configurare un'applicazione Managed Service for Apache Flink utilizzando la console o il. AWS CLI Per questo tutorial, useremo la console.

Creazione dell'applicazione

  1. Apri la console Managed Service for Apache Flink su /flink https://console.aws.amazon.com

  2. Verifica che sia selezionata la regione corretta: Stati Uniti orientali (Virginia settentrionale) us-east-1.

  3. Apri il menu a destra e scegli Applicazioni Apache Flink, quindi Crea applicazione di streaming. In alternativa, scegli Crea applicazione di streaming dalla sezione Guida introduttiva della pagina iniziale.

  4. Nella pagina Crea applicazioni di streaming:

    • Per Scegli un metodo per configurare l'applicazione di elaborazione dello stream, scegli Crea da zero.

    • Per la configurazione di Apache Flink, versione Application Flink, scegli Apache Flink 1.19.

    • Per la configurazione dell'applicazione:

      • Per Nome applicazione, immetti MyApplication.

      • Per Descrizione, inserisci My Python test app.

      • In Accesso alle risorse dell'applicazione, scegli Create/update IAM role kinesis-analytics- MyApplication -us-east-1 con le politiche richieste.

    • Per le impostazioni di Template for applications:

      • Per Modelli, scegli Sviluppo.

    • Scegli Crea applicazione di streaming.

Nota

Quando crei un servizio gestito per l'applicazione Apache Flink utilizzando la console, hai la possibilità di creare un IAM ruolo e una policy per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste IAM risorse vengono denominate utilizzando il nome e la regione dell'applicazione come segue:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Ruolo: kinesisanalytics-MyApplication-us-west-2

Amazon Managed Service for Apache Flink era precedentemente noto come Kinesis Data Analytics. Il nome delle risorse generate automaticamente ha il prefisso per garantire la compatibilità con le versioni precedenti. kinesis-analytics

Modifica la politica IAM

Modifica la IAM policy per aggiungere le autorizzazioni per accedere al bucket Amazon S3.

Per modificare la IAM policy per aggiungere le autorizzazioni del bucket S3
  1. Apri la console all'IAMindirizzo. https://console.aws.amazon.com/iam/

  2. Seleziona Policy. Scegli la policy kinesis-analytics-service-MyApplication-us-east-1 creata dalla console nella sezione precedente.

  3. Scegli Modifica, quindi scegli la JSONscheda.

  4. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (012345678901) con l'ID del tuo account.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Scegli Avanti e quindi seleziona Salva modifiche.

Configura l'applicazione

Modifica la configurazione dell'applicazione per impostare l'elemento del codice dell'applicazione.

Per configurare l'applicazione
  1. Nella MyApplicationpagina, scegli Configura.

  2. Nella sezione Posizione del codice dell'applicazione:

    • Per il bucket Amazon S3, seleziona il bucket creato in precedenza per il codice dell'applicazione. Scegli Sfoglia e seleziona il bucket corretto, quindi scegli Scegli. Non selezionare il nome del bucket.

    • Per Percorso dell'oggetto Amazon S3, inserisci managed-flink-pyflink-getting-started-1.0.0.zip

  3. Per le autorizzazioni di accesso, scegli Crea/aggiorna il IAM ruolo kinesis-analytics-MyApplication-us-east-1 con le politiche richieste.

  4. Passa alle proprietà di Runtime e mantieni i valori predefiniti per tutte le altre impostazioni.

  5. Scegliete Aggiungi nuovo elemento e aggiungete ciascuno dei seguenti parametri:

    ID gruppo Chiave Valore
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. Non modificare nessuna delle altre sezioni e scegli Salva modifiche.

Nota

Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:

  • Gruppo di log: /aws/kinesis-analytics/MyApplication

  • Flusso di log: kinesis-analytics-log-stream

Esecuzione dell'applicazione.

L'applicazione è ora configurata e pronta per l'esecuzione.

Per eseguire l'applicazione
  1. Sulla console per Amazon Managed Service for Apache Flink, scegli La mia applicazione e scegli Esegui.

  2. Nella pagina successiva, nella pagina di configurazione del ripristino dell'applicazione, scegli Esegui con l'ultima istantanea, quindi scegli Esegui.

    Lo stato nell'applicazione descrive in dettaglio le transizioni da Ready Starting e poi a Running quando l'applicazione è stata avviata.

Quando l'applicazione è nello Running stato, ora puoi aprire la dashboard di Flink.

Per aprire il pannello di controllo
  1. Scegli Apri la dashboard di Apache Flink. La dashboard si apre in una nuova pagina.

  2. Nell'elenco dei lavori in esecuzione, scegli il singolo lavoro che puoi vedere.

    Nota

    Se hai impostato le proprietà di Runtime o hai modificato le IAM politiche in modo errato, lo stato dell'applicazione potrebbe cambiareRunning, ma la dashboard di Flink mostra che il lavoro viene riavviato continuamente. Si tratta di uno scenario di errore comune se l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.

    Quando ciò accade, controlla la scheda Eccezioni nella dashboard di Flink per vedere la causa del problema.

Osserva le metriche dell'applicazione in esecuzione

Nella MyApplicationpagina, nella sezione Amazon CloudWatch metrics, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione.

Per visualizzare le metriche
  1. Accanto al pulsante Aggiorna, seleziona 10 secondi dall'elenco a discesa.

  2. Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di uptime aumentare continuamente.

  3. La metrica fullrestarts deve essere zero. Se è in aumento, la configurazione potrebbe presentare dei problemi. Per esaminare il problema, consulta la scheda Eccezioni nella dashboard di Flink.

  4. La metrica del numero di checkpoint non riusciti deve essere pari a zero in un'applicazione integra.

    Nota

    Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare una dashboard applicativa personalizzata con qualsiasi metrica nella dashboard. CloudWatch

Osserva i dati di output nei flussi Kinesis

Assicurati di continuare a pubblicare i dati sull'input, usando lo script Python o il Kinesis Data Generator.

È ora possibile osservare l'output dell'applicazione in esecuzione su Managed Service for Apache Flink utilizzando il Data Viewer in https://console.aws.amazon.com/kinesis/, analogamente a quanto già fatto in precedenza.

Per visualizzare l'output
  1. Apri la console Kinesis in /kinesis. https://console.aws.amazon.com

  2. Verifica che la regione sia la stessa che stai usando per eseguire questo tutorial. Per impostazione predefinita, è US-East-1US East (Virginia settentrionale). Se necessario, modificare la regione.

  3. Scegli Data Streams.

  4. Seleziona lo stream che desideri osservare. Ai fini di questo tutorial, utilizza ExampleOutputStream.

  5. Scegli la scheda Data viewer.

  6. Seleziona uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.

  7. Seleziona il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.

Arresta l'applicazione

Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. MyApplication

Per interrompere l'applicazione
  1. Dall'elenco a discesa Azione, scegli Stop.

  2. Lo stato nell'applicazione descrive in dettaglio le transizioni da Running e quindi a Ready quando l'applicazione viene completamente interrotta. Stopping

    Nota

    Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.

Approfondimenti

Pulisci le risorse AWS