Fase 3: Creazione ed esecuzione di un servizio gestito per Apache Flink per applicazioni Flink - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Fase 3: Creazione ed esecuzione di un servizio gestito per Apache Flink per applicazioni Flink

In questo esercizio, verrà creato un servizio gestito per l'applicazione Apache Flink per applicazioni Flink con flussi di dati come origine e come sink.

Creazione di due flussi di dati Amazon Kinesis

Prima di creare un servizio gestito per Apache Flink per le applicazioni Flink per questo esercizio, crea due flussi di dati Kinesis (ExampleInputStream e ExampleOutputStream). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta Creazione e aggiornamento dei flussi di dati.

Per creare i flussi di dati (AWS CLI)
  1. Per creare il primo stream (ExampleInputStream), usa il seguente comando Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Scrittura di record di esempio nel flusso di input

In questa sezione, viene utilizzato uno script Python per scrivere record di esempio nel flusso per l'applicazione da elaborare.

Nota

Questa sezione richiede AWS SDK for Python (Boto).

  1. Crea un file denominato stock.py con i seguenti contenuti:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. Successivamente nel tutorial, esegui lo script stock.py per inviare dati all'applicazione.

    $ python stock.py

Download ed esame del codice Java di streaming di Apache Flink

Il codice dell'applicazione Java per questi esempi è disponibile presso GitHub. Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

  1. Clona il repository remoto con il comando seguente:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. Passa alla directory GettingStarted.

Il codice dell'applicazione si trova nei file CustomSinkStreamingJob.java e CloudWatchLogSink.java. Tieni presente quanto segue riguardo al codice dell'applicazione:

  • L'applicazione utilizza un'origine Kinesis per leggere dal flusso di origine. Il seguente snippet crea il sink Kinesis:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

Compilazione del codice dell'applicazione

In questa sezione, viene utilizzato il compilatore Apache Maven per creare il codice Java per l'applicazione. Per ulteriori informazioni sull'installazione di Apache Maven e Java Development Kit (JDK), consulta Prerequisiti per il completamento degli esercizi.

L'applicazione Java richiede i componenti seguenti:

  • Un file Project Object Model (pom.xml). Questo file contiene le informazioni sulla configurazione e le dipendenze dell'applicazione, incluse le librerie del servizio gestito per Apache Flink per le applicazioni Flink.

  • Un metodo main contenente la logica dell'applicazione.

Nota

Per utilizzare il connettore Kinesis per la seguente applicazione, è necessario scaricare il codice sorgente per il connettore e compilarlo come descritto nella documentazione di Apache Flink.

Per creare e compilare il codice dell'applicazione
  1. Creare un'applicazione Java/Maven nell'ambiente di sviluppo. Per ulteriori informazioni su come creare un'applicazione, consulta la documentazione per l'ambiente di sviluppo:

  2. Utilizzare il codice seguente per un file denominato StreamingJob.java.

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    Notare quanto segue riguardo l'esempio di codice precedente:

    • Questo file contiene il metodo main che definisce la funzionalità dell'applicazione.

    • L'applicazione crea connettori di origine e sink per accedere alle risorse esterne utilizzando un oggetto StreamExecutionEnvironment.

    • L'applicazione crea connettori di origine e sink utilizzando proprietà statiche. Per usare proprietà dell'applicazione dinamiche, utilizza i metodi createSourceFromApplicationProperties e createSinkFromApplicationProperties per creare i connettori. Questi metodi leggono le proprietà dell'applicazione per configurare il connettori.

  3. Per usare il codice dell'applicazione, compila il codice e comprimilo in un file JAR. È possibile compilare e creare un pacchetto del codice in uno di due modi:

    • Utilizzare lo strumento Maven della riga di comando. Crea il file JAR eseguendo il comando seguente nella directory che contiene il file pom.xml:

      mvn package
    • Utilizza il tuo ambiente di sviluppo. Per informazioni dettagliate, consulta la documentazione relativa all'ambiente di sviluppo.

    È possibile caricare il pacchetto come un file JAR, oppure comprimere il pacchetto e caricarlo come un file ZIP. Se create l'applicazione utilizzando il AWS CLI, specificate il tipo di contenuto del codice (JAR o ZIP).

  4. Se si verificano errori durante la compilazione, verifica che la variabile di ambiente JAVA_HOME sia impostata correttamente.

Se l'applicazione viene compilata correttamente, viene creato il seguente file:

target/java-getting-started-1.0.jar

Caricamento del codice Java di streaming di Apache Flink

In questa sezione, viene creato un bucket Amazon Simple Storage Service (Amazon S3) e caricato il codice dell'applicazione.

Per caricare il codice dell'applicazione
  1. Apri la console Amazon S3 all'indirizzo https://console.aws.amazon.com/s3/.

  2. Seleziona Crea bucket.

  3. Inserisci ka-app-code-<username> nel campo Nome bucket. Aggiungi un suffisso al nome del bucket, ad esempio il nome utente, per renderlo globalmente univoco. Seleziona Successivo.

  4. Nella fase Configura opzioni, non modificare le impostazioni e scegli Successivo.

  5. Nella fase Imposta autorizzazioni, non modificare le impostazioni e scegli Successivo.

  6. Seleziona Crea bucket.

  7. Nella console Amazon S3, scegli il bucket ka-app-code - e scegli Carica. <username>

  8. Nella fase Seleziona file, scegli Aggiungi file. Individua il file java-getting-started-1.0.jar creato nella fase precedente. Seleziona Successivo.

  9. Nella fase Imposta autorizzazioni, non modificare le impostazioni. Seleziona Successivo.

  10. Nella fase Imposta proprietà, non modificare le impostazioni. Scegli Carica.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.

Creazione ed esecuzione di un servizio gestito per Apache Flink per applicazioni Flink

È possibile creare ed eseguire un servizio gestito per Apache Flink per le applicazioni Flink utilizzando la console o la AWS CLI.

Nota

Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, crei queste risorse separatamente.

Creazione ed esecuzione dell'applicazione (console)

Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.

Creazione dell'applicazione

  1. Apri la console Kinesis all'indirizzo https://console.aws.amazon.com/kinesis.

  2. Sul pannello di controllo di Amazon Kinesis, scegli Crea applicazione di analisi.

  3. Nella pagina Kinesis Analytics - Create application (Kinesis Analytics - Crea applicazione), fornire i dettagli dell'applicazione come segue:

    • Per Nome applicazione, immetti MyApplication.

    • Per Descrizione, inserisci My java test app.

    • Per Runtime, scegliere Apache Flink 1.6.

  4. Per Autorizzazioni di accesso, scegli Crea/aggiorna kinesis-analytics-MyApplication-us-west-2 per il ruolo IAM.

  5. Scegli Crea applicazione.

Nota

Quando crei un servizio gestito per Apache Flink per l'applicazione Flink tramite la console, hai la possibilità di avere un ruolo e una policy IAM creati per l'applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Ruolo: kinesis-analytics-MyApplication-us-west-2

Modifica della policy IAM

Modifica la policy IAM per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

  1. Apri la console IAM all'indirizzo https://console.aws.amazon.com/iam/.

  2. Seleziona Policy. Scegli la policy kinesis-analytics-service-MyApplication-us-west-2 creata dalla console nella sezione precedente.

  3. Nella pagina Riepilogo, scegli Modifica policy. Scegli la scheda JSON.

  4. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci gli ID account di esempio (012345678901) con il tuo ID account.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configurazione dell'applicazione

  1. Nella MyApplicationpagina, scegli Configura.

  2. Nella pagina Configura applicazione, fornisci la Posizione del codice:

    • Per Bucket Amazon S3, inserisci ka-app-code-<username>.

    • Per Percorso dell'oggetto Amazon S3, inserisci java-getting-started-1.0.jar

  3. In Accesso alle risorse dell'applicazione, per Autorizzazioni di accesso, scegli Crea/aggiorna kinesis-analytics-MyApplication-us-west-2 per il ruolo IAM.

  4. In Proprietà, per ID gruppo, inserisci ProducerConfigProperties.

  5. Immetti i valori e le proprietà dell'applicazione seguenti:

    Chiave Valore
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. In Monitoraggio, accertati che il Monitoraggio del livello dei parametri sia impostato su Applicazione.

  7. Per la CloudWatch registrazione, seleziona la casella di controllo Abilita.

  8. Scegli Aggiorna.

Nota

Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:

  • Gruppo di log: /aws/kinesis-analytics/MyApplication

  • Flusso di log: kinesis-analytics-log-stream

Esecuzione dell'applicazione

  1. Nella MyApplicationpagina, scegli Esegui. Conferma l'operazione.

  2. Quando l'applicazione è in esecuzione, aggiorna la pagina. La console mostra il Grafico dell'applicazione.

Interruzione dell'applicazione

Nella MyApplicationpagina, scegli Stop. Conferma l'operazione.

Aggiornamento dell'applicazione

Tramite la console, puoi aggiornare le impostazioni dell'applicazione, ad esempio le proprietà dell'applicazione, le impostazioni di monitoraggio e la posizione o il nome di file del JAR dell'applicazione. Puoi anche ricaricare il JAR dell'applicazione dal bucket Amazon S3 se è necessario aggiornare il codice dell'applicazione.

Nella MyApplicationpagina, scegli Configura. Aggiorna le impostazioni dell'applicazione e scegli Aggiorna.

Creazione ed esecuzione dell'applicazione (AWS CLI)

In questa sezione, si utilizza AWS CLI per creare ed eseguire l'applicazione Managed Service for Apache Flink. Managed Service for Apache Flink for Flink Applications utilizza il kinesisanalyticsv2 AWS CLI comando per creare e interagire con le applicazioni Managed Service for Apache Flink.

Creazione di una policy di autorizzazione

Innanzitutto, crea una policy di autorizzazione con due istruzioni: una che concede le autorizzazioni per l'operazione read sul flusso di origine e un'altra che concede le autorizzazioni per operazioni write sul flusso di sink. Collega quindi la policy a un ruolo IAM (che verrà creato nella sezione successiva). Pertanto, quando il servizio gestito per Apache Flink assume il ruolo, il servizio disporrà delle autorizzazioni necessarie per leggere dal flusso di origine e scrivere nel flusso di sink.

Utilizza il codice seguente per creare la policy di autorizzazione KAReadSourceStreamWriteSinkStream. Sostituisci username con il nome utente utilizzato per creare il bucket Amazon S3 per archiviare il codice dell'applicazione. Sostituisci l'ID account nei nomi della risorsa Amazon (ARN) (012345678901) con il tuo ID account.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Per step-by-step istruzioni su come creare una politica di autorizzazioni, consulta il Tutorial: Create and Attach Your First Customer Managed Policy nella IAM User Guide.

Nota

Per accedere ad altri AWS servizi, puoi utilizzare il AWS SDK for Java. Il servizio gestito per Apache Flink imposta automaticamente le credenziali richieste dall'SDK su quelle del ruolo IAM di esecuzione del servizio associato all'applicazione. Non sono richieste fasi aggiuntive.

Creazione di un ruolo IAM

In questa sezione, viene creato un ruolo IAM per il servizio gestito per Apache Flink per le applicazioni Flink che può essere assunto per leggere un flusso di origine e scrivere nel flusso di sink.

Il servizio gestito per Apache Flink non può accedere al tuo flusso senza autorizzazioni. Queste autorizzazioni possono essere assegnate con un ruolo IAM. Ad ogni ruolo IAM sono collegate due policy. La policy di attendibilità concede al servizio gestito per Apache Flink l'autorizzazione per assumere il ruolo e la policy di autorizzazione determina cosa può fare il servizio assumendo questo ruolo.

Collega la policy di autorizzazione creata nella sezione precedente a questo ruolo.

Per creare un ruolo IAM
  1. Aprire la console IAM all'indirizzo https://console.aws.amazon.com/iam/.

  2. Nel riquadro di navigazione, seleziona Ruoli, quindi Crea nuovo ruolo.

  3. In Seleziona tipo di identità attendibile, scegli Servizio AWS . In Scegli il servizio che utilizzerà questo ruolo, scegli Kinesis. In Seleziona il tuo caso d'uso, scegli Analisi dei dati Kinesis.

    Scegli Successivo: Autorizzazioni.

  4. Nella pagina Allega policy di autorizzazione, seleziona Successivo: esamina. Collega le policy di autorizzazione dopo aver creato il ruolo.

  5. Nella pagina Crea ruolo, immetti KA-stream-rw-role per Nome ruolo. Scegli Crea ruolo.

    È stato creato un nuovo ruolo IAM denominato KA-stream-rw-role. Successivamente, aggiorna le policy di attendibilità e di autorizzazione per il ruolo.

  6. Collega la policy di autorizzazione al ruolo.

    Nota

    Per questo esercizio, il servizio gestito per Apache Flink assume questo ruolo per la lettura di dati da un flusso di dati Kinesis (origine) e la scrittura dell'output in un altro flusso di dati Kinesis. Pertanto, devi collegare la policy creata nella fase precedente, Creazione di una policy di autorizzazione.

    1. Nella pagina Riepilogo, scegli la scheda Autorizzazioni.

    2. Scegliere Collega policy.

    3. Nella casella di ricerca, immetti KAReadSourceStreamWriteSinkStream (la policy creata nella sezione precedente).

    4. Scegli la ReadInputStreamWriteOutputStream politica KA, e scegli Allega politica.

È stato creato il ruolo di esecuzione del servizio che l'applicazione utilizzerà per accedere alle risorse. Prendi nota dell'ARN del nuovo ruolo.

Per step-by-step istruzioni sulla creazione di un ruolo, consulta Creating an IAM Role (Console) nella IAM User Guide.

Creazione dell'applicazione del servizio gestito per Apache Flink

  1. Salvare il seguente codice JSON in un file denominato create_request.json. Sostituisci l'ARN del ruolo di esempio con l'ARN per il ruolo creato in precedenza. Sostituisci il suffisso dell'ARN del bucket (username) con il suffisso scelto nella sezione precedente. Sostituisci l'ID account di esempio (012345678901) nel ruolo di esecuzione del servizio con il tuo ID account.

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Esegui l'operazione CreateApplication con la richiesta precedente per creare l'applicazione:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

L'applicazione è ora creata. Avvia l'applicazione nella fase successiva.

Avvio dell'applicazione

In questa sezione, viene utilizzata l'operazione StartApplication per avviare l'applicazione.

Per avviare l'applicazione
  1. Salvare il seguente codice JSON in un file denominato start_request.json.

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. Esegui l'operazione StartApplication con la richiesta precedente per avviare l'applicazione:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

L'applicazione è ora in esecuzione. Puoi controllare i parametri del servizio gestito per Apache Flink sulla CloudWatch console Amazon per verificare che l'applicazione funzioni.

Interruzione dell'applicazione

In questa sezione, viene utilizzata l'operazione StopApplication per interrompere l'applicazione.

Per interrompere l'applicazione
  1. Salvare il seguente codice JSON in un file denominato stop_request.json.

    {"ApplicationName": "test" }
  2. Esegui l'operazione StopApplication con la seguente richiesta di interrompere l'applicazione:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

L'applicazione è ora interrotta.