Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Schritt 3: Erstellen und Ausführen eines Managed Service für Apache Flink für die Flink-Anwendung
In dieser Übung erstellen Sie eine Anwendung von Managed Service für Apache Flink für Flink mit Datenströmen als Quelle und Senke.
Dieser Abschnitt enthält die folgenden Schritte:
- Erstellen von zwei Amazon Kinesis Data Streams
- Schreiben Sie Beispieldatensätze in den Eingabe-Stream
- Herunterladen und Überprüfen des Apache Flink-Streaming-Java-Codes
- Kompilieren des Anwendungscodes
- Hochladen des Apache Flink-Streaming-Java-Codes
- Erstellen und führen Sie die Anwendung Managed Service für Apache Flink aus
Erstellen von zwei Amazon Kinesis Data Streams
Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink für Flink erstellen, erstellen Sie zwei Kinesis-Datenströme (ExampleInputStream
und ExampleOutputStream
). Ihre Anwendung verwendet diese Streams für die Quell- und Ziel-Streams der Anwendung.
Sie können diese Streams mithilfe der Amazon-Kinesis-Konsole oder des folgenden AWS CLI -Befehls erstellen. Detaillierte Konsolenanweisungen finden Sie unter Erstellen und Aktualisieren von Daten-Streams.
So erstellen Sie die Daten-Streams (AWS CLI)
-
Verwenden Sie den folgenden Amazon Kinesis
create-stream
AWS CLI Kinesis-Befehl, um den ersten Stream (ExampleInputStream
) zu erstellen.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern den Stream-Namen in
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Schreiben Sie Beispieldatensätze in den Eingabe-Stream
In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.
Anmerkung
Dieser Abschnitt erfordert AWS SDK for Python (Boto)
-
Erstellen Sie eine Datei
stock.py
mit dem folgenden Inhalt:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
Im weiteren Verlauf des Tutorials führen Sie das
stock.py
-Skript zum Senden von Daten an die Anwendung aus.$ python stock.py
Herunterladen und Überprüfen des Apache Flink-Streaming-Java-Codes
Der Java-Anwendungscode für diese Beispiele ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:
-
Klonen Sie das Remote-Repository mit dem folgenden Befehl:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
Navigieren Sie zum
GettingStarted
Verzeichnis .
Der Anwendungscode befindet sich in den Dateien CloudWatchLogSink.java
und CustomSinkStreamingJob.java
. Beachten Sie Folgendes zum Anwendungscode:
-
Die Anwendung verwendet eine Kinesis-Quelle zum Lesen aus dem Quell-Stream. Der folgende Codeausschnitt erstellt die Kinesis-Senke:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Kompilieren des Anwendungscodes
In diesem Abschnitt verwenden Sie den Apache Maven-Compiler zum Erstellen des Java-Codes für die Anwendung. Weitere Informationen zum Installieren von Apache Maven und des Java Development Kit (JDK) finden Sie unter Voraussetzungen für das Fertigstellen der Übungen.
Ihre Java-Anwendung erfordert die folgenden Komponenten:
-
Eine Projektobjektmodell (pom.xml)
-Datei. Diese Datei enthält Informationen über die Konfiguration und Abhängigkeiten der Anwendung, einschließlich der Bibliotheken des Managed Service für Apache Flink für Flink-Anwendungen. -
Eine
main
-Methode, die die Logik der Anwendung enthält.
Anmerkung
Zur Nutzung des Kinesis-Konnektors für die folgende Anwendung müssen Sie den Quellcode für den Konnektor herunterladen und ihn erstellen. Einzelheiten dazu finden Sie in der Apache-Flink-Dokumentation
So erstellen und kompilieren Sie den Anwendungscode
-
Erstellen Sie eine Java/Maven-Anwendung in Ihrer Entwicklungsumgebung. Weitere Informationen zum Erstellen einer Anwendung finden Sie in der Dokumentation für Ihre Entwicklungsumgebung:
-
Verwenden Sie den folgenden Code für eine Datei mit dem Namen
StreamingJob.java
.package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
Beachten Sie die folgenden Informationen zum vorherigen Codebeispiel:
-
Diese Datei enthält die
main
-Methode, die die Funktionalität der Anwendung definiert. -
Ihre Anwendung erstellt Quell- und Senkenkonnektoren für den Zugriff auf externe Ressourcen, indem ein
StreamExecutionEnvironment
-Objekt verwendet wird. -
Die Anwendung erstellt Quell- und Senkenkonnektoren mit statischen Eigenschaften. Zum Verwenden dynamischer Anwendungseigenschaften verwenden Sie die Methoden
createSourceFromApplicationProperties
undcreateSinkFromApplicationProperties
, um die Konnektoren zu erstellen. Diese Methoden lesen die Eigenschaften der Anwendung zum Konfigurieren der Konnektoren.
-
-
Zum Verwenden Ihres Anwendungscodes kompilieren und packen Sie ihn in eine JAR-Datei. Sie können Ihren Code auf zwei Arten kompilieren und packen:
-
Verwenden Sie das Befehlszeilen-Maven-Tool. Erstellen Sie Ihre JAR-Datei, indem Sie den folgenden Befehl in dem Verzeichnis ausführen, das die
pom.xml
-Datei enthält:mvn package
-
Verwenden Sie Ihre Entwicklungsumgebung. Weitere Informationen finden Sie in der Dokumentation Ihrer Entwicklungsumgebung.
Sie können Ihr Paket als JAR-Datei hochladen oder komprimieren und als ZIP-Datei hochladen. Wenn Sie Ihre Anwendung mit dem erstellen AWS CLI, geben Sie Ihren Codeinhaltstyp (JAR oder ZIP) an.
-
-
Wenn während der Erstellung Fehler aufgetreten sind, überprüfen Sie, ob Ihre
JAVA_HOME
-Umgebungsvariable richtig eingestellt ist.
Wenn die Anwendung erfolgreich kompiliert wurde, wird die folgende Datei erstellt:
target/java-getting-started-1.0.jar
Hochladen des Apache Flink-Streaming-Java-Codes
In diesem Abschnitt erstellen Sie einen Amazon Simple Storage Service (Amazon S3)-Bucket und laden Ihren Anwendungscode hoch.
So laden Sie den Anwendungscode hoch
Öffnen Sie die Amazon-S3-Konsole unter https://console.aws.amazon.com/s3/
. -
Wählen Sie Bucket erstellen aus.
-
Geben Sie
ka-app-code-
im Feld Bucket-Name ein. Fügen Sie dem Bucket-Namen ein Suffix hinzu, wie z. B. Ihren Benutzernamen, damit er global eindeutig ist. Wählen Sie Weiter aus.<username>
-
Lassen Sie im Schritt Optionen konfigurieren die Einstellungen unverändert und klicken Sie auf Weiter.
-
Lassen Sie im Schritt Berechtigungen festlegen die Einstellungen unverändert und klicken Sie auf Weiter.
-
Wählen Sie Bucket erstellen aus.
-
Wählen Sie in der Amazon S3 S3-Konsole den
<username>Bucket ka-app-code- und wählen Sie Upload aus.
-
Klicken Sie im Schritt Auswählen von Dateien auf Hinzufügen von Dateien. Navigieren Sie zu der
java-getting-started-1.0.jar
Datei, die Sie im vorherigen Schritt erstellt haben. Wählen Sie Weiter aus. -
Lassen Sie im Schritt Berechtigungen festlegen die Einstellungen unverändert. Wählen Sie Weiter aus.
-
Lassen Sie im Schritt Eigenschaften festlegen die Einstellungen unverändert. Klicken Sie auf Hochladen.
Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.
Erstellen und führen Sie die Anwendung Managed Service für Apache Flink aus
Sie können eine Anwendung von Managed Service für Apache Flink für Flink entweder über die Konsole oder AWS CLI erstellen und ausführen.
Anmerkung
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, erstellen Sie diese Ressourcen separat.
Themen
Erstellen und Ausführen der Anwendung (Konsole)
Befolgen Sie diese Schritte, um die Anwendung über die Konsole zu erstellen, zu konfigurieren, zu aktualisieren und auszuführen.
Erstellen Sie die Anwendung
Öffnen Sie die Kinesis-Konsole unter.https://console.aws.amazon.com/kinesis
. -
Wählen Sie auf dem Amazon-Kinesis-Dashboard die Option Create analytics application (Analyseanwendung erstellen) aus.
-
Geben Sie auf der Seite Kinesis Analytics – Anwendung erstellen die Anwendungsdetails wie folgt an:
-
Geben Sie als Anwendungsname ein
MyApplication
. -
Geben Sie für Beschreibung den Text
My java test app
ein. -
Wählen Sie für Runtime (Laufzeit) die Option Apache Flink 1.6 aus.
-
-
Wählen Sie für Zugriffsberechtigungen die Option Erstellen / Aktualisieren Sie IAM-Rolle
kinesis-analytics-MyApplication-us-west-2
aus. -
Wählen Sie Create application aus.
Anmerkung
Beim Erstellen einer Anwendung von Managed Service für Apache Flink für Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:
-
Richtlinie:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rolle:
kinesis-analytics-
MyApplication
-us-west-2
Bearbeiten der IAM-Richtlinie
Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf die Kinesis-Datenströme.
Öffnen Sie die IAM-Konsole unter https://console.aws.amazon.com/iam/
. -
Wählen Sie Policies (Richtlinien). Wählen Sie die
kinesis-analytics-service-MyApplication-us-west-2
-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. -
Wählen Sie auf der Seite Summary (Übersicht) die Option Edit policy (Richtlinie bearbeiten) aus. Wählen Sie den Tab JSON.
-
Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie die beispielhaften Konto-IDs (
012345678901
) mit Ihrer Konto-ID.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
Konfigurieren der Anwendung
-
Wählen Sie auf der MyApplicationSeite Configure aus.
-
Klicken Sie auf der Seite Configure application (Anwendung konfigurieren) auf die Option Code location (Codespeicherort):
-
Geben Sie für Amazon-S3-Bucket
ka-app-code-
ein.<username>
-
Geben Sie als Pfad zum Amazon-S3-Objekt den Wert
java-getting-started-1.0.jar
ein.
-
-
Wählen Sie unter Zugriff auf Anwendungsressourcen für Zugriffsberechtigungen die Option IAM-Rolle
kinesis-analytics-MyApplication-us-west-2
erstellen/aktualisieren aus. -
Geben Sie unter Eigenschaften für Gruppen-ID den Text
ProducerConfigProperties
ein. -
Geben Sie die folgenden Eigenschaften und Werte der Anwendung ein:
Schlüssel Wert flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
Stellen Sie unter Überwachung sicher, dass die Ebene der Überwachungsmetriken auf Anwendung eingestellt ist.
-
Wählen Sie für die CloudWatch Protokollierung das Kontrollkästchen Aktivieren aus.
-
Wählen Sie Aktualisieren.
Anmerkung
Wenn Sie die CloudWatch Protokollierung aktivieren möchten, erstellt Managed Service for Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:
-
Protokollgruppe:
/aws/kinesis-analytics/MyApplication
-
Protokollstream:
kinesis-analytics-log-stream
Ausführen der Anwendung
-
Wählen Sie auf der MyApplicationSeite die Option Ausführen aus. Bestätigen Sie die Aktion.
-
Wenn die Anwendung ausgeführt wird, aktualisieren Sie die Seite. Die Konsole zeigt den Application graph (Anwendungs-Graph) an.
Stoppen der Anwendung
Wählen Sie auf der MyApplicationSeite Stopp aus. Bestätigen Sie die Aktion.
Aktualisieren der Anwendung
Mithilfe der Konsole können Sie Anwendungseinstellungen wie beispielsweise Anwendungseigenschaften, Überwachungseinstellungen und den Speicherort oder den Dateinamen der JAR-Anwendungsdatei aktualisieren. Außerdem können Sie die JAR-Anwendungsdatei erneut aus dem Amazon-S3-Bucket laden, wenn Sie den Anwendungscode aktualisieren müssen.
Wählen Sie auf der MyApplicationSeite Configure aus. Aktualisieren Sie die Anwendungseinstellungen und klicken Sie auf Aktualisieren.
Erstellen und Ausführen der Anwendung (AWS CLI)
In diesem Abschnitt verwenden Sie die, AWS CLI um die Anwendung Managed Service for Apache Flink zu erstellen und auszuführen. Managed Service for Apache Flink for Flink Applications verwendet den kinesisanalyticsv2
AWS CLI Befehl, um Managed Service für Apache Flink-Anwendungen zu erstellen und mit ihnen zu interagieren.
Erstellen einer Berechtigungsrichtlinie
Zuerst erstellen Sie eine Berechtigungsrichtlinie mit zwei Anweisungen: eine, die Berechtigungen für die read
-Aktion auf den Quell-Stream zulässt, und eine andere, die Berechtigungen für die write
-Aktionen auf den Senken-Stream zulässt. Anschließend fügen Sie die Richtlinie an eine IAM-Rolle (die Sie im nächsten Abschnitt erstellen) an. Wenn Managed Service für Apache Flink also die Rolle übernimmt, verfügt der Service über die erforderlichen Berechtigungen zum Lesen aus dem Quell-Stream und zum Schreiben in den Senken-Stream.
Verwenden Sie den folgenden Code zum Erstellen der KAReadSourceStreamWriteSinkStream
-Berechtigungsrichtlinie. Ersetzen Sie
durch den Benutzernamen, den Sie verwendet haben, um den Amazon-S3-Bucket zum Speichern des Anwendungscodes zu erstellen. Ersetzen Sie die Konto-ID in den Amazon-Ressourcennamen (ARNs) (username
) mit Ihrer Konto-ID.012345678901
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-
username
", "arn:aws:s3:::ka-app-code-username
/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
step-by-step Anweisungen zum Erstellen einer Berechtigungsrichtlinie finden Sie unter Tutorial: Create and Attach Your First Customer Managed Policy im IAM-Benutzerhandbuch.
Anmerkung
Um auf andere AWS Dienste zuzugreifen, können Sie den AWS SDK for Java verwenden. Managed Service für Apache Flink setzt die vom SDK benötigten Anmeldeinformationen automatisch auf die der IAM-Rolle für die Dienstausführung, die mit Ihrer Anwendung verknüpft ist. Es sind keine weiteren Schritte erforderlich.
Erstellen einer IAM-Rolle
In diesem Abschnitt erstellen Sie eine IAM-Rolle, die die Anwendung von Managed Service für Apache Flink für Flink annehmen kann, um einen Quell-Stream zu lesen und in den Senken-Stream zu schreiben.
Managed Service für Apache Flink kann ohne Berechtigungen nicht auf Ihren Stream zugreifen. Sie erteilen diese Berechtigungen über eine IAM-Rolle. Jeder IAM-Rolle sind zwei Richtlinien angefügt. Die Vertrauensrichtlinie erteilt Managed Service für Apache Flink die Berechtigung zum Übernehmen der Rolle und die Berechtigungsrichtlinie bestimmt, was Managed Service für Apache Flink nach Annahme der Rolle tun kann.
Sie können die Berechtigungsrichtlinie, die Sie im vorherigen Abschnitt erstellt haben, dieser Rolle anfügen.
So erstellen Sie eine IAM-Rolle
Öffnen Sie die IAM-Konsole unter https://console.aws.amazon.com/iam/
. -
Wählen Sie im Navigationsbereich Roles (Rollen) und Create Role (Rolle erstellen) aus.
-
Wählen Sie unter Typ der vertrauenswürdigen Entität auswählen die Option AWS -Service aus. Wählen Sie unter Choose the service that will use this role (Wählen Sie den Service aus, der diese Rolle verwendet) die Option Kinesis aus. Wählen Sie unter Select your use case (Wählen Sie Ihren Anwendungsfall aus) die Option Kinesis Analytics aus.
Wählen Sie Weiter: Berechtigungen aus.
-
Wählen Sie auf der Seite Attach permissions policies (Berechtigungsrichtlinien hinzufügen) Next: Review (Weiter: Überprüfen) aus. Sie fügen Berechtigungsrichtlinien an, nachdem Sie die Rolle erstellt haben.
-
Geben Sie auf der Seite Create role (Rolle erstellen) den Text
KA-stream-rw-role
für Role name (Rollenname) ein. Wählen Sie Rolle erstellen aus.Jetzt haben Sie eine neue IAM-Rolle mit dem Namen
KA-stream-rw-role
erstellt. Im nächsten Schritt aktualisieren Sie die Vertrauens- und Berechtigungsrichtlinien für die Rolle. -
Fügen Sie die Berechtigungsrichtlinie der Rolle an.
Anmerkung
Für diese Übung übernimmt Managed Service für Apache Flink diese Rolle sowohl für das Lesen von Daten aus einem Kinesis-Datenstrom (Quelle) als auch zum Schreiben der Ausgabedaten in einen anderen Kinesis-Datenstrom. Daher fügen Sie die Richtlinie an, die Sie im vorherigen Schritt erstellt haben, Erstellen einer Berechtigungsrichtlinie.
-
Wählen Sie auf der Seite Summary (Übersicht) die Registerkarte Permissions (Berechtigungen) aus.
-
Wählen Sie Attach Policies (Richtlinien anfügen) aus.
-
Geben Sie im Suchfeld
KAReadSourceStreamWriteSinkStream
(die Richtlinie, die Sie im vorhergehenden Abschnitt erstellt haben) ein. -
Wählen Sie die ReadInputStreamWriteOutputStreamKA-Richtlinie und anschließend die Option Richtlinie anhängen aus.
-
Sie haben nun die Service-Ausführungsrolle erstellt, die Ihre Anwendung für den Zugriff auf Ressourcen verwendet. Notieren Sie sich den ARN der neuen Rolle.
step-by-step Anweisungen zum Erstellen einer Rolle finden Sie unter Erstellen einer IAM-Rolle (Konsole) im IAM-Benutzerhandbuch.
Erstellen Sie die Anwendung Managed Service für Apache Flink
-
Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen
create_request.json
. Ersetzen Sie den Beispiel-Rollen-ARN durch den ARN für die Rolle, die Sie zuvor erstellt haben. Ersetzen Sie das Bucket-ARN-Suffix (
) mit dem Suffix, das Sie im vorherigen Abschnitt gewählt haben. Ersetzen Sie die beispielhafte Konto-ID (username
) in der Service-Ausführungsrolle mit Ihrer Konto-ID.012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
Führen Sie die
CreateApplication
-Aktion mit der vorherigen Anforderung zum Erstellen der Anwendung aus:aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
Die Anwendung wird nun erstellt. Sie starten die Anwendung im nächsten Schritt.
Starten der Anwendung
In diesem Abschnitt verwenden Sie die StartApplication
-Aktion, um die Anwendung zu starten.
So starten Sie die Anwendung
-
Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen
start_request.json
.{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
Führen Sie die
StartApplication
-Aktion mit der vorherigen Anforderung zum Starten der Anwendung aus:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
Die Anwendung wird jetzt ausgeführt. Sie können die Kennzahlen Managed Service for Apache Flink auf der CloudWatch Amazon-Konsole überprüfen, um sicherzustellen, dass die Anwendung funktioniert.
Stoppen der Anwendung
In diesem Abschnitt verwenden Sie die StopApplication
-Aktion, um die Anwendung zu stoppen.
So stoppen Sie die Anwendung
-
Speichern Sie den folgenden JSON-Code in eine Datei mit dem Namen
stop_request.json
.{"ApplicationName": "test" }
-
Führen Sie die
StopApplication
-Aktion mit der folgenden Anforderung zum Stoppen der Anwendung aus:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
Die Anwendung wird nun gestoppt.