Managed Service for Apache Flink アプリケーションを作成して実行する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink アプリケーションを作成して実行する

このステップでは、Kinesis データストリームをソースとシンクとして Apache Flink アプリケーション用 Managed Service を作成します。

依存リソースを作成する

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。

  • 入出力用の 2 つの Kinesis データストリーム

  • アプリケーションのコードを保存する Amazon S3 バケット

    注記

    このチュートリアルでは、米国東部 (バージニア北部) リージョンにアプリケーションをデプロイすることを前提としています。別のリージョンを使用する場合は、それに応じてすべてのステップを適応させます。

2 つの Amazon Kinesis データストリームを作成する

この演習で Apache Flink アプリケーションのマネージドサービスを作成する前に、2 つの Kinesis データストリーム (ExampleInputStreamExampleOutputStream) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは、Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールでの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」 の 「データストリームの作成および更新」 を参照してください。を使用してストリームを作成するには AWS CLI、次のコマンドを使用して、アプリケーションに使用するリージョンを調整します。

データストリームを作成するには (AWS CLI)
  1. 最初のストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. アプリケーションが出力の書き込みに使用する 2 番目のストリームを作成するには、同じコマンドを実行し、ストリーム名を に変更しますExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

アプリケーションコードの Amazon S3 バケットを作成する

Amazon S3 バケットは、コンソールを使用して作成できます。コンソールを使用して Amazon S3 バケットを作成する方法については、「Amazon S3 ユーザーガイド」の「バケットの作成」を参照してください。 Amazon S3 例えば、ログイン名を追加するなど、グローバルに一意の名前を使用して Amazon S3 バケットに名前を付けます。

注記

このチュートリアルで使用するリージョン (us-east-1) にバケットを作成してください。

その他のリソース

アプリケーションを作成すると、Managed Service for Apache Flink は、次の Amazon CloudWatch リソースがまだ存在しない場合、自動的に作成します。

  • /AWS/KinesisAnalytics-java/<my-application>という名前のロググループ。

  • kinesis-analytics-log-stream というログストリーム

ローカルの開発環境のセットアップ

開発とデバッグのために、IDE選択したマシンで Apache Flink アプリケーションを直接実行できます。Apache Flink の依存関係は、Apache Maven を使用して通常の Java の依存関係のように処理されます。

注記

開発マシンには、Java JDK11、Maven、Git がインストールされている必要があります。Eclipse Java Neon IntelliJ IDEAなどの開発環境を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、「」を参照してください演習を完了するための前提条件を満たす。マシンに Apache Flink クラスターをインストールする必要はありません

セッションを認証する AWS

アプリケーションは Kinesis データストリームを使用してデータを公開します。ローカルで実行する場合は、Kinesis データストリームに書き込むためのアクセス許可を持つ有効な AWS 認証済みセッションが必要です。セッションを認証するには、次のステップに従います。

  1. AWS CLI と、有効な認証情報が設定されている名前付きプロファイルがない場合は、「」を参照してくださいAWS Command Line Interface (AWS CLI) のセットアップ

  2. AWS CLI が正しく設定され、次のテストレコードを発行して Kinesis データストリームに書き込むアクセス許可がユーザーに付与されていることを確認します。

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. と統合するプラグインIDEが にある場合は AWS、それを使用して、 で実行されているアプリケーションに認証情報を渡すことができますIDE。詳細については、AWS 「 Toolkit for IntelliJIDEA」およびAWS 「 Toolkit for Eclipse」を参照してください。

Apache Flink ストリーミング Java コードをダウンロードして調べる

この例の Java アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted ディレクトリに移動します。

アプリケーションコンポーネントを確認する

アプリケーションは、 com.amazonaws.services.msf.BasicStreamingJob クラスに完全に実装されています。main() メソッドは、ストリーミングデータを処理して実行するデータフローを定義します。

注記

開発者エクスペリエンスを最適化するために、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行され、 で開発できるように設計されていますIDE。

  • Amazon Managed Service for Apache Flink および で を実行するときに機能するようにランタイム設定を読み取るためにIDE、アプリケーションは でローカルでスタンドアロンを実行しているかどうかを自動的に検出しますIDE。この場合、アプリケーションはランタイム設定を異なる方法でロードします。

    1. アプリケーションで のスタンドアロンモードで実行されていることがわかったらIDE、プロジェクトのリソースフォルダに含まれる application_properties.json ファイルを作成します。ファイルの内容は次のとおりです。

    2. アプリケーションが Amazon Managed Service for Apache Flink で実行されると、デフォルトの動作により、Amazon Managed Service for Apache Flink アプリケーションで定義するランタイムプロパティからアプリケーション設定がロードされます。Managed Service for Apache Flink アプリケーションの作成と設定 を参照してください。

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main() メソッドは、アプリケーションデータフローを定義して実行します。

    • デフォルトのストリーミング環境を初期化します。この例では、 StreamExecutionEnvironmentで使用する と、 で使用する DataSteam APIとStreamTableEnvironmentSQLテーブル の両方を作成する方法を示しますAPI。2 つの環境オブジェクトは、異なる を使用するための、同じランタイム環境への 2 つの個別の参照ですAPIs。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • アプリケーション設定パラメータをロードします。これにより、アプリケーションが実行されている場所に応じて、正しい場所から自動的にロードされます。

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • アプリケーションは、Kinesis Consumer コネクタを使用して入力ストリームからデータを読み取るソースを定義します。入力ストリームの設定は、 PropertyGroupId= で定義されますInputStream0。ストリームの名前とリージョンは、aws.regionそれぞれ stream.nameおよび という名前のプロパティにあります。わかりやすくするために、このソースはレコードを文字列として読み取ります。

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 次に、アプリケーションは Kinesis Streams Sink コネクタを使用してシンクを定義し、出力ストリームにデータを送信します。出力ストリーム名とリージョンはOutputStream0、入力ストリームと同様に PropertyGroupId= で定義されます。シンクは、ソースからデータを取得DataStreamしている内部に直接接続されます。実際のアプリケーションでは、ソースとシンクの間に何らかの変換があります。

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 最後に、先ほど定義したデータフローを実行します。これは、データフローが必要とするすべての演算子を定義した後、 main()メソッドの最後の命令である必要があります。

      env.execute("Flink streaming Java API skeleton");

pom.xml ファイルを使用する

pom.xml ファイルは、アプリケーションに必要なすべての依存関係を定義し、Flink に必要なすべての依存関係を含む fat-jar を構築するように Maven Shade プラグインを設定します。

  • 一部の依存関係にはprovidedスコープがあります。これらの依存関係は、アプリケーションが Amazon Managed Service for Apache Flink で実行されるときに自動的に使用できます。アプリケーションをコンパイルしたり、 でアプリケーションをローカルで実行したりするために必要ですIDE。詳細については、「アプリケーションをローカルで実行する」を参照してください。Amazon Managed Service for Apache Flink で使用するランタイムと同じ Flink バージョンを使用していることを確認してください。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • このアプリケーションで使用される Kinesis コネクタなど、デフォルトのスコープの pom に Apache Flink 依存関係を追加する必要があります。詳細については、「Apache Flink コネクタ」を参照してください。アプリケーションに必要な Java 依存関係を追加することもできます。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java Compiler プラグインは、コードが Apache Flink で現在サポートされているJDKバージョンである Java 11 に対してコンパイルされていることを確認します。

  • Maven Shade プラグインは fat-jar をパッケージ化します。ただし、ランタイムによって提供されるライブラリは除きます。また、 ServicesResourceTransformerと の 2 つのトランスフォーマーも指定しますManifestResourceTransformer。後者は、 mainメソッドを含む クラスを設定してアプリケーションを起動します。メインクラスの名前を変更する場合は、このトランスフォーマーを更新することを忘れないでください。

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

サンプルレコードを入力ストリームに書き込む

このセクションでは、アプリケーションが処理するサンプルレコードをストリームに送信します。サンプルデータを生成するには、Python スクリプトまたは Kinesis Data Generator を使用して 2 つのオプションがあります。

Python スクリプトを使用してサンプルデータを生成する

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

注記

この Python スクリプトを実行するには、Python 3.x を使用し、 AWS SDK for Python (Boto) ライブラリがインストールされている必要があります。

Kinesis 入力ストリームへのテストデータの送信を開始するには:

  1. データジェネレータリポジトリ stock.py からデータジェネレータ Python スクリプトをダウンロードします。 GitHub

  2. stock.py スクリプトを実行します。

    $ python stock.py

チュートリアルの残りの部分を完了する間は、スクリプトを実行したままにします。Apache Flink アプリケーションを実行できるようになりました。

Kinesis Data Generator を使用してサンプルデータを生成する

Python スクリプトを使用する代わりに、ホストバージョン でも利用可能な Kinesis Data Generator を使用して、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行されるため、マシンに何もインストールする必要はありません。

Kinesis Data Generator をセットアップして実行するには:

  1. Kinesis Data Generator ドキュメントの指示に従って、ツールへのアクセスを設定します。ユーザーとパスワードを設定する AWS CloudFormation テンプレートを実行します。

  2. テンプレートによってURL生成された を介して Kinesis Data Generator CloudFormationにアクセスします。 CloudFormation テンプレートが完了するとURL、出力タブに が表示されます。

  3. データジェネレーターを設定します。

    • リージョン: このチュートリアルで使用しているリージョンを選択します: us-east-1

    • ストリーム/配信ストリーム: アプリケーションが使用する入力ストリームを選択します。 ExampleInputStream

    • 1 秒あたりのレコード数: 100

    • レコードテンプレート: 次のテンプレートをコピーして貼り付けます。

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. テンプレートをテストする: テストテンプレートを選択し、生成されたレコードが次のようになっていることを確認します。

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. データジェネレーターを起動する: データの送信 を選択します

Kinesis Data Generator は、現在 にデータを送信していますExampleInputStream

アプリケーションをローカルで実行する

Flink アプリケーションをローカルで で実行およびデバッグできますIDE。

注記

続行する前に、入力ストリームと出力ストリームが使用可能であることを確認します。2 つの Amazon Kinesis データストリームを作成する を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認します。セッションを認証する AWS を参照してください。

ローカル開発環境を設定するにはJDK、Java 開発IDE用の Java 11、Apache Maven、および が必要です。必要な前提条件を満たしていることを確認します。演習を完了するための前提条件を満たす を参照してください。

Java プロジェクトを にインポートする IDE

でアプリケーションの使用を開始するにはIDE、Java プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、 ./java/GettingStartedサブディレクトリのコンテンツを にインポートしますIDE。

Maven を使用して、コードを既存の Java プロジェクトとして挿入します。

注記

新しい Java プロジェクトをインポートする正確なプロセスは、IDE使用している によって異なります。

ローカルアプリケーション設定を確認する

ローカルで実行すると、アプリケーションは のプロジェクトのリソースフォルダにある application_properties.json ファイルの設定を使用します./src/main/resources。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

IDE 実行設定をセットアップする

Java アプリケーションを実行する場合と同様に、メインクラス を実行するIDEことでcom.amazonaws.services.msf.BasicStreamingJob、 から直接 Flink アプリケーションを実行およびデバッグできます。アプリケーションを実行する前に、実行設定をセットアップする必要があります。設定は、使用している IDE によって異なります。例えば、IntelliJ IDEAドキュメントの「実行/デバッグ設定」を参照してください。特に、以下を設定する必要があります。

  1. クラスパス にprovided依存関係を追加します。これは、ローカルで実行するときに、providedスコープ を持つ依存関係がアプリケーションに渡されるようにするために必要です。この設定を行わないと、アプリケーションはすぐにclass not foundエラーを表示します。

  2. Kinesis ストリームにアクセスするための AWS 認証情報をアプリケーション に渡します。最も速い方法は Toolkit AWS for IntelliJ IDEAを使用することです。実行設定でこのIDEプラグインを使用すると、特定の AWS プロファイルを選択できます。 AWS 認証は、このプロファイルを使用して行われます。認証情報を直接渡す AWS 必要はありません。

  3. JDK11 を使用してアプリケーションIDEを実行していることを確認します。

でアプリケーションを実行する IDE

の実行設定をセットアップしたらBasicStreamingJob、通常の Java アプリケーションのように実行またはデバッグできます。

注記

Maven によって生成された fat-jar をコマンドラインjava -jar ...から直接実行することはできません。この jar には、アプリケーションのスタンドアロン実行に必要な Flink コア依存関係は含まれていません。

アプリケーションが正常に起動すると、スタンドアロンのミニクラスターとコネクタの初期化に関する情報がログに記録されます。この後、アプリケーションの起動時に Flink が通常出力する多数のログINFOと一部のWARNログが続きます。

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

初期化が完了すると、アプリケーションはそれ以上ログエントリを出力しません。データが流れている間は、ログは出力されません。

アプリケーションがデータを正しく処理しているかどうかを確認するには、次のセクションで説明するように、入出力 Kinesis ストリームを検査できます。

注記

フローデータに関するログを出力しないことは、Flink アプリケーションの通常の動作です。すべてのレコードにログを出力するとデバッグに便利ですが、本番環境で実行するとかなりのオーバーヘッドが発生する可能性があります。

Kinesis ストリームで入出力データを監視する

Amazon Kinesis コンソールのデータビューワーを使用して、 (サンプル Python の生成) または Kinesis Data Generator (リンク) Amazon Kinesis によって入力ストリームに送信されたレコードを確認できます。

レコードを観察するには
  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. リージョンが、このチュートリアルを実行しているリージョンと同じであることを確認します。デフォルトでは、米国東部 (バージニア北部) です。リージョンが一致しない場合は、リージョンを変更します。

  3. データストリーム を選択します

  4. 監視するストリームを ExampleInputStreamまたは のいずれかで選択します。 ExampleOutputStream.

  5. データビューワータブを選択します。

  6. 任意のシャード を選択し、最新の を開始位置 のままにして、レコードの取得 を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。その場合は、レコードの取得を再試行 を選択します。ストリームディスプレイに公開された最新のレコード。

  7. データ列の値を選択して、レコードの内容を JSON 形式で検査します。

ローカルで実行されているアプリケーションを停止します。

で実行されているアプリケーションを停止しますIDE。IDE は通常、「停止」オプションを提供します。正確な場所と方法は、使用している IDE によって異なります。

アプリケーションコードをコンパイルしてパッケージ化する

このセクションでは、Apache Maven を使用して Java コードをコンパイルし、JARファイルにパッケージ化します。Maven コマンドラインツールまたは を使用して、コードをコンパイルしてパッケージ化できますIDE。

Maven コマンドラインを使用してコンパイルおよびパッケージ化するには:

Java GettingStarted プロジェクトを含むディレクトリに移動し、次のコマンドを実行します。

$ mvn package

を使用してコンパイルおよびパッケージ化するにはIDE:

IDE Maven 統合mvn packageから を実行します。

どちらの場合も、次のJARファイルが作成されます: target/amazon-msf-java-stream-app-1.0.jar

注記

から「ビルドプロジェクト」を実行しても、 JAR ファイルが作成されないIDE場合があります。

アプリケーションコードJARファイルをアップロードする

このセクションでは、前のセクションで作成したJARファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合は、 (リンク) を参照してください。

アプリケーションコードJARファイルをアップロードするには
  1. で Amazon S3 コンソールを開きますhttps://console.aws.amazon.com/s3/

  2. アプリケーションコード用に以前に作成したバケットを選択します。

  3. アップロードを選択します。

  4. ファイルの追加を選択します。

  5. 前のステップで生成されたJARファイルに移動します: target/amazon-msf-java-stream-app-1.0.jar

  6. 他の設定を変更せずにアップロードを選択します。

警告

で正しいJARファイルを選択していることを確認してください<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar

target ディレクトリには、アップロードする必要のない他のJARファイルも含まれています。

Managed Service for Apache Flink アプリケーションの作成と設定

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。このチュートリアルでは、 コンソールを使用します。

注記

コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) と Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

アプリケーションの作成

アプリケーションを作成するには
  1. https://console.aws.amazon.com/flink で Apache Flink 用 Managed Serviceコンソールを開く

  2. 正しいリージョンが選択されていることを確認します: us-east-1 米国東部 (バージニア北部)

  3. 右側のメニューを開き、Apache Flink アプリケーション を選択し、ストリーミングアプリケーション を作成します。または、最初のページの「開始方法」コンテナで「ストリーミングアプリケーションの作成」を選択します。

  4. ストリーミングアプリケーションの作成ページで、次の操作を行います。

    • ストリーム処理アプリケーションを設定する方法を選択します。「最初から作成」を選択します。

    • Apache Flink 設定、Application Flink バージョン: Apache Flink 1.19 を選択します。

  5. アプリケーションを設定する

    • アプリケーション名: と入力しますMyApplication

    • 説明: と入力しますMy java test app

    • アプリケーションリソースへのアクセス: 必要なポリシーkinesis-analytics-MyApplication-us-east-1を使用してIAMロールを作成/更新を選択します。

  6. アプリケーション設定用にテンプレートを設定する

    • テンプレート: 開発 を選択します。

  7. ページの下部にあるストリーミングアプリケーションの作成を選択します。

注記

コンソールを使用して Managed Service for Apache Flink アプリケーションを作成する場合、アプリケーション用に IAMロールとポリシーを作成するオプションがあります。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらのIAMリソースの名前は、次のようにアプリケーション名とリージョンを使用して付けられます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-east-1

  • ロール: kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service for Apache Flink は、以前は Kinesis Data Analytics と呼ばれていました。自動的に作成されるリソースの名前には、下位互換性kinesis-analytics-のためにプレフィックスが付けられます。

IAM ポリシーを編集する

IAM ポリシーを編集して、Kinesis データストリームにアクセスするためのアクセス許可を追加します。

ポリシーを編集するには
  1. https://console.aws.amazon.com/iam/ で IAM コンソールを開きます。

  2. [Policies] (ポリシー) を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-east-1 ポリシーを選択します。

  3. 編集 を選択し、JSONタブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルアカウント IDs (012345678901) をアカウント ID で指定します。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. ページの下部にある次へ を選択し、変更の保存 を選択します。

アプリケーションを設定する

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

設定を編集するには
  1. MyApplication ページで、 の設定 を選択します。

  2. 「アプリケーションコードの場所」セクションで:

    • Amazon S3 バケット では、アプリケーションコード用に以前に作成したバケットを選択します。参照 を選択して正しいバケットを選択し、 の選択 を選択します。バケット名はクリックしないでください。

    • [Amazon S3 オブジェクトへのパス] で、amazon-msf-java-stream-app-1.0.jarと入力します。

  3. アクセス許可 で、必要なポリシー kinesis-analytics-MyApplication-us-east-1でIAMロールを作成/更新を選択します。

  4. ランタイムプロパティ セクションで、次のプロパティを追加します。

  5. 新しい項目を追加 を選択し、次の各パラメータを追加します。

    グループ ID キー
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 他のセクションは変更しないでください。

  7. [変更の保存] を選択します。

注記

Amazon CloudWatch ログ記録を有効にすると、Managed Service for Apache Flink によってロググループとログストリームが作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

これでアプリケーションが設定され、実行できるようになります。

アプリケーションを実行するには
  1. Amazon Managed Service for Apache Flink のコンソールで、マイアプリケーションを選択し、 の実行を選択します

  2. 次のページのアプリケーション復元設定ページで、最新のスナップショットで実行 を選択し、 を実行 を選択します。

    アプリケーションのステータスの詳細が、アプリケーションの開始Running時に から Starting Readyに、そして から に移行します。

アプリケーションが Runningステータスになったら、Flink ダッシュボードを開くことができます。

ダッシュボードを開くには
  1. Apache Flink ダッシュボードを開く を選択します。ダッシュボードが新しいページで開きます。

  2. ジョブの実行リストで、表示できる 1 つのジョブを選択します。

    注記

    ランタイムプロパティを設定したり、IAMポリシーを誤って編集したりすると、アプリケーションのステータスが になる可能性がありますがRunning、Flink ダッシュボードにはジョブが継続的に再起動していることが表示されます。これは、アプリケーションが誤って設定されているか、外部リソースにアクセスする許可がない場合によくある障害シナリオです。

    この場合、Flink ダッシュボードの例外タブをチェックして、問題の原因を確認します。

実行中のアプリケーションのメトリクスを確認する

MyApplication このページの「Amazon CloudWatch メトリクス」セクションには、実行中のアプリケーションからの基本的なメトリクスの一部が表示されます。

メトリクスを表示するには
  1. 更新ボタンの横にあるドロップダウンリストから 10 秒を選択します。

  2. アプリケーションが実行されていて正常であれば、稼働時間メトリクスが継続的に増加していることを確認できます。

  3. フル再起動メトリクスはゼロである必要があります。増加している場合は、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの例外タブを確認します。

  4. 正常なアプリケーションでは、失敗したチェックポイントの数メトリクスはゼロである必要があります。

    注記

    このダッシュボードには、5 分の粒度を持つメトリクスの固定セットが表示されます。ダッシュボード内の任意のメトリクスを使用して、カスタムアプリケーション CloudWatch ダッシュボードを作成できます。

Kinesis ストリームの出力データを確認する

Python スクリプトまたは Kinesis Data Generator を使用して、まだ入力にデータを公開していることを確認してください。

でデータビューワーを使用して、Apache Flink 用 Managed Service で実行されているアプリケーションの出力を確認できるようになりました。これはhttps://console.aws.amazon.com/kinesis/、以前に実行したものと同様です。

出力を表示するには
  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. リージョンがこのチュートリアルの実行に使用しているリージョンと同じであることを確認します。デフォルトでは、米国東部 (バージニア北us-east-1USです。必要に応じてリージョンを変更します。

  3. データストリーム を選択します

  4. 監視するストリームを選択します。このチュートリアルでは、ExampleOutputStream を使用します。

  5. データビューワータブを選択します。

  6. 任意のシャード を選択し、最新の を開始位置 のままにして、レコードの取得 を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。その場合は、レコードの取得を再試行 を選択します。ストリームディスプレイに公開された最新のレコード。

  7. データ列の値を選択して、レコードの内容を JSON 形式で検査します。

アプリケーションの停止

アプリケーションを停止するには、 という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動しますMyApplication

アプリケーションを停止するには
  1. アクションドロップダウンリストから、停止 を選択します。

  2. アプリケーションのステータスの詳細が から Runningに移行しStopping、アプリケーションが完全に停止Readyすると に遷移します。

    注記

    また、Python スクリプトまたは Kinesis Data Generator からの入力ストリームへのデータ送信も停止することを忘れないでください。

次のステップ

AWS リソースをクリーンアップする