Schritt 1: Erstellen der Eingabe- und Ausgabe-Streams - Amazon-Kinesis-Data-Analytics für SQL-Anwendungen

Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis-Data-Analytics-for-SQL-Anwendungen zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schritt 1: Erstellen der Eingabe- und Ausgabe-Streams

Bevor Sie eine Amazon Kinesis Data Analytics-Anwendung für das Hotspots-Beispiel erstellen, erstellen Sie zwei Kinesis-Datenströme. Konfigurieren Sie einen der Streams als Streaming-Quelle für Ihre Anwendung und den anderen Stream als das Ziel, an das Kinesis Data Analytics die Ausgabe Ihrer Anwendung weiterleitet.

Schritt 1.1: Erstellen der Kinesis-Datenströme

In diesem Abschnitt erstellen Sie zwei Kinesis-Datenströme: ExampleInputStream und ExampleOutputStream.

Erstellen Sie diese Daten-Streams mithilfe der Konsole oder der AWS CLI.

  • So erstellen Sie die Daten-Streams mithilfe der Konsole:

    1. Melden Sie sich bei AWS Management Console an und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.

    2. Klicken Sie im Navigationsbereich auf Data Streams (Daten-Streams).

    3. Klicken Sie auf Create Kinesis stream (Kinesis-Stream erstellen) und erstellen Sie einen Stream mit einer Shard namens ExampleInputStream.

    4. Wiederholen Sie den vorherigen Schritt und erstellen Sie einen Stream mit einer Shard namens ExampleOutputStream.

  • So erstellen Sie einen Daten-Stream mithilfe der AWS CLI:

    • Erstellen Sie die Streams (ExampleInputStream und ExampleOutputStream) mithilfe des folgenden create-stream AWS CLI Kinesis-Befehls. Zum Erstellen des zweiten Streams, den die Anwendung zum Schreiben der Ausgabe verwenden wird, führen Sie den gleichen Befehl aus und ändern Sie den Namen des Streams in ExampleOutputStream.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Schritt 1.2: Schreiben Sie Beispieldatensätze in den Eingabe-Stream

In diesem Schritt führen Sie Python-Code aus, um kontinuierlich Beispieldatensätze zu generieren und zum ExampleInputStream-Stream zu schreiben.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Installieren Sie Python und pip.

    Informationen zur Installation von Python finden Sie auf der Website für Python.

    Sie können mithilfe von pip Abhängigkeiten installieren. Informationen zur Installation von pip finden Sie unter Installation auf der Website für pip.

  2. Führen Sie den folgenden Python-Code aus. Dieser Code führt Folgendes aus:

    • Generiert einen potenziellen Hotspot an einer Stelle in der XY-Ebene.

    • Generiert eine Gruppe von 1000 Punkten für jeden Hotspot. Von diesen Punkten werden 20 Prozent rund um den Hotspot gruppiert. Der Rest wird nach dem Zufallsprinzip innerhalb des gesamten Raums generiert.

    • Der put-record-Befehl schreibt die JSON-Datensätze in den Stream.

    Wichtig

    Laden Sie diese Datei nicht auf einen Webserver hoch, da sie Ihre AWS-Anmeldeinformationen enthält.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

Nächster Schritt

Schritt 2: Erstellen Sie die Kinesis Data Analytics-Anwendung