Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erste Schritte (Scala)
Anmerkung
Ab Version 1.15 ist Flink Scala-frei. Anwendungen können jetzt Java API von jeder Scala-Version aus verwenden. Flink verwendet Scala intern immer noch in einigen Schlüsselkomponenten, macht Scala jedoch nicht im Benutzercode-Classloader verfügbar. Aus diesem Grund müssen Sie Scala-Abhängigkeiten zu Ihren -Archiven hinzufügen. JAR
Weitere Informationen zu den Scala-Änderungen in Flink 1.15 finden Sie unter Scalafrei in One Fifteen.
In dieser Übung erstellen Sie eine Managed Service for Apache Flink-Anwendung für Scala mit einem Kinesis-Stream als Quelle und Senke.
Dieses Thema enthält die folgenden Abschnitte:
- Erstellen Sie abhängige Ressourcen
- Schreiben Sie Beispieldatensätze in den Eingabestream
- Laden Sie den Anwendungscode herunter und überprüfen Sie ihn
- Kompilieren Sie den Anwendungscode und laden Sie ihn hoch
- Erstellen Sie die Anwendung (Konsole) und führen Sie sie aus
- Erstellen Sie die Anwendung und führen Sie sie aus (CLI)
- AWS Ressourcen bereinigen
Erstellen Sie abhängige Ressourcen
Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:
Zwei Kinesis Streams für Eingaben und Ausgaben.
Einen Amazon S3-Bucket zum Speichern des Anwendungscodes (
ka-app-code-
)<username>
Sie können die Kinesis Streams und den Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressourcen finden Sie in den folgenden Themen:
Data Streams erstellen und aktualisieren im Amazon Kinesis Data Streams Entwicklerleitfaden. Benennen Sie Ihre Data Streams
ExampleInputStream
undExampleOutputStream
.So erstellen Sie die Daten-Streams (AWS CLI)
Verwenden Sie den folgenden Amazon Kinesis Kinesis-Befehl create-stream, um den ersten Stream (
ExampleInputStream
) zu erstellen AWS CLI .aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern den Stream-Namen in
ExampleOutputStream
.aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service Benutzerhandbuch. Geben Sie dem Amazon S3-Bucket einen global eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen, z. B.
ka-app-code-
.<username>
Sonstige Ressourcen
Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
Eine Protokollgruppe mit dem Namen
/AWS/KinesisAnalytics-java/MyApplication
Einen Protokollstream mit dem Namen
kinesis-analytics-log-stream
Schreiben Sie Beispieldatensätze in den Eingabestream
In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.
Anmerkung
Dieser Abschnitt erfordert AWS SDK for Python (Boto)
Anmerkung
Das Python-Skript in diesem Abschnitt verwendet die AWS CLI. Sie müssen Ihren so konfigurieren AWS CLI , dass er Ihre Kontoanmeldeinformationen und Ihre Standardregion verwendet. Geben Sie Folgendes ein AWS CLI, um Ihre zu konfigurieren:
aws configure
-
Erstellen Sie eine Datei
stock.py
mit dem folgenden Inhalt:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
Führen Sie das
stock.py
Skript aus:$ python stock.py
Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen.
Laden Sie den Anwendungscode herunter und überprüfen Sie ihn
Der Python-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:
Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben. Weitere Informationen finden Sie unter Git installieren
. Klonen Sie das Remote-Repository mit dem folgenden Befehl:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Navigieren Sie zum
amazon-kinesis-data-analytics-java-examples/scala/GettingStarted
Verzeichnis .
Beachten Sie Folgendes zum Anwendungscode:
Eine
build.sbt
-Datei enthält Informationen über die Konfiguration und Abhängigkeiten der Anwendung, einschließlich der Bibliotheken des Managed Service für Apache Flink.Die
BasicStreamingJob.scala
-Datei enthält die Hauptmethode, die die Funktionalität der Anwendung definiert.Die Anwendung verwendet eine Kinesis-Quelle zum Lesen aus dem Quell-Stream. Der folgende Codeausschnitt erstellt die Kinesis-Quelle:
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }
Die Anwendung verwendet auch eine Kinesis-Senke, um in den Ergebnisstream zu schreiben. Der folgende Codeausschnitt erstellt die Kinesis-Senke:
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
Die Anwendung erstellt Quell- und Senken-Konnektoren, um mithilfe eines StreamExecutionEnvironment Objekts auf externe Ressourcen zuzugreifen.
Die Anwendung erstellt Quell- und Senkenkonnektoren mit dynamischen Anwendungseigenschaften. Die Laufzeiteigenschaften der Anwendung werden gelesen, um die Konnektoren zu konfigurieren. Weitere Informationen zu Laufzeiteigenschaften finden Sie unter Laufzeiteigenschaften.
Kompilieren Sie den Anwendungscode und laden Sie ihn hoch
In diesem Abschnitt kompilieren Sie Ihren Anwendungscode und laden ihn in den Amazon-S3-Bucket hoch, den Sie im Abschnitt Erstellen Sie abhängige Ressourcen erstellt haben.
Kompilieren des Anwendungscodes
In diesem Abschnitt verwenden Sie das SBT
Um Ihren Anwendungscode zu verwenden, kompilieren Sie ihn und packen ihn in eine JAR Datei. Sie können Ihren Code kompilieren und verpacken mitSBT:
sbt assembly
-
Wenn die Anwendung erfolgreich kompiliert wurde, wird die folgende Datei erstellt:
target/scala-3.2.0/getting-started-scala-1.0.jar
Hochladen des Apache Flink-Streaming-Scala-Codes
In diesem Abschnitt erstellen Sie einen Amazon S3-Bucket und laden Ihren Anwendungscode hoch.
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
. Wählen Sie Bucket erstellen aus
Geben Sie
ka-app-code-<username>
im Feld Bucket-Name ein. Fügen Sie dem Bucket-Namen ein Suffix hinzu, wie z. B. Ihren Benutzernamen, damit er global eindeutig ist. Wählen Sie Weiter.Lassen Sie im Schritt Optionen konfigurieren die Einstellungen unverändert und klicken Sie auf Weiter.
Lassen Sie im Schritt Berechtigungen festlegen die Einstellungen unverändert und klicken Sie auf Weiter.
Wählen Sie Bucket erstellen aus.
Wählen Sie den Bucket
ka-app-code-<username>
und dann Hochladen aus.-
Klicken Sie im Schritt Auswählen von Dateien auf Hinzufügen von Dateien. Navigieren Sie zu der
getting-started-scala-1.0.jar
Datei, die Sie im vorherigen Schritt erstellt haben. Sie müssen keine der Einstellungen für das Objekt ändern. Wählen Sie daher Hochladen.
Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.