Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
In questo esercizio, viene creata un'applicazione del servizio gestito per Apache Flink che trasforma i dati utilizzando Apache Beam
Nota
Per impostare i prerequisiti richiesti per questo esercizio, completa innanzitutto l'esercizio Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink.
Questo argomento contiene le sezioni seguenti:
Crea risorse dipendenti
Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti:
Due flussi di dati Kinesis (
ExampleInputStream
eExampleOutputStream
)Un bucket Amazon S3 per archiviare il codice dell'applicazione (
ka-app-code-
)<username>
Puoi creare i flussi Kinesis e un bucket S3 utilizzando la console. Per istruzioni sulla creazione di queste risorse, consulta i seguenti argomenti:
Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis. Assegna un nome ai flussi di dati
ExampleInputStream
eExampleOutputStream
.Come si crea un bucket S3? nella Guida per l'utente di Amazon Simple Storage Service. Assegna al bucket Amazon S3 un nome globalmente univoco aggiungendo il tuo nome di accesso, ad esempio
ka-app-code-
.<username>
Scrivi record di esempio nel flusso di input
In questa sezione, viene utilizzato uno script Python per scrivere stringhe casuali nel flusso per l'applicazione da elaborare.
Nota
Questa sezione richiede AWS SDK for Python (Boto)
-
Crea un file denominato
ping.py
con i seguenti contenuti:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
Esegui lo script
ping.py
:$ python ping.py
Mantieni lo script in esecuzione mentre completi il resto del tutorial.
Scarica ed esamina il codice dell'applicazione
Il codice dell'applicazione Java per questo esempio è disponibile da GitHub. Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito.
Installa il client Git se non lo hai già fatto. Per ulteriori informazioni, consulta Installazione di Git
. Clona il repository remoto con il comando seguente:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Passa alla directory
amazon-kinesis-data-analytics-java-examples/Beam
.
Il codice dell'applicazione si trova nei file BasicBeamStreamingJob.java
. Tieni presente quanto segue riguardo al codice dell'applicazione:
L'applicazione utilizza Apache Beam ParDo
per elaborare i record in entrata richiamando una funzione di trasformazione personalizzata chiamata. PingPongFn
Il codice per richiamare la funzione
PingPongFn
è il seguente:.apply("Pong transform", ParDo.of(new PingPongFn())
Le applicazioni del servizio gestito per Apache Flink che utilizzano Apache Beam richiedono i seguenti componenti. Se questi componenti e versioni non vengono inclusi in
pom.xml
, l'applicazione carica le versioni errate dalle dipendenze dell'ambiente e, poiché le versioni non corrispondono, si blocca in fase di runtime.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
La funzione di trasformazione
PingPongFn
passa i dati di input nel flusso di output, a meno che i dati di input non siano ping, nel qual caso nel flusso di output viene emessa la stringa pong\n.Il codice della funzione di trasformazione è il seguente:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Compila il codice dell'applicazione
Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:
Installa Java e Maven se non lo hai già fatto. Per ulteriori informazioni, consulta Completa i prerequisiti richiesti nel tutorial Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink.
Compila l'applicazione con il seguente comando:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
Nota
Il codice di origine fornito si basa sulle librerie di Java 11.
La compilazione dell'applicazione crea il JAR file dell'applicazione ()target/basic-beam-app-1.0.jar
.
Caricate il codice Java di streaming Apache Flink
In questa sezione, il codice dell'applicazione viene caricato nel bucket Amazon S3 creato nella sezione Crea risorse dipendenti.
-
Nella console Amazon S3, scegli il - ka-app-code
<username>
bucket e scegli Upload. -
Nella fase Seleziona file, scegli Aggiungi file. Individua il file
basic-beam-app-1.0.jar
creato nella fase precedente. Non è necessario modificare alcuna delle impostazioni dell'oggetto, quindi scegli Carica.
Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.
Crea ed esegui l'applicazione Managed Service for Apache Flink
Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.
Creazione dell'applicazione
Apri la console Managed Service for Apache Flink all'indirizzo /flink https://console.aws.amazon.com
-
Nella dashboard del servizio gestito per Apache Flink, scegli Crea un'applicazione di analisi.
-
Nella pagina Servizio gestito per Apache Flink: crea applicazione, fornisci i dettagli dell'applicazione nel modo seguente:
-
Per Nome applicazione, inserisci
MyApplication
. -
Per Runtime, scegli Apache Flink.
Nota
Apache Beam non è attualmente compatibile con Apache Flink versione 1.19 o successiva.
Seleziona Apache Flink versione 1.15 dal menu a discesa della versione.
-
-
Per le autorizzazioni di accesso, scegli Crea/aggiorna ruolo. IAM
kinesis-analytics-MyApplication-us-west-2
-
Scegli Crea applicazione.
Nota
Quando crei un servizio gestito per l'applicazione Apache Flink utilizzando la console, hai la possibilità di creare un IAM ruolo e una policy per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste IAM risorse vengono denominate utilizzando il nome e la regione dell'applicazione come segue:
-
Policy:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Ruolo:
kinesis-analytics-MyApplication-
us-west-2
Modifica la IAM politica
Modifica la IAM policy per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.
Apri la console all'IAMindirizzo. https://console.aws.amazon.com/iam/
-
Seleziona Policy. Scegli la policy
kinesis-analytics-service-MyApplication-us-west-2
creata dalla console nella sezione precedente. -
Nella pagina Riepilogo, scegli Modifica policy. Scegli la JSONscheda.
-
Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (
012345678901
) con l'ID del tuo account.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Configura l'applicazione
-
Nella MyApplicationpagina, scegli Configura.
-
Nella pagina Configura applicazione, fornisci la Posizione del codice:
-
Per Bucket Amazon S3, inserisci
ka-app-code-
.<username>
-
Per Percorso dell'oggetto Amazon S3, inserisci
basic-beam-app-1.0.jar
-
-
In Accesso alle risorse dell'applicazione, per Autorizzazioni di accesso, scegli Crea/aggiorna IAM ruolo
kinesis-analytics-MyApplication-us-west-2
. -
Immetti i seguenti dati:
ID gruppo Chiave Valore BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
In Monitoraggio, accertati che il Monitoraggio del livello dei parametri sia impostato su Applicazione.
-
Per la CloudWatch registrazione, seleziona la casella di controllo Abilita.
-
Scegli Aggiorna.
Nota
Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:
-
Gruppo di log:
/aws/kinesis-analytics/MyApplication
-
Flusso di log:
kinesis-analytics-log-stream
Questo flusso di log viene utilizzato per monitorare l'applicazione. Non si tratta dello stesso flusso di log utilizzato dall'applicazione per inviare i risultati.
Esecuzione dell'applicazione.
Il grafico del processo Flink può essere visualizzato eseguendo l'applicazione, aprendo il pannello di controllo di Apache Flink e scegliendo il processo Flink desiderato.
Puoi controllare le metriche del servizio gestito per Apache Flink sulla CloudWatch console per verificare che l'applicazione funzioni.
Pulisci le risorse AWS
Questa sezione include le procedure per ripulire AWS le risorse create nel tutorial di Tumbling Window.
Questo argomento contiene le sezioni seguenti:
Eliminare l'applicazione Managed Service for Apache Flink
Apri la console Managed Service for Apache Flink all'indirizzo /flink https://console.aws.amazon.com
nel pannello Managed Service for Apache Flink, scegliete. MyApplication
Nella pagina dell'applicazione, scegli Elimina e quindi conferma l'eliminazione.
Eliminare i flussi di dati Kinesis
Apri la console Kinesis in /kinesis. https://console.aws.amazon.com
Nel pannello Kinesis Data Streams, scegli. ExampleInputStream
Nella ExampleInputStreampagina, scegli Elimina Kinesis Stream e conferma l'eliminazione.
Nella pagina Kinesis Streams, scegli, scegli Azioni ExampleOutputStream, scegli Elimina, quindi conferma l'eliminazione.
Elimina l'oggetto e il bucket Amazon S3
Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/
Scegli il ka-app-code -
<username>
secchio.Per confermare l'eliminazione, scegli Elimina, quindi inserisci il nome del bucket.
Elimina le tue IAM risorse
Apri la IAM console all'indirizzo https://console.aws.amazon.com/iam/
. Nella barra di navigazione, scegli Policy.
Nel controllo filtro, inserisci kinesis.
Scegli la politica kinesis-analytics-service- MyApplication -us-west-2.
Seleziona Operazioni di policy e quindi Elimina.
Nella barra di navigazione, scegli Ruoli.
Scegli il ruolo kinesis-analytics- MyApplication -us-west-2.
Quindi scegli Elimina ruolo e conferma l'eliminazione.
CloudWatch Elimina le tue risorse
Apri la CloudWatch console all'indirizzo https://console.aws.amazon.com/cloudwatch/
. Nella barra di navigazione, scegli Log.
Scegli il gruppo di log MyApplication/aws/kinesis-analytics/.
Quindi scegli Elimina gruppo di log e conferma l'eliminazione.
Passaggi successivi
Ora che hai creato ed eseguito un'applicazione di base del servizio gestito per Apache Flink che trasforma i dati utilizzando Apache Beam, consulta la seguente applicazione come esempio di una soluzione del servizio gestito per Apache Flink più avanzata.
Workshop in streaming su Beam nel servizio gestito per Apache Flink
: in questo workshop, viene esplorato un esempio completo che combina aspetti di batch e streaming in un'unica pipeline Apache Beam uniforme.