Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Eine Managed Service für Apache Flink for Python-Anwendung erstellen und ausführen
In diesem Abschnitt erstellen Sie eine Managed Service for Apache Flink-Anwendung für Python mit einem Kinesis-Stream als Quelle und Senke.
Dieser Abschnitt enthält die folgenden Schritte.
- Erstellen Sie abhängige Ressourcen
- Einrichten der lokalen Entwicklungsumgebung
- Laden Sie den Apache Flink-Streaming-Python-Code herunter und untersuchen Sie ihn
- Abhängigkeiten verwalten JAR
- Schreiben Sie Beispieldatensätze in den Eingabestream
- Führen Sie Ihre Anwendung lokal aus
- Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams
- Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
- Package Sie Ihren Anwendungscode
- Laden Sie das Anwendungspaket in einen Amazon S3 S3-Bucket hoch
- Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
- Nächster Schritt
Erstellen Sie abhängige Ressourcen
Bevor Sie für diese Übung einen Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:
-
Zwei Kinesis Streams für Eingaben und Ausgaben.
-
Ein Amazon S3 S3-Bucket zum Speichern des Anwendungscodes.
Anmerkung
In diesem Tutorial wird davon ausgegangen, dass Sie Ihre Anwendung in der Region us-east-1 bereitstellen. Wenn Sie eine andere Region verwenden, müssen Sie alle Schritte entsprechend anpassen.
Zwei Kinesis-Streams erstellen
Bevor Sie für diese Übung eine Managed Service for Apache Flink-Anwendung erstellen, erstellen Sie zwei Kinesis-Datenströme (ExampleInputStream
undExampleOutputStream
) in derselben Region, die Sie für die Bereitstellung Ihrer Anwendung verwenden werden (in diesem Beispiel us-east-1). Ihre Anwendung verwendet diese Streams für die Quell- und Ziel-Streams der Anwendung.
Sie können diese Streams mithilfe der Amazon-Kinesis-Konsole oder des folgenden AWS CLI -Befehls erstellen. Anweisungen für die Konsole finden Sie unter Erstellen und Aktualisieren von Datenströmen im Amazon Kinesis Data Streams Entwicklerhandbuch.
So erstellen Sie die Daten-Streams (AWS CLI)
-
Verwenden Sie den folgenden Amazon Kinesis
create-stream
AWS CLI Kinesis-Befehl, um den ersten Stream (ExampleInputStream
) zu erstellen.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern den Stream-Namen in
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
Erstellen eines Amazon-S3-Buckets
Sie können ein Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressource finden Sie in den folgenden Themen:
-
Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service-Benutzerhandbuch. Geben Sie dem Amazon S3 S3-Bucket einen weltweit eindeutigen Namen, indem Sie beispielsweise Ihren Anmeldenamen anhängen.
Anmerkung
Stellen Sie sicher, dass Sie den S3-Bucket in der Region erstellen, die Sie für dieses Tutorial verwenden (us-east-1).
Sonstige Ressourcen
Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
-
Eine Protokollgruppe namens
/AWS/KinesisAnalytics-java/<my-application>
. -
Einen Protokollstream mit dem Namen
kinesis-analytics-log-stream
.
Einrichten der lokalen Entwicklungsumgebung
Für die Entwicklung und das Debuggen können Sie die Python Flink-Anwendung auf Ihrem Computer ausführen. Sie können die Anwendung von der Befehlszeile aus mit python
main.py
oder in einem Python IDE Ihrer Wahl starten.
Anmerkung
Auf Ihrem Entwicklungscomputer müssen Python 3.10 oder 3.11, Java 11, Apache Maven und Git installiert sein. Wir empfehlen, dass Sie einen IDE solchen Code PyCharm
Installieren Sie die PyFlink Bibliothek
Um Ihre Anwendung zu entwickeln und lokal auszuführen, müssen Sie die Flink-Python-Bibliothek installieren.
-
Erstellen Sie eine eigenständige Python-Umgebung mit VirtualEnv Conda oder einem ähnlichen Python-Tool.
-
Installieren Sie die PyFlink Bibliothek in dieser Umgebung. Verwenden Sie dieselbe Apache Flink-Laufzeitversion, die Sie in Amazon Managed Service für Apache Flink verwenden werden. Derzeit ist die empfohlene Laufzeit 1.19.1.
$ pip install apache-flink==1.19.1
-
Stellen Sie sicher, dass die Umgebung aktiv ist, wenn Sie Ihre Anwendung ausführen. Wenn Sie die Anwendung in der ausführenIDE, stellen Sie sicher, IDE dass die Umgebung als Laufzeit verwendet. Der Prozess hängt von dem abIDE, den Sie verwenden.
Anmerkung
Sie müssen nur die PyFlink Bibliothek installieren. Sie müssen keinen Apache Flink-Cluster auf Ihrem Computer installieren.
Authentifizieren Sie Ihre Sitzung AWS
Die Anwendung verwendet Kinesis-Datenströme, um Daten zu veröffentlichen. Bei der lokalen Ausführung benötigen Sie eine gültige AWS authentifizierte Sitzung mit Schreibberechtigungen in den Kinesis-Datenstrom. Verwenden Sie die folgenden Schritte, um Ihre Sitzung zu authentifizieren:
-
Wenn Sie das Profil AWS CLI und ein benanntes Profil mit gültigen Anmeldeinformationen nicht konfiguriert haben, finden Sie weitere Informationen unter. Richten Sie das AWS Command Line Interface (AWS CLI) ein
-
Vergewissern Sie sich, dass Ihre korrekt konfiguriert AWS CLI ist und dass Ihre Benutzer über Schreibberechtigungen in den Kinesis-Datenstrom verfügen, indem Sie den folgenden Testdatensatz veröffentlichen:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Wenn Sie IDE über ein Plug-in zur Integration verfügen AWS, können Sie es verwenden, um die Anmeldeinformationen an die Anwendung zu übergeben, die in der IDE ausgeführt wird. Weitere Informationen finden Sie unter AWS Toolkit für PyCharm
, AWS Toolkit for Visual Studio Code und AWS Toolkit für IntelliJ. IDEA
Laden Sie den Apache Flink-Streaming-Python-Code herunter und untersuchen Sie ihn
Der Python-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:
-
Klonen Sie das Remote-Repository, indem Sie den folgenden Befehl verwenden:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Navigieren Sie zum
./python/GettingStarted
Verzeichnis .
Überprüfen Sie die Anwendungskomponenten
Der Anwendungscode befindet sich inmain.py
. Wir verwenden SQL Embedded in Python, um den Ablauf der Anwendung zu definieren.
Anmerkung
Für ein optimiertes Entwicklererlebnis ist die Anwendung so konzipiert, dass sie ohne Codeänderungen sowohl auf Amazon Managed Service für Apache Flink als auch lokal für die Entwicklung auf Ihrem Computer ausgeführt werden kann. Die Anwendung verwendet die UmgebungsvariableIS_LOCAL =
true
, um zu erkennen, wann sie lokal ausgeführt wird. Sie müssen die Umgebungsvariable IS_LOCAL = true
entweder in Ihrer Shell oder in der Run-Konfiguration Ihres festlegenIDE.
-
Die Anwendung richtet die Ausführungsumgebung ein und liest die Laufzeitkonfiguration. Um sowohl auf Amazon Managed Service for Apache Flink als auch lokal zu funktionieren, überprüft die Anwendung die
IS_LOCAL
Variable.-
Folgendes ist das Standardverhalten, wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird:
-
Lädt die in der Anwendung enthaltenen Abhängigkeiten. Weitere Informationen finden Sie unter (Link)
-
Laden Sie die Konfiguration aus den Runtime-Eigenschaften, die Sie in der Anwendung Amazon Managed Service for Apache Flink definieren. Weitere Informationen finden Sie unter (Link)
-
-
Wenn die Anwendung erkennt
IS_LOCAL = true
, dass Sie Ihre Anwendung lokal ausführen:-
Lädt externe Abhängigkeiten aus dem Projekt.
-
Lädt die Konfiguration aus der im Projekt enthaltenen
application_properties.json
Datei.... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
Die Anwendung definiert mithilfe des Kinesis Connectors
eine Quelltabelle mit einer CREATE TABLE
Anweisung. Diese Tabelle liest Daten aus dem Kinesis-Eingabestream. Die Anwendung verwendet den Namen des Streams, die Region und die Anfangsposition aus der Laufzeitkonfiguration.table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
Die Anwendung definiert in diesem Beispiel auch eine Sinktabelle mithilfe des Kinesis Connectors
. Diese Geschichte sendet Daten an den Kinesis-Ausgabestream. table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
Schließlich führt die Anwendung aus, SQL dass die Sink-Tabelle aus
INSERT INTO...
der Quelltabelle stammt. In einer komplexeren Anwendung müssen Sie wahrscheinlich zusätzliche Schritte ausführen, um Daten zu transformieren, bevor Sie in die Datensenke schreiben.table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
Sie müssen am Ende der
main()
Funktion einen weiteren Schritt hinzufügen, um die Anwendung lokal auszuführen:if is_local: table_result.wait()
Ohne diese Anweisung wird die Anwendung sofort beendet, wenn Sie sie lokal ausführen. Sie dürfen diese Anweisung nicht ausführen, wenn Sie Ihre Anwendung in Amazon Managed Service for Apache Flink ausführen.
Abhängigkeiten verwalten JAR
Eine PyFlink Anwendung benötigt normalerweise einen oder mehrere Konnektoren. Die Anwendung in diesem Tutorial verwendet den Kinesis Connector
In diesem Beispiel zeigen wir, wie Sie Apache Maven verwenden, um die Abhängigkeiten abzurufen und die Anwendung so zu verpacken, dass sie auf Managed Service für Apache Flink ausgeführt wird.
Anmerkung
Es gibt alternative Möglichkeiten, Abhängigkeiten abzurufen und zu paketieren. Dieses Beispiel zeigt eine Methode, die mit einem oder mehreren Konnektoren korrekt funktioniert. Außerdem können Sie die Anwendung lokal, zu Entwicklungszwecken und auf Managed Service für Apache Flink ohne Codeänderungen ausführen.
Verwenden Sie die Datei pom.xml
Apache Maven verwendet die pom.xml
Datei, um Abhängigkeiten und das Paketieren von Anwendungen zu kontrollieren.
Alle JAR Abhängigkeiten sind in der pom.xml
Datei im <dependencies>...</dependencies>
Block angegeben.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
Informationen zum richtigen Artefakt und zur richtigen Version des zu verwendenden Konnektors finden Sie unterVerwenden Sie Apache Flink-Konnektoren mit Managed Service für Apache Flink. Vergewissern Sie sich, dass Sie sich auf die Version von Apache Flink beziehen, die Sie verwenden. In diesem Beispiel verwenden wir den Kinesis-Konnektor. Für Apache Flink 1.19 lautet die Connector-Version. 4.3.0-1.19
Anmerkung
Wenn Sie Apache Flink 1.19 verwenden, gibt es keine Connector-Version, die speziell für diese Version veröffentlicht wurde. Verwenden Sie die für 1.18 veröffentlichten Konnektoren.
Abhängigkeiten herunterladen und verpacken
Verwenden Sie Maven, um die in der pom.xml
Datei definierten Abhängigkeiten herunterzuladen und sie für die Python-Flink-Anwendung zu verpacken.
-
Navigieren Sie zu dem Verzeichnis, das das Python-Projekt Getting Started namens enthält
python/GettingStarted
. -
Führen Sie den folgenden Befehl aus:
$ mvn package
Maven erstellt eine neue Datei namens./target/pyflink-dependencies.jar
. Wenn Sie lokal auf Ihrem Computer entwickeln, sucht die Python-Anwendung nach dieser Datei.
Anmerkung
Wenn Sie vergessen, diesen Befehl auszuführen, schlägt er bei dem Versuch, Ihre Anwendung auszuführen, mit der folgenden Fehlermeldung fehl: Es konnte keine Factory für den Bezeichner „kinesis“ gefunden werden.
Schreiben Sie Beispieldatensätze in den Eingabestream
In diesem Abschnitt senden Sie Beispieldatensätze an den Stream, damit die Anwendung sie verarbeiten kann. Sie haben zwei Möglichkeiten, Beispieldaten zu generieren, entweder mit einem Python-Skript oder mit dem Kinesis Data Generator
Generieren Sie Beispieldaten mit einem Python-Skript
Sie können ein Python-Skript verwenden, um Beispieldatensätze an den Stream zu senden.
Anmerkung
Um dieses Python-Skript auszuführen, müssen Sie Python 3.x verwenden und die AWS SDKfor Python (Boto)
Um mit dem Senden von Testdaten an den Kinesis-Eingabestream zu beginnen:
-
Laden Sie das
stock.py
Python-Skript für den Datengenerator aus dem GitHub Datengenerator-Repositoryherunter. -
Führen Sie das
stock.py
Skript aus:$ python stock.py
Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen. Sie können jetzt Ihre Apache Flink-Anwendung ausführen.
Generieren Sie Beispieldaten mit Kinesis Data Generator
Alternativ zur Verwendung des Python-Skripts können Sie Kinesis Data Generator
So richten Sie Kinesis Data Generator ein und führen ihn aus:
-
Folgen Sie den Anweisungen in der Kinesis Data Generator-Dokumentation
, um den Zugriff auf das Tool einzurichten. Sie werden eine AWS CloudFormation Vorlage ausführen, die einen Benutzer und ein Passwort einrichtet. -
Greifen Sie über die von der CloudFormation Vorlage URL generierte Datei auf Kinesis Data Generator zu. Sie finden das auf URL der Registerkarte Ausgabe, nachdem die CloudFormation Vorlage fertiggestellt wurde.
-
Konfigurieren Sie den Datengenerator:
-
Region: Wählen Sie die Region aus, die Sie für dieses Tutorial verwenden: us-east-1
-
Stream/Delivery-Stream: Wählen Sie den Eingabestream aus, den die Anwendung verwenden soll:
ExampleInputStream
-
Datensätze pro Sekunde: 100
-
Datensatzvorlage: Kopieren Sie die folgende Vorlage und fügen Sie sie ein:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Testen Sie die Vorlage: Wählen Sie Testvorlage und stellen Sie sicher, dass der generierte Datensatz dem folgenden ähnelt:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Starten Sie den Datengenerator: Wählen Sie Select Send Data.
Kinesis Data Generator sendet jetzt Daten an denExampleInputStream
.
Führen Sie Ihre Anwendung lokal aus
Sie können die Anwendung lokal testen, indem Sie sie von der Befehlszeile aus mit python main.py
oder von Ihrem aus ausführenIDE.
Um Ihre Anwendung lokal ausführen zu können, muss die richtige Version der PyFlink Bibliothek installiert sein, wie im vorherigen Abschnitt beschrieben. Weitere Informationen finden Sie unter (Link)
Anmerkung
Bevor Sie fortfahren, stellen Sie sicher, dass die Eingabe- und Ausgabestreams verfügbar sind. Siehe Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams. Stellen Sie außerdem sicher, dass Sie über Lese- und Schreibberechtigungen für beide Streams verfügen. Siehe Authentifizieren Sie Ihre Sitzung AWS.
Importiere das Python-Projekt in dein IDE
Um mit der Arbeit an der Anwendung in Ihrem zu beginnenIDE, müssen Sie sie als Python-Projekt importieren.
Das Repository, das Sie geklont haben, enthält mehrere Beispiele. Jedes Beispiel ist ein separates Projekt. Importieren Sie für dieses Tutorial den Inhalt im ./python/GettingStarted
Unterverzeichnis in IhrIDE.
Importieren Sie den Code als vorhandenes Python-Projekt.
Anmerkung
Der genaue Vorgang zum Importieren eines neuen Python-Projekts hängt davon ab, IDE welches Sie verwenden.
Überprüfen Sie die lokale Anwendungskonfiguration
Bei der lokalen Ausführung verwendet die Anwendung die Konfiguration in der application_properties.json
Datei im Ressourcenordner des Projekts unter./src/main/resources
. Sie können diese Datei bearbeiten, um verschiedene Kinesis-Stream-Namen oder -Regionen zu verwenden.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Führen Sie Ihre Python-Anwendung lokal aus
Sie können Ihre Anwendung lokal ausführen, entweder über die Befehlszeile als normales Python-Skript oder überIDE.
Um Ihre Anwendung von der Befehlszeile aus auszuführen
-
Stellen Sie sicher, dass die eigenständige Python-Umgebung wie Conda oder VirtualEnv in der Sie die Python-Flink-Bibliothek installiert haben, derzeit aktiv ist.
-
Stellen Sie sicher, dass Sie
mvn package
mindestens einmal ausgeführt haben. -
Legen Sie die
IS_LOCAL = true
-Umgebungsvariable fest:$ export IS_LOCAL=true
-
Führen Sie die Anwendung als normales Python-Skript aus.
$python main.py
Um die Anwendung aus dem heraus auszuführen IDE
-
Konfigurieren Sie IDE Ihren so, dass das
main.py
Skript mit der folgenden Konfiguration ausgeführt wird:-
Verwenden Sie die eigenständige Python-Umgebung wie Conda oder den VirtualEnv Ort, an dem Sie die PyFlink Bibliothek installiert haben.
-
Verwenden Sie die AWS Anmeldeinformationen, um auf die Eingabe- und Ausgabe-Kinesis-Datenstreams zuzugreifen.
-
Set
IS_LOCAL = true
.
-
-
Der genaue Vorgang zum Einstellen der Laufkonfiguration hängt von Ihnen ab IDE und ist unterschiedlich.
-
Wenn Sie Ihre eingerichtet habenIDE, führen Sie das Python-Skript aus und verwenden Sie die von Ihnen bereitgestellten Tools, IDE während die Anwendung ausgeführt wird.
Überprüfen Sie die Anwendungsprotokolle lokal
Wenn die Anwendung lokal ausgeführt wird, zeigt sie kein Protokoll in der Konsole an, mit Ausnahme einiger Zeilen, die beim Start der Anwendung gedruckt und angezeigt werden. PyFlink schreibt Protokolle in eine Datei in dem Verzeichnis, in dem die Python-Flink-Bibliothek installiert ist. Die Anwendung gibt beim Start den Speicherort der Protokolle aus. Sie können auch den folgenden Befehl ausführen, um die Protokolle zu finden:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
Listet die Dateien im Logging-Verzeichnis auf. Normalerweise finden Sie eine einzelne
.log
Datei. -
Speichern Sie die Datei, während die Anwendung läuft:
tail -f <log-path>/<log-file>.log
.
Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams
Sie können Datensätze beobachten, die vom (generierenden Beispiel-Python) oder dem Kinesis Data Generator (Link) an den Eingabestream gesendet wurden, indem Sie den Data Viewer in der Amazon Kinesis Kinesis-Konsole verwenden.
Um Aufzeichnungen zu beobachten:
Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
Stoppen Sie die Anwendung, die in Ihrem läuftIDE. Das bietet IDE normalerweise eine „Stopp“ -Option. Der genaue Standort und die Methode hängen von der abIDE.
Package Sie Ihren Anwendungscode
In diesem Abschnitt verwenden Sie Apache Maven, um den Anwendungscode und alle erforderlichen Abhängigkeiten in einer ZIP-Datei zu verpacken.
Führen Sie den Maven-Paketbefehl erneut aus:
$ mvn package
Dieser Befehl generiert die Dateitarget/managed-flink-pyflink-getting-started-1.0.0.zip
.
Laden Sie das Anwendungspaket in einen Amazon S3 S3-Bucket hoch
In diesem Abschnitt laden Sie die .zip-Datei, die Sie im vorherigen Abschnitt erstellt haben, in den Amazon Simple Storage Service (Amazon S3) -Bucket hoch, den Sie zu Beginn dieses Tutorials erstellt haben. Wenn Sie diesen Schritt noch nicht abgeschlossen haben, finden Sie weitere Informationen unter (Link).
Um die JAR Anwendungscodedatei hochzuladen
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
. -
Wählen Sie den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben.
-
Klicken Sie auf Hochladen.
-
Klicken Sie auf Add files.
-
Navigieren Sie zu der im vorherigen Schritt generierten ZIP-Datei:
target/managed-flink-pyflink-getting-started-1.0.0.zip
. -
Wählen Sie Hochladen, ohne andere Einstellungen zu ändern.
Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
Sie können eine Managed Service for Apache Flink-Anwendung entweder mit der Konsole oder der erstellen und konfigurieren. AWS CLI Für dieses Tutorial verwenden wir die Konsole.
Erstellen der Anwendung
Öffnen Sie die Managed Service for Apache Flink-Konsole unter https://console.aws.amazon.com /flink
-
Stellen Sie sicher, dass die richtige Region ausgewählt ist: USA Ost (Nord-Virginia) us-east-1.
-
Öffnen Sie das Menü auf der rechten Seite und wählen Sie Apache Flink-Anwendungen und dann Streaming-Anwendung erstellen. Wählen Sie alternativ im Bereich Erste Schritte auf der Startseite die Option Streaming-Anwendung erstellen aus.
-
Gehen Sie auf der Seite Streaming-Anwendungen erstellen wie folgt vor:
-
Wählen Sie unter Methode zum Einrichten der Streamverarbeitungsanwendung die Option Von Grund auf neu erstellen aus.
-
Wählen Sie für Apache Flink-Konfiguration, Application Flink-Version, Apache Flink 1.19.
-
Für die Anwendungskonfiguration:
-
Geben Sie als Anwendungsname ein
MyApplication
. -
Geben Sie für Beschreibung den Text
My Python test app
ein. -
Wählen Sie unter Zugriff auf Anwendungsressourcen die Option Create/update IAM role kinesis-analytics- MyApplication -us-east-1 with required policies aus.
-
-
Für die Einstellungen „Vorlage für Anwendungen“:
-
Wählen Sie für Vorlagen die Option Entwicklung aus.
-
-
Wählen Sie Streaming-Anwendung erstellen aus.
-
Anmerkung
Wenn Sie mithilfe der Konsole eine Managed Service for Apache Flink-Anwendung erstellen, haben Sie die Möglichkeit, eine IAM Rolle und Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM Ressourcen werden anhand Ihres Anwendungsnamens und Ihrer Region wie folgt benannt:
-
Richtlinie:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rolle:
kinesisanalytics-
MyApplication
-us-west-2
Amazon Managed Service für Apache Flink war früher als Kinesis Data Analytics bekannt. Dem Namen der Ressourcen, die automatisch generiert werden, wird aus kinesis-analytics
Gründen der Abwärtskompatibilität ein Präfix vorangestellt.
Bearbeiten Sie die Richtlinie IAM
Bearbeiten Sie die IAM Richtlinie, um Berechtigungen für den Zugriff auf den Amazon S3 S3-Bucket hinzuzufügen.
Um die IAM Richtlinie zu bearbeiten, um S3-Bucket-Berechtigungen hinzuzufügen
Öffnen Sie die IAM Konsole unter https://console.aws.amazon.com/iam/
. -
Wählen Sie Policies (Richtlinien). Wählen Sie die
kinesis-analytics-service-MyApplication-us-east-1
-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. -
Wählen Sie Bearbeiten und dann die JSONRegisterkarte.
-
Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (
012345678901
) durch Ihre Konto-ID.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Wählen Sie Weiter und dann Änderungen speichern aus.
Konfigurieren Sie die Anwendung
Bearbeiten Sie die Anwendungskonfiguration, um das Anwendungscode-Artefakt festzulegen.
Konfigurieren der Anwendung
-
Wählen Sie auf der MyApplicationSeite Configure aus.
-
Gehen Sie im Abschnitt Speicherort des Anwendungscodes wie folgt vor:
-
Wählen Sie für Amazon S3 S3-Bucket den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben. Wählen Sie Durchsuchen und wählen Sie den richtigen Bucket aus. Wählen Sie dann Auswählen. Wählen Sie nicht den Bucket-Namen aus.
-
Geben Sie als Pfad zum Amazon-S3-Objekt den Wert
managed-flink-pyflink-getting-started-1.0.0.zip
ein.
-
-
Wählen Sie für Zugriffsberechtigungen die Option IAMRolle
kinesis-analytics-MyApplication-us-east-1
mit den erforderlichen Richtlinien erstellen/aktualisieren aus. -
Wechseln Sie zu den Runtime-Eigenschaften und behalten Sie die Standardwerte für alle anderen Einstellungen bei.
-
Wählen Sie Neues Element hinzufügen und fügen Sie jeden der folgenden Parameter hinzu:
Gruppen-ID Schlüssel Wert InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
Ändern Sie keinen der anderen Abschnitte und wählen Sie Änderungen speichern.
Anmerkung
Wenn Sie sich dafür entscheiden, die CloudWatch Amazon-Protokollierung zu aktivieren, erstellt Managed Service für Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:
-
Protokollgruppe:
/aws/kinesis-analytics/MyApplication
-
Protokollstream:
kinesis-analytics-log-stream
Führen Sie die Anwendung aus.
Die Anwendung ist jetzt konfiguriert und kann ausgeführt werden.
Ausführen der Anwendung
-
Wählen Sie auf der Konsole für Amazon Managed Service für Apache Flink My Application und anschließend Run aus.
-
Wählen Sie auf der nächsten Seite, der Konfigurationsseite für die Anwendungswiederherstellung, die Option Mit neuestem Snapshot ausführen und anschließend Ausführen aus.
Der Status in den Anwendungsdetails wechselt von
Ready
zuStarting
und dann zu demRunning
Zeitpunkt, an dem die Anwendung gestartet wurde.
Wenn sich die Anwendung im Running
Status befindet, können Sie jetzt das Flink-Dashboard öffnen.
So öffnen Sie das -Dashboard
-
Wählen Sie Apache Flink-Dashboard öffnen. Das Dashboard wird auf einer neuen Seite geöffnet.
-
Wählen Sie in der Liste „Laufende Jobs“ den einzelnen Job aus, den Sie sehen können.
Anmerkung
Wenn Sie die Runtime-Eigenschaften festgelegt oder die IAM Richtlinien falsch bearbeitet haben, ändert sich der Anwendungsstatus möglicherweise in
Running
, aber das Flink-Dashboard zeigt an, dass der Job kontinuierlich neu gestartet wird. Dies ist ein häufiges Fehlerszenario, wenn die Anwendung falsch konfiguriert ist oder keine Berechtigungen für den Zugriff auf die externen Ressourcen hat.In diesem Fall überprüfen Sie im Flink-Dashboard auf der Registerkarte Ausnahmen die Ursache des Problems.
Beobachten Sie die Metriken der laufenden Anwendung
Auf der MyApplicationSeite, im Abschnitt CloudWatch Amazon-Metriken, können Sie einige der grundlegenden Metriken der laufenden Anwendung sehen.
Um die Metriken einzusehen
-
Wählen Sie neben der Schaltfläche „Aktualisieren“ in der Dropdownliste die Option 10 Sekunden aus.
-
Wenn die Anwendung läuft und fehlerfrei ist, können Sie sehen, dass die Uptime-Metrik kontinuierlich zunimmt.
-
Die Metrik für vollständige Neustarts sollte Null sein. Wenn sie zunimmt, kann es bei der Konfiguration zu Problemen kommen. Um das Problem zu untersuchen, überprüfen Sie den Tab Ausnahmen im Flink-Dashboard.
-
Die Metrik „Anzahl fehlgeschlagener Checkpoints“ sollte in einer fehlerfreien Anwendung Null sein.
Anmerkung
Dieses Dashboard zeigt einen festen Satz von Metriken mit einer Granularität von 5 Minuten. Sie können ein benutzerdefiniertes Anwendungs-Dashboard mit beliebigen Metriken im CloudWatch Dashboard erstellen.
Beobachten Sie die Ausgabedaten in Kinesis-Streams
Vergewissern Sie sich, dass Sie weiterhin Daten in der Eingabe veröffentlichen, entweder mit dem Python-Skript oder dem Kinesis Data Generator.
Sie können jetzt die Ausgabe der Anwendung beobachten, die auf Managed Service for Apache Flink ausgeführt wird, indem Sie den Datenviewer in der verwenden https://console.aws.amazon.com/kinesis/
Um die Ausgabe anzusehen
Öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.
-
Stellen Sie sicher, dass die Region mit der Region übereinstimmt, die Sie für die Ausführung dieses Tutorials verwenden. Standardmäßig ist es US-East-1US East (Nord-Virginia). Ändern Sie bei Bedarf die Region.
-
Wählen Sie Data Streams aus.
-
Wählen Sie den Stream aus, den Sie beobachten möchten. Verwenden Sie für dieses Tutorial
ExampleOutputStream
. -
Wählen Sie die Registerkarte Datenanzeige.
-
Wählen Sie einen beliebigen Shard aus, behalten Sie „Letzte“ als Startposition bei und wählen Sie dann „Datensätze abrufen“. Möglicherweise wird die Fehlermeldung „Für diese Anfrage wurde kein Datensatz gefunden“ angezeigt. Wenn ja, wählen Sie „Erneut versuchen, Datensätze abzurufen“. Die neuesten im Stream veröffentlichten Datensätze werden angezeigt.
-
Wählen Sie den Wert in der Datenspalte aus, um den Inhalt des Datensatzes im JSON Format zu überprüfen.
Beenden Sie die Anwendung
Um die Anwendung zu beenden, rufen Sie die Konsolenseite der Anwendung Managed Service for Apache Flink mit dem Namen auf. MyApplication
So stoppen Sie die Anwendung
-
Wählen Sie in der Dropdownliste Aktion die Option Stopp aus.
-
Der Status in den Anwendungsdetails wechselt von
Running
zu und dann zu demReady
ZeitpunktStopping
, an dem die Anwendung vollständig gestoppt wurde.Anmerkung
Vergessen Sie nicht, auch das Senden von Daten aus dem Python-Skript oder dem Kinesis Data Generator an den Eingabestream zu beenden.