Creazione di un'applicazione utilizzando Apache Beam - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Creazione di un'applicazione utilizzando Apache Beam

In questo esercizio, viene creata un'applicazione del servizio gestito per Apache Flink che trasforma i dati utilizzando Apache Beam. Apache Beam è un modello di programmazione per l'elaborazione di dati di streaming. Per informazioni sull'utilizzo di Apache Beam con il servizio gestito per Apache Flink, consulta Utilizzo di Apache Beam.

Nota

Per impostare i prerequisiti richiesti per questo esercizio, completa innanzitutto l'esercizio Guida introduttiva (API) DataStream .

Crea risorse dipendenti

Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti:

  • Due flussi di dati Kinesis (ExampleInputStream e ExampleOutputStream)

  • Un bucket Amazon S3 per archiviare il codice dell'applicazione (ka-app-code-<username>)

Puoi creare i flussi Kinesis e un bucket S3 utilizzando la console. Per istruzioni sulla creazione di queste risorse, consulta i seguenti argomenti:

  • Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis. Assegna un nome ai flussi di dati ExampleInputStream e ExampleOutputStream.

  • Come si crea un bucket S3? nella Guida per l'utente di Amazon Simple Storage Service. Assegna al bucket Amazon S3 un nome globalmente univoco aggiungendo il tuo nome di accesso, ad esempio ka-app-code-<username>.

Scrivi record di esempio nel flusso di input

In questa sezione, viene utilizzato uno script Python per scrivere stringhe casuali nel flusso per l'applicazione da elaborare.

Nota

Questa sezione richiede AWS SDK for Python (Boto).

  1. Crea un file denominato ping.py con i seguenti contenuti:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Esegui lo script ping.py:

    $ python ping.py

    Mantieni lo script in esecuzione mentre completi il resto del tutorial.

Scarica ed esamina il codice dell'applicazione

Il codice dell'applicazione Java per questo esempio è disponibile da GitHub. Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito.

  1. Installa il client Git se non lo hai già fatto. Per ulteriori informazioni, consulta Installazione di Git.

  2. Clona il repository remoto con il comando seguente:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Passa alla directory amazon-kinesis-data-analytics-java-examples/Beam.

Il codice dell'applicazione si trova nei file BasicBeamStreamingJob.java. Tieni presente quanto segue riguardo al codice dell'applicazione:

  • L'applicazione utilizza Apache Beam ParDoper elaborare i record in entrata richiamando una funzione di trasformazione personalizzata chiamata. PingPongFn

    Il codice per richiamare la funzione PingPongFn è il seguente:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Le applicazioni del servizio gestito per Apache Flink che utilizzano Apache Beam richiedono i seguenti componenti. Se questi componenti e versioni non vengono inclusi in pom.xml, l'applicazione carica le versioni errate dalle dipendenze dell'ambiente e, poiché le versioni non corrispondono, si blocca in fase di runtime.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • La funzione di trasformazione PingPongFn passa i dati di input nel flusso di output, a meno che i dati di input non siano ping, nel qual caso nel flusso di output viene emessa la stringa pong\n.

    Il codice della funzione di trasformazione è il seguente:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compila il codice dell'applicazione

Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

  1. Installa Java e Maven se non lo hai già fatto. Per ulteriori informazioni, consulta Prerequisiti nel tutorial Guida introduttiva (API) DataStream .

  2. Compila l'applicazione con il seguente comando:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    Nota

    Il codice di origine fornito si basa sulle librerie di Java 11.

La compilazione dell'applicazione crea il file JAR dell'applicazione (target/basic-beam-app-1.0.jar).

Carica il codice Java per lo streaming di Apache Flink

In questa sezione, il codice dell'applicazione viene caricato nel bucket Amazon S3 creato nella sezione Crea risorse dipendenti.

  1. Nella console Amazon S3, scegli il bucket ka-app-code - e scegli Carica. <username>

  2. Nella fase Seleziona file, scegli Aggiungi file. Individua il file basic-beam-app-1.0.jar creato nella fase precedente.

  3. Non è necessario modificare alcuna delle impostazioni dell'oggetto, quindi scegli Carica.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.

Crea ed esegui l'applicazione Managed Service for Apache Flink

Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.

Creazione dell'applicazione

  1. Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/flink

  2. Nella dashboard del servizio gestito per Apache Flink, scegli Crea un'applicazione di analisi.

  3. Nella pagina Servizio gestito per Apache Flink: crea applicazione, fornisci i dettagli dell'applicazione nel modo seguente:

    • Per Nome applicazione, inserisci MyApplication.

    • Per Runtime, scegli Apache Flink.

      Nota

      Apache Beam non è attualmente compatibile con Apache Flink versione 1.19 o successiva.

    • Seleziona Apache Flink versione 1.15 dal menu a discesa della versione.

  4. Per Autorizzazioni di accesso, scegli Crea/aggiorna kinesis-analytics-MyApplication-us-west-2 per il ruolo IAM.

  5. Scegli Crea applicazione.

Nota

Quando crei un'applicazione del servizio gestito per Apache Flink tramite la console, hai la possibilità di avere un ruolo e una policy IAM creati per l'applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Ruolo: kinesis-analytics-MyApplication-us-west-2

Modifica la policy IAM

Modifica la policy IAM per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

  1. Apri la console IAM all'indirizzo https://console.aws.amazon.com/iam/.

  2. Seleziona Policy. Scegli la policy kinesis-analytics-service-MyApplication-us-west-2 creata dalla console nella sezione precedente.

  3. Nella pagina Riepilogo, scegli Modifica policy. Scegli la scheda JSON.

  4. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci gli ID account di esempio (012345678901) con il tuo ID account.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configura l'applicazione

  1. Nella MyApplicationpagina, scegli Configura.

  2. Nella pagina Configura applicazione, fornisci la Posizione del codice:

    • Per Bucket Amazon S3, inserisci ka-app-code-<username>.

    • Per Percorso dell'oggetto Amazon S3, inserisci basic-beam-app-1.0.jar

  3. In Accedi alle risorse dell'applicazione, per Autorizzazioni di accesso, scegli Crea/aggiorna kinesis-analytics-MyApplication-us-west-2 per il ruolo IAM.

  4. Inserisci i seguenti dati:

    ID gruppo Chiave Valore
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. In Monitoraggio, accertati che il Monitoraggio del livello dei parametri sia impostato su Applicazione.

  6. Per la CloudWatch registrazione, seleziona la casella di controllo Abilita.

  7. Scegli Aggiorna.

Nota

Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:

  • Gruppo di log: /aws/kinesis-analytics/MyApplication

  • Flusso di log: kinesis-analytics-log-stream

Questo flusso di log viene utilizzato per monitorare l'applicazione. Non si tratta dello stesso flusso di log utilizzato dall'applicazione per inviare i risultati.

Esecuzione dell'applicazione.

Il grafico del processo Flink può essere visualizzato eseguendo l'applicazione, aprendo il pannello di controllo di Apache Flink e scegliendo il processo Flink desiderato.

Puoi controllare le metriche del servizio gestito per Apache Flink sulla CloudWatch console per verificare che l'applicazione funzioni.

Pulisci le risorse AWS

Questa sezione include le procedure per ripulire AWS le risorse create nel tutorial di Tumbling Window.

Eliminare l'applicazione Managed Service for Apache Flink

  1. Apri la console del servizio gestito per Apache Flink all'indirizzo https://console.aws.amazon.com/flink

  2. nel pannello Managed Service for Apache Flink, scegli. MyApplication

  3. Nella pagina dell'applicazione, scegli Elimina e quindi conferma l'eliminazione.

Eliminare i flussi di dati Kinesis

  1. Apri la console Kinesis all'indirizzo https://console.aws.amazon.com/kinesis.

  2. Nel pannello Kinesis Data Streams, scegli. ExampleInputStream

  3. Nella ExampleInputStreampagina, scegli Elimina Kinesis Stream e conferma l'eliminazione.

  4. Nella pagina Kinesis Streams, scegli, scegli Azioni ExampleOutputStream, scegli Elimina, quindi conferma l'eliminazione.

Elimina l'oggetto e il bucket Amazon S3

  1. Apri la console Amazon S3 all'indirizzo https://console.aws.amazon.com/s3/.

  2. <username>Scegli il bucket ka-app-code-.

  3. Per confermare l'eliminazione, scegli Elimina, quindi inserisci il nome del bucket.

Elimina le tue risorse IAM

  1. Aprire la console IAM all'indirizzo https://console.aws.amazon.com/iam/.

  2. Nella barra di navigazione, scegli Policy.

  3. Nel controllo filtro, inserisci kinesis.

  4. Scegli la politica kinesis-analytics-service- MyApplication -us-west-2.

  5. Seleziona Operazioni di policy e quindi Elimina.

  6. Nella barra di navigazione, scegli Ruoli.

  7. Scegli il ruolo kinesis-analytics- MyApplication -us-west-2.

  8. Quindi scegli Elimina ruolo e conferma l'eliminazione.

CloudWatch Elimina le tue risorse

  1. Apri la CloudWatch console all'indirizzo https://console.aws.amazon.com/cloudwatch/.

  2. Nella barra di navigazione, scegli Log.

  3. Scegli il gruppo di log MyApplication/aws/kinesis-analytics/.

  4. Quindi scegli Elimina gruppo di log e conferma l'eliminazione.

Passaggi successivi

Ora che hai creato ed eseguito un'applicazione di base del servizio gestito per Apache Flink che trasforma i dati utilizzando Apache Beam, consulta la seguente applicazione come esempio di una soluzione del servizio gestito per Apache Flink più avanzata.