Erste Schritte (Scala) - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erste Schritte (Scala)

Anmerkung

Ab Version 1.15 ist Flink Scala-frei. Anwendungen können jetzt Java API von jeder Scala-Version aus verwenden. Flink verwendet Scala intern immer noch in einigen Schlüsselkomponenten, macht Scala jedoch nicht im Benutzercode-Classloader verfügbar. Aus diesem Grund müssen Sie Scala-Abhängigkeiten zu Ihren -Archiven hinzufügen. JAR

Weitere Informationen zu den Scala-Änderungen in Flink 1.15 finden Sie unter Scalafrei in One Fifteen.

In dieser Übung erstellen Sie eine Managed Service for Apache Flink-Anwendung für Scala mit einem Kinesis-Stream als Quelle und Senke.

Erstellen Sie abhängige Ressourcen

Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:

  • Zwei Kinesis Streams für Eingaben und Ausgaben.

  • Einen Amazon S3-Bucket zum Speichern des Anwendungscodes (ka-app-code-<username>)

Sie können die Kinesis Streams und den Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressourcen finden Sie in den folgenden Themen:

  • Data Streams erstellen und aktualisieren im Amazon Kinesis Data Streams Entwicklerleitfaden. Benennen Sie Ihre Data Streams ExampleInputStream und ExampleOutputStream.

    So erstellen Sie die Daten-Streams (AWS CLI)

    • Verwenden Sie den folgenden Amazon Kinesis Kinesis-Befehl create-stream, um den ersten Stream (ExampleInputStream) zu erstellen AWS CLI .

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern den Stream-Namen in ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service Benutzerhandbuch. Geben Sie dem Amazon S3-Bucket einen global eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen, z. B. ka-app-code-<username>.

Sonstige Ressourcen

Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:

  • Eine Protokollgruppe mit dem Namen /AWS/KinesisAnalytics-java/MyApplication

  • Einen Protokollstream mit dem Namen kinesis-analytics-log-stream

Schreiben Sie Beispieldatensätze in den Eingabestream

In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.

Anmerkung

Dieser Abschnitt erfordert AWS SDK for Python (Boto).

Anmerkung

Das Python-Skript in diesem Abschnitt verwendet die AWS CLI. Sie müssen Ihren so konfigurieren AWS CLI , dass er Ihre Kontoanmeldeinformationen und Ihre Standardregion verwendet. Geben Sie Folgendes ein AWS CLI, um Ihre zu konfigurieren:

aws configure
  1. Erstellen Sie eine Datei stock.py mit dem folgenden Inhalt:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Führen Sie das stock.pySkript aus:

    $ python stock.py

    Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen.

Laden Sie den Anwendungscode herunter und überprüfen Sie ihn

Der Python-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:

  1. Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben. Weitere Informationen finden Sie unter Git installieren.

  2. Klonen Sie das Remote-Repository mit dem folgenden Befehl:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Navigieren Sie zum amazon-kinesis-data-analytics-java-examples/scala/GettingStarted Verzeichnis .

Beachten Sie Folgendes zum Anwendungscode:

  • Eine build.sbt-Datei enthält Informationen über die Konfiguration und Abhängigkeiten der Anwendung, einschließlich der Bibliotheken des Managed Service für Apache Flink.

  • Die BasicStreamingJob.scala-Datei enthält die Hauptmethode, die die Funktionalität der Anwendung definiert.

  • Die Anwendung verwendet eine Kinesis-Quelle zum Lesen aus dem Quell-Stream. Der folgende Codeausschnitt erstellt die Kinesis-Quelle:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    Die Anwendung verwendet auch eine Kinesis-Senke, um in den Ergebnisstream zu schreiben. Der folgende Codeausschnitt erstellt die Kinesis-Senke:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • Die Anwendung erstellt Quell- und Senken-Konnektoren, um mithilfe eines StreamExecutionEnvironment Objekts auf externe Ressourcen zuzugreifen.

  • Die Anwendung erstellt Quell- und Senkenkonnektoren mit dynamischen Anwendungseigenschaften. Die Laufzeiteigenschaften der Anwendung werden gelesen, um die Konnektoren zu konfigurieren. Weitere Informationen zu Laufzeiteigenschaften finden Sie unter Laufzeiteigenschaften.

Kompilieren Sie den Anwendungscode und laden Sie ihn hoch

In diesem Abschnitt kompilieren Sie Ihren Anwendungscode und laden ihn in den Amazon-S3-Bucket hoch, den Sie im Abschnitt Erstellen Sie abhängige Ressourcen erstellt haben.

Kompilieren des Anwendungscodes

In diesem Abschnitt verwenden Sie das SBTBuild-Tool, um den Scala-Code für die Anwendung zu erstellen. Informationen zur Installation SBT finden Sie unter Installieren von sbt mit dem CS-Setup. Sie müssen auch das Java Development Kit (JDK) installieren. Siehe Voraussetzungen für das Fertigstellen der Übungen.

  1. Um Ihren Anwendungscode zu verwenden, kompilieren Sie ihn und packen ihn in eine JAR Datei. Sie können Ihren Code kompilieren und verpacken mitSBT:

    sbt assembly
  2. Wenn die Anwendung erfolgreich kompiliert wurde, wird die folgende Datei erstellt:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Hochladen des Apache Flink-Streaming-Scala-Codes

In diesem Abschnitt erstellen Sie einen Amazon S3-Bucket und laden Ihren Anwendungscode hoch.

  1. Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/.

  2. Wählen Sie Bucket erstellen aus

  3. Geben Sie ka-app-code-<username> im Feld Bucket-Name ein. Fügen Sie dem Bucket-Namen ein Suffix hinzu, wie z. B. Ihren Benutzernamen, damit er global eindeutig ist. Wählen Sie Weiter.

  4. Lassen Sie im Schritt Optionen konfigurieren die Einstellungen unverändert und klicken Sie auf Weiter.

  5. Lassen Sie im Schritt Berechtigungen festlegen die Einstellungen unverändert und klicken Sie auf Weiter.

  6. Wählen Sie Bucket erstellen aus.

  7. Wählen Sie den Bucket ka-app-code-<username> und dann Hochladen aus.

  8. Klicken Sie im Schritt Auswählen von Dateien auf Hinzufügen von Dateien. Navigieren Sie zu der getting-started-scala-1.0.jarDatei, die Sie im vorherigen Schritt erstellt haben.

  9. Sie müssen keine der Einstellungen für das Objekt ändern. Wählen Sie daher Hochladen.

Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.