Apache Flink アプリケーション用 Managed Serviceを作成して実行する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Flink アプリケーション用 Managed Serviceを作成して実行する

この演習では、データストリームをソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

2 つの Amazon Kinesis Data Streams を作成する

この演習でAmazon Managed Service for Apache Flink を作成する前に、2 つの Kinesis データストリーム (ExampleInputStreamExampleOutputStream) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、データストリームの作成および更新を参照してください。

データストリームを作成するには (AWS CLI)
  1. 最初のストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

入力ストリームへのサンプルレコードの書き込み

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、stock.py という名前のファイルを作成します。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. このチュートリアルの後半では、アプリケーションにデータを送信する stock.py スクリプトを実行します。

    $ python stock.py

Apache Flink Streaming Java Code のダウンロードと検証

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. GettingStarted ディレクトリに移動します。

アプリケーションコードは CustomSinkStreamingJob.java ファイルと CloudWatchLogSink.java ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

アプリケーションコードのコンパイル

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、演習を完了するための前提条件を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。

  • プロジェクトオブジェクトモデル (pom.xml) ファイル。ファイルには、Amazon Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。

  • アプリケーションのロジックを含む main メソッド。

注記

次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、Apache Flink ドキュメントで説明されているように構築する必要があります。

アプリケーションコードを作成してコンパイルするには
  1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。

  2. StreamingJob.java という名前のファイルに対して次のコードを使用します。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    前述のコード例については、以下の点に注意してください。

    • このファイルには、アプリケーションの機能を定義する main メソッドが含まれています。

    • アプリケーションでは、ソースおよびシンクコネクタを作成し、StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスします。

    • アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、createSourceFromApplicationProperties および createSinkFromApplicationProperties メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

  3. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。

    • Maven コマンドラインツールを使用します。pom.xml ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

      mvn package
    • 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

    パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。を使用してアプリケーションを作成する場合は AWS CLI、コードコンテンツタイプ (JAR または ZIP) を指定します。

  4. コンパイル中にエラーが発生した場合は、JAVA_HOME 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

target/java-getting-started-1.0.jar

Apache Flink Streaming Java Code のアップロードしてください

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

アプリケーションコードをアップロードするには
  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. [バケットを作成] を選択します。

  3. [Bucket name (バケット名)] フィールドにka-app-code-<username>と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[Next (次へ)] を選択します。

  4. 設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。

  5. アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。

  6. [バケットを作成] を選択します。

  7. Amazon S3 コンソールで ka-app-code-<username> バケットを選択し、[アップロード] を選択します。

  8. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した java-getting-started-1.0.jar ファイルに移動します。[Next (次へ)] を選択します。

  9. アクセス許可の設定のステップでは、設定をそのままにします。[Next (次へ)] を選択します。

  10. プロパティの設定のステップでは、設定をそのままにします。[アップロード] を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Managed Service for Apache Flink アプリケーションを作成して実行する

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。

注記

コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) と Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

アプリケーションを作成して実行する (コンソール)

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. Amazon Kinesis ダッシュボードで、[分析アプリケーションを作成する] を選択します。

  3. [Kinesis Analytics - アプリケーションの作成] ページで、次のようにアプリケーションの詳細を指定します。

    • [アプリケーション名] にはMyApplicationと入力します。

    • [Description (説明)] にMy java test appと入力します。

    • [ランタイム] には、[Apache Flink 1.6] を選択します。

  4. [アクセス許可] には、[IAM ロールの作成 / 更新kinesis-analytics-MyApplication-us-west-2] を選択します。

  5. [Create application] を選択します。

注記

コンソールを使用して Amazon Managed Service for Apache Flink を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーを編集する

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [ポリシー] を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションを設定する

  1. [MyApplication] ページで、[Congirue] を選択します。

  2. [Configure application] ページで、[Code location] を次のように指定します。

    • [Amazon S3 バケット] で、ka-app-code-<username>と入力します。

    • [Amazon S3 オブジェクトへのパス] で、java-getting-started-1.0.jarと入力します。

  3. [Access to application resources] の [Access permissions] では、[Create / update IAM rolekinesis-analytics-MyApplication-us-west-2] を選択します。

  4. [Properties] の [Group ID] には、ProducerConfigPropertiesと入力します。

  5. 次のアプリケーションのプロパティと値を入力します。

    キー
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  7. [CloudWatch logging] では、[Enable] チェックボックスをオンにします。

  8. [Update] (更新) を選択します。

注記

CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

  1. [MyApplication] ページで、[Run] を選択します。アクションを確認します。

  2. アプリケーションが実行されたら、ページを更新します。コンソールには [Application graph] が示されます。

アプリケーションを停止する

[MyApplication] ページで、[Stop] を選択します。アクションを確認します。

アプリケーションの更新

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

[MyApplication] ページで、[Congirue] を選択します。アプリケーションの設定を更新し、[更新] を選択します。

アプリケーションを作成して実行する (AWS CLI)

このセクションでは、 を使用して Managed Service for Apache Flink アプリケーション AWS CLI を作成して実行します。Managed Service for Apache Flink は、 kinesisanalyticsv2 AWS CLI コマンドを使用して Managed Service for Apache Flink アプリケーションを作成して操作します。

アクセス許可ポリシーを作成する

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの read アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの write アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して KAReadSourceStreamWriteSinkStream アクセス許可ポリシーを作成します。username を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

許可ポリシーを作成する詳しい手順については、IAM ユーザーガイドチュートリアル: はじめてのカスタマーマネージドポリシーの作成とアタッチを参照してください。

注記

他の AWS サービスにアクセスするには、 を使用できます AWS SDK for Java。Managed Service for Apache Flink は、アプリケーションに関連付けられているサービス実行 IAM ロールに、SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。

IAM ロールを作成します。

このセクションでは、Managed Service for Apache Flink がソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。

Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

IAM ロールを作成するには
  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションペインで [Roles (ロール)]、[Create Role (ロールの作成)] の順に選択します。

  3. [信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。[ユースケースの選択] で、[Kinesis Analytics ] を選択します。

    [Next: Permissions] (次へ: アクセス許可) を選択します。

  4. [アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。

  5. [Create role (ロールの作成)] ページで、ロールの名前KA-stream-rw-role を入力します。[ロールの作成] を選択します。

    これで、KA-stream-rw-role と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

  6. アクセス許可ポリシーをロールにアタッチします。

    注記

    この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。このため、前のステップで作成したポリシー、アクセス許可ポリシーを作成する をアタッチします。

    1. [概要] ページで、[アクセス許可] タブを選択します。

    2. [Attach Policies (ポリシーのアタッチ)] を選択します。

    3. 検索ボックスにKAReadSourceStreamWriteSinkStream(前のセクションで作成したポリシー) と入力します。

    4. [KAReadInputStreamWriteOutputStream] ポリシーを選択し、[ポリシーのアタッチ] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

ロールを作成する手順については、IAM ユーザーガイドIAM ロールの作成 (コンソール)を参照してください。

Apache Flink アプリケーション用 Managed Serviceの作成

  1. 次の JSON コードを create_request.json という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (username) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 前述のリクエストを指定して CreateApplication アクションを実行し、アプリケーションを作成します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

アプリケーションの起動

このセクションでは、StartApplication アクションを使用してアプリケーションを起動します。

アプリケーションを起動するには
  1. 次の JSON コードを start_request.json という名前のファイルに保存します。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 前述のリクエストを指定して StartApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。

アプリケーションの停止

このセクションでは、StopApplication アクションを使用してアプリケーションを停止します。

アプリケーションを停止するには
  1. 次の JSON コードを stop_request.json という名前のファイルに保存します。

    {"ApplicationName": "test" }
  2. 次のリクエストを指定して StopApplication アクションを実行し、アプリケーションを停止します。

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

アプリケーションが停止します。