Erstellen eines Studio-Notebooks mit Amazon MSK - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erstellen eines Studio-Notebooks mit Amazon MSK

In diesem Tutorial wird beschrieben, wie Sie ein Studio-Notebook erstellen, das einen Amazon-MSK-Cluster als Quelle verwendet.

Aufstellen

Für dieses Tutorial benötigen Sie einen Amazon-MSK-Cluster, der Klartextzugriff ermöglicht. Wenn Sie noch keinen Amazon MSK-Cluster eingerichtet haben, folgen Sie dem Tutorial Erste Schritte mit Amazon MSK, um eine Amazon VPC, einen Amazon MSK-Cluster, ein Thema und eine Amazon EC2-Client-Instance zu erstellen.

Gehen Sie beim Befolgen des Tutorials wie folgt vor:

Fügen Sie Ihrer VPC ein NAT-Gateway hinzu

Wenn Sie einen Amazon MSK-Cluster erstellt haben, indem Sie dem Tutorial Erste Schritte mit Amazon MSK gefolgt sind, oder wenn Ihre bestehende Amazon VPC noch kein NAT-Gateway für ihre privaten Subnetze hat, müssen Sie Ihrer Amazon VPC ein NAT-Gateway hinzufügen. Das folgende Diagramm zeigt die Architektur.

VPC architecture with public and private subnets, NAT gateway, Glue Data Catalog, KDA Notebook, and MSK Kafka Broker. (AI generated)

Gehen Sie wie folgt vor, um ein NAT-Gateway für Ihre Amazon VPC zu erstellen:

  1. Öffnen Sie die Amazon VPC-Konsole unter https://console.aws.amazon.com/vpc/.

  2. Wählen Sie in der linken Navigationsleiste NAT-Gateway aus.

  3. Wählen Sie auf der Seite NAT-Gateways die Option NAT-Gateway erstellen aus.

  4. Geben Sie auf der Seite NAT-Gateway erstellen die folgenden Werte an:

    Name — optional ZeppelinGateway
    Subnetz AWS KafkaTutorialSubnet1
    Elastische IP-Zuweisungs-ID Wählen Sie eine verfügbare Elastic IP aus. Wenn keine Elastic IPs verfügbar sind, wählen Sie Allocate Elastic IP und dann die Elastic IP aus, die die Konsole erstellt.

    Wählen Sie NAT-Gateway erstellen aus.

  5. Wählen Sie in der linken Navigationsleiste Routing-Tabellen aus.

  6. Klicken Sie auf Create Route Table (Routing-Tabelle erstellen).

  7. Geben Sie auf der Seite Routing-Tabelle erstellen folgende Informationen ein:

    • Name-Tag: ZeppelinRouteTable

    • VPC: Wählen Sie Ihre VPC (z. B. AWS KafkaTutorialVPC).

    Wählen Sie Erstellen.

  8. Wählen Sie in der Liste der Routentabellen. ZeppelinRouteTable Klicken Sie auf der Registerkarte Routen auf Routen bearbeiten.

  9. Wählen Sie auf der Seite Routen bearbeiten die Option Route hinzufügen aus.

  10. Geben Sie im Für-Ziel 0.0.0.0/0 ein. Wählen Sie für Target die Option NAT Gateway, ZeppelinGateway. Wählen Sie Routen speichern aus. Klicken Sie auf Schließen.

  11. Wählen Sie auf der Seite Routing-Tabellen die ZeppelinRouteTableOption „Subnetzzuordnungen“ aus. Wählen Sie Subnetzzuordnungen bearbeiten aus.

  12. Wählen Sie auf der Seite Subnetzzuordnungen bearbeiten die Optionen AWS KafkaTutorialSubnet2 und AWS KafkaTutorialSubnet 3 aus. Wählen Sie Speichern.

Erstellen Sie eine AWS Glue Verbindung und eine Tabelle

Ihr Studio-Notebook verwendet eine AWS Glue-Datenbank für Metadaten zu Ihrer Amazon MSK-Datenquelle. In diesem Abschnitt erstellen Sie eine AWS Glue Verbindung, die beschreibt, wie Sie auf Ihren Amazon MSK-Cluster zugreifen können, und eine AWS Glue Tabelle, die beschreibt, wie Sie die Daten in Ihrer Datenquelle für Clients wie Ihr Studio-Notebook präsentieren.

Eine Verbindung erstellen
  1. Melden Sie sich bei der an AWS Management Console und öffnen Sie die AWS Glue Konsole unter https://console.aws.amazon.com/glue/.

  2. Wenn Sie noch keine AWS Glue Datenbank haben, wählen Sie in der linken Navigationsleiste Datenbanken aus. Wählen Sie Datenbank hinzufügen. Geben Sie im Fenster Datenbank hinzufügen default als Namen der Datenbank ein. Wählen Sie Erstellen.

  3. Wählen Sie in der linken Navigationsleiste Verbindungen aus. Wählen Sie Verbindung hinzufügen aus.

  4. Geben Sie im Fenster Verbindung hinzufügen die folgenden Werte ein:

    • Geben Sie für Verbindungsname ZeppelinConnection ein.

    • Wählen Sie für Verbindungstyp den Eintrag Kafka.

    • Geben Sie für Kafka-Bootstrap-Server-URLs die Bootstrap-Broker-Zeichenfolge für Ihren Cluster an. Sie können die Bootstrap-Broker entweder über die MSK-Konsole oder durch Eingabe des folgenden CLI-Befehls abrufen:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Deaktivieren Sie das Kontrollkästchen SSL-Verbindung erforderlich.

    Wählen Sie Weiter aus.

  5. Geben Sie auf der VPC-Seite die folgenden Werte an:

    • Wählen Sie für VPC den Namen Ihrer VPC (z. B. AWS KafkaTutorialVPC).

    • Wählen Sie für Subnetz 2 aus.AWS KafkaTutorialSubnet

    • Wählen Sie für Sicherheitsgruppen alle verfügbaren Gruppen aus.

    Wählen Sie Weiter aus.

  6. Wählen Sie auf der Seite Verbindungseigenschaften / Verbindungszugriff die Option Fertigstellen aus.

Erstellen einer Tabelle
Anmerkung

Sie können die Tabelle entweder manuell erstellen, wie in den folgenden Schritten beschrieben, oder Sie können den Konnektorcode zum Erstellen einer Tabelle für Managed Service für Apache Flink in Ihrem Notebook innerhalb von Apache Zeppelin verwenden, um Ihre Tabelle über eine DDL-Anweisung zu erstellen. Sie können dann einchecken AWS Glue , um sicherzustellen, dass die Tabelle korrekt erstellt wurde.

  1. Wählen Sie in der linken Navigationsleiste die Option Tabellen. Wählen Sie auf der Seite Tabellen die Optionen Tabellen hinzufügen, Tabelle manuell hinzufügen aus.

  2. Geben Sie auf der Seite Eigenschaften Ihrer Tabelle einrichten stock als Tabellennamen ein. Stellen Sie sicher, dass Sie die Datenbank auswählen, die Sie zuvor erstellt haben. Wählen Sie Weiter aus.

  3. Wählen Sie auf der Seite Datenspeicher hinzufügen die Option Kafka aus. Geben Sie als Themennamen Ihren Themennamen ein (z. B. AWS KafkaTutorialTopic). Wählen Sie für Verbindung ZeppelinConnection.

  4. Wählen Sie auf der Seite Klassifikation die Option JSON aus. Wählen Sie Weiter aus.

  5. Wählen Sie auf der Seite Schema definieren die Option „Spalte hinzufügen“, um eine Spalte hinzuzufügen. Fügen Sie Spalten mit den folgenden Eigenschaften hinzu:

    Spaltenname Datentyp
    ticker string
    price double

    Wählen Sie Weiter aus.

  6. Überprüfen Sie auf der nächsten Seite Ihre Einstellungen und wählen Sie Fertigstellen.

  7. Wählen Sie die neu erstellte Tabelle aus der Liste der Tabellen aus.

  8. Wählen Sie Tabelle bearbeiten und fügen Sie eine Eigenschaft mit dem Schlüssel managed-flink.proctime und dem Wert proctime hinzu.

  9. Wählen Sie Apply (Anwenden) aus.

Erstellen Sie ein Studio-Notebook mit Amazon MSK

Nachdem Sie die Ressourcen erstellt haben, die Ihre Anwendung verwendet, erstellen Sie Ihr Studio-Notebook.

Sie können Ihre Anwendung entweder mit AWS Management Console oder mit dem erstellen AWS CLI.
Anmerkung

Sie können ein Studio-Notebook auch von der Amazon MSK-Konsole aus erstellen, indem Sie einen vorhandenen Cluster auswählen und dann Daten in Echtzeit verarbeiten wählen.

Erstellen Sie ein Studio-Notizbuch mit dem AWS Management Console

  1. Öffnen Sie die Konsole Managed Service für Apache Flink unter https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Wählen Sie auf der Seite Managed Service für Apache Flink-Anwendungen die Registerkarte Studio aus. Wählen Sie Studio-Notebook erstellen.

    Anmerkung

    Um ein Studio-Notebook über die Amazon MSK- oder Kinesis Data Streams-Konsolen zu erstellen, wählen Sie Ihren Amazon MSK-Eingabe-Cluster oder Kinesis Data Stream aus und wählen Sie dann Daten in Echtzeit verarbeiten aus.

  3. Geben Sie auf der Seite Notebook-Instance erstellen folgende Informationen ein:

    • Geben Sie MyNotebook als Studio-Notebookname.

    • Wählen Sie Standard für die AWS -Glue-Datenbank.

    Wählen Sie Studio-Notebook erstellen.

  4. Wählen Sie MyNotebookauf der Seite die Registerkarte Konfiguration aus. Wählen Sie im Abschnitt Netzwerk die Option Bearbeiten.

  5. Wählen Sie auf der MyNotebook Seite Netzwerk bearbeiten für die VPC-Konfiguration basierend auf dem Amazon MSK-Cluster aus. Wählen Sie Ihren Amazon MSK-Cluster für den Amazon MSK-Cluster aus. Wählen Sie Änderungen speichern.

  6. Wählen Sie auf der MyNotebookSeite die Option Ausführen aus. Warten Sie, bis der Status Wird ausgeführt angezeigt wird.

Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI

Gehen Sie wie folgt vor AWS CLI, um Ihr Studio-Notizbuch mit dem zu erstellen:

  1. Stellen Sie sicher, dass Sie über die folgenden Informationen verfügen: Sie benötigen diese Werte, um Ihre Anwendung zu erstellen.

    • Ihre Konto-ID.

    • Die Subnetz-IDs und Sicherheitsgruppen-ID für die Amazon-VPC, die Ihren Amazon-MSK-Cluster enthält.

  2. Erstellen Sie eine Datei mit dem Namen create.json und den folgenden Inhalten. Ersetzen Sie die Platzhalterwerte durch Ihre Informationen.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Um Ihre Anwendung zu erstellen, führen Sie den folgenden Befehl aus.

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Wenn der Befehl abgeschlossen ist, sollte eine Ausgabe wie die folgende angezeigt werden, die die Details für Ihr neues Studio-Notebook enthält:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Um Ihre Anwendung zu starten, führen Sie den folgenden Befehl aus. Ersetzen Sie die Beispielwerte durch Ihre Konto-ID.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Senden Sie Daten an Ihren Amazon MSK-Cluster

In diesem Abschnitt führen Sie ein Python-Skript in Ihrem Amazon EC2-Client aus, um Daten an Ihre Amazon MSK-Datenquelle zu senden.

  1. Stellen Sie eine Verbindung zu Ihrem Amazon EC2-Client her.

  2. Führen Sie die folgenden Befehle aus, um Python Version 3, Pip und das Kafka für Python-Paket zu installieren, und bestätigen Sie die Aktionen:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Konfigurieren Sie das AWS CLI auf Ihrem Client-Computer, indem Sie den folgenden Befehl eingeben:

    aws configure

    Geben Sie Ihre Kontoanmeldeinformationen ein, und us-east-1 für die region.

  4. Erstellen Sie eine Datei mit dem Namen stock.py und den folgenden Inhalten. Ersetzen Sie den Beispielwert durch die Bootstrap Brokers-Zeichenfolge Ihres Amazon MSK-Clusters und aktualisieren Sie den Themennamen, falls Ihr Thema nicht: AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Führen Sie das Skript mit dem folgenden Befehl aus:

    $ python3 stock.py
  6. Lassen Sie das Skript laufen, während Sie den folgenden Abschnitt abschließen.

Testen Sie Ihr Studio-Notebook

In diesem Abschnitt verwenden Sie Ihr Studio-Notebook, um Daten aus Ihrem Amazon MSK-Cluster abzufragen.

  1. Öffnen Sie die Konsole Managed Service für Apache Flink unter https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard.

  2. Wählen Sie auf der Seite Managed Service für Apache Flink-Anwendungen die Registerkarte Studio-Notebook aus. Wählen Sie. MyNotebook

  3. Wählen Sie MyNotebookauf der Seite „In Apache Zeppelin öffnen“.

    Die Oberfläche von Apache Zeppelin wird in einer neuen Registerkarte geöffnet.

  4. Auf der Seite Willkommen bei Zeppelin! wählen Sie Zeppelin neue Notiz aus.

  5. Geben Sie auf der Seite Zeppelin Notiz die folgende Abfrage in eine neue Notiz ein:

    %flink.ssql(type=update) select * from stock

    Wählen Sie das Ausführungssymbol.

    Die Anwendung zeigt Daten aus dem Amazon MSK-Cluster an.

Um das Apache Flink-Dashboard für Ihre Anwendung zu öffnen und betriebliche Aspekte zu sehen, wählen Sie FLINK JOB. Weitere Informationen zum Flink-Dashboard finden Sie unter Apache Flink-Dashboard im Managed Service für Apache Flink Entwicklerhandbuch.

Weitere Beispiele für Flink-Streaming-SQL-Abfragen finden Sie unter Abfragen in der Apache Flink-Dokumentation.