ステップ 3: Flink アプリケーション向けの Managed Service for Apache Flink を作成して実行する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 3: Flink アプリケーション向けの Managed Service for Apache Flink を作成して実行する

この演習では、データストリームをソースおよびシンクとして使用して、Flink アプリケーション向けの Managed Service for Apache Flink を作成します。

2 つの Amazon Kinesis Data Streams を作成する

この演習の Flink アプリケーション向けの Managed Service for Apache Flink を作成する前に、2 つの Kinesis データストリーム (ExampleInputStreamExampleOutputStream) を作成してください。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、データストリームの作成および更新を参照してください。

データストリームを作成するには (AWS CLI)
  1. 最初のストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

入力ストリームへのサンプルレコードの書き込み

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、stock.py という名前のファイルを作成します。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. このチュートリアルの後半では、アプリケーションにデータを送信する stock.py スクリプトを実行します。

    $ python stock.py

Apache Flink Streaming Java Code のダウンロードと検証

この例の Java アプリケーションコードは、 GitHubから入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. GettingStarted ディレクトリに移動します。

アプリケーションコードは CustomSinkStreamingJob.java ファイルと CloudWatchLogSink.java ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

アプリケーションコードのコンパイル

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、演習を完了するための前提条件を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。

  • プロジェクトオブジェクトモデル (pom.xml) ファイル。このファイルには、Flink アプリケーション向けの Amazon Managed Service for Apache Flink のライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。

  • アプリケーションのロジックを含む main メソッド。

注記

次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、Apache Flink ドキュメントで説明されているように構築する必要があります。

アプリケーションコードを作成してコンパイルするには
  1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。

  2. StreamingJob.java という名前のファイルに対して次のコードを使用します。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    前述のコード例については、以下の点に注意してください。

    • このファイルには、アプリケーションの機能を定義する main メソッドが含まれています。

    • アプリケーションでは、ソースおよびシンクコネクタを作成し、StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスします。

    • アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、createSourceFromApplicationProperties および createSinkFromApplicationProperties メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

  3. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。

    • Maven コマンドラインツールを使用します。pom.xml ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

      mvn package
    • 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

    パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。を使用してアプリケーションを作成する場合は AWS CLI、コードコンテンツタイプ (JAR または ZIP) を指定します。

  4. コンパイル中にエラーが発生した場合は、JAVA_HOME 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

target/java-getting-started-1.0.jar

Apache Flink Streaming Java Code のアップロードしてください

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

アプリケーションコードをアップロードするには
  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. [バケットを作成] を選択します。

  3. [Bucket name (バケット名)] フィールドにka-app-code-<username>と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[次へ] をクリックします。

  4. 設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。

  5. アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。

  6. [バケットを作成] を選択します。

  7. Amazon S3 コンソールで ka-app-code- <username>バケットを選択し、[アップロード] を選択します。

  8. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した java-getting-started-1.0.jar ファイルに移動します。[次へ] をクリックします。

  9. アクセス許可の設定のステップでは、設定をそのままにします。[次へ] をクリックします。

  10. プロパティの設定のステップでは、設定をそのままにします。[アップロード] を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Managed Service for Apache Flink アプリケーションを作成して実行する

Flink アプリケーション向けの Managed Service for Apache Flink は、コンソールまたは AWS CLIのいずれかを使用して作成し、実行することができます。

注記

コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成する場合 AWS CLI、これらのリソースは個別に作成します。

アプリケーションの作成と実行 (コンソール)

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. Amazon Kinesis ダッシュボードで、[分析アプリケーションを作成する] を選択します。

  3. [Kinesis Analytics - アプリケーションの作成] ページで、次のようにアプリケーションの詳細を指定します。

    • [アプリケーション名] にはMyApplicationと入力します。

    • [Description (説明)] にMy java test appと入力します。

    • [ランタイム] には、[Apache Flink 1.6] を選択します。

  4. [アクセス許可] には、[IAM ロールの作成 / 更新kinesis-analytics-MyApplication-us-west-2] を選択します。

  5. [Create application] を選択します。

注記

コンソールを使用して Flink アプリケーション向けの Managed Service for Apache Flink を作成するときは、アプリケーション用の IAM ロールとポリシーを作成するオプションがあります。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーの編集

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [ポリシー] を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションの設定

  1. MyApplicationページで [設定] を選択します。

  2. [Configure application] ページで、[Code location] を次のように指定します。

    • [Amazon S3 バケット] で、ka-app-code-<username>と入力します。

    • [Amazon S3 オブジェクトへのパス] で、java-getting-started-1.0.jarと入力します。

  3. [Access to application resources] の [Access permissions] では、[Create / update IAM rolekinesis-analytics-MyApplication-us-west-2] を選択します。

  4. [Properties] の [Group ID] には、ProducerConfigPropertiesと入力します。

  5. 次のアプリケーションのプロパティと値を入力します。

    キー
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  7. CloudWatch ロギングするには、「有効化」チェックボックスを選択します。

  8. [更新] を選択します。

注記

CloudWatch ロギングを有効にすると、Apache Flink 用マネージドサービスによって自動的にロググループとログストリームが作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

  1. MyApplicationページで [実行] を選択します。アクションを確認します。

  2. アプリケーションが実行されたら、ページを更新します。コンソールには [Application graph] が示されます。

アプリケーションの停止

MyApplicationページで [Stop] を選択します。アクションを確認します。

アプリケーションの更新

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

MyApplicationページで [設定] を選択します。アプリケーションの設定を更新し、[更新] を選択します。

アプリケーションの作成と実行 (AWS CLI)

このセクションでは、を使用して Apache Flink 用管理サービスアプリケーションを作成して実行します。 AWS CLI Apache Flink アプリケーション用マネージドサービスは、kinesisanalyticsv2 AWS CLI コマンドを使用して Apache Flink アプリケーション用マネージドサービスを作成し、操作します。

アクセス許可ポリシーを作成する

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの read アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの write アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して KAReadSourceStreamWriteSinkStream アクセス許可ポリシーを作成します。username を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

step-by-step アクセス権限ポリシーの作成手順については、IAM ユーザーガイドの「チュートリアル:初めてのカスタマー管理ポリシーを作成してアタッチする」を参照してください。

注記

AWS 他のサービスにアクセスするには、を使用できます。 AWS SDK for Java Managed Service for Apache Flink は、アプリケーションに関連付けられているサービス実行 IAM ロールに、SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。

IAM ロールを作成します。

このセクションでは、Flink アプリケーション向けの Managed Service for Apache Flink がソースストリームを読み取り、シンクストリームに書き込むために引き受けることができる IAM ロールを作成します。

Managed Service for Apache Flink が、許可のないままストリームにアクセスすることはできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

IAM ロールを作成するには
  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションペインで [Roles (ロール)]、[Create Role (ロールの作成)] の順に選択します。

  3. [信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。[ユースケースの選択] で、[Kinesis Analytics ] を選択します。

    [次のステップ: アクセス許可] を選択します。

  4. [アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。

  5. [Create role (ロールの作成)] ページで、ロールの名前KA-stream-rw-role を入力します。[ロールを作成] を選択します。

    これで、KA-stream-rw-role と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

  6. アクセス許可ポリシーをロールにアタッチします。

    注記

    この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。このため、前のステップで作成したポリシー、アクセス許可ポリシーを作成する をアタッチします。

    1. [概要] ページで、[アクセス許可] タブを選択します。

    2. [Attach Policies (ポリシーのアタッチ)] を選択します。

    3. 検索ボックスにKAReadSourceStreamWriteSinkStream(前のセクションで作成したポリシー) と入力します。

    4. KA ReadInputStreamWriteOutputStream ポリシーを選択し、[ポリシーをアタッチ] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

step-by-step ロールの作成方法については、『IAM ユーザーガイドの「IAM ロールの作成 (コンソール)」を参照してください。

Apache Flink アプリケーション用 Managed Serviceの作成

  1. 次の JSON コードを create_request.json という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (username) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 前述のリクエストを指定して CreateApplication アクションを実行し、アプリケーションを作成します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

アプリケーションの起動

このセクションでは、StartApplication アクションを使用してアプリケーションを起動します。

アプリケーションを起動するには
  1. 次の JSON コードを start_request.json という名前のファイルに保存します。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 前述のリクエストを指定して StartApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

アプリケーションが実行されます。Amazon CloudWatch コンソールで Apache Flink 用マネージドサービスのメトリックスを確認して、アプリケーションが動作していることを確認できます。

アプリケーションの停止

このセクションでは、StopApplication アクションを使用してアプリケーションを停止します。

アプリケーションを停止するには
  1. 次の JSON コードを stop_request.json という名前のファイルに保存します。

    {"ApplicationName": "test" }
  2. 次のリクエストを指定して StopApplication アクションを実行し、アプリケーションを停止します。

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

アプリケーションが停止します。