ステップ 3: Flink アプリケーション用の Kinesis Data Analytics を作成および実行する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 3: Flink アプリケーション用の Kinesis Data Analytics を作成および実行する

この演習では、ソースとシンクとしてデータストリームを使用して、Flink アプリケーション用の Kinesis Data Analytics を作成します。

2 つの Amazon Kinesis Data Streams を作成する

この演習の Flink アプリケーション用 Kinesis Data Analytics を作成する前に、2 つの Kinesis Data Streams (ExampleInputStreamExampleOutputStream) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、データストリームの作成および更新を参照してください。

データストリームを作成するには (AWS CLI)

  1. 次の Amazon Kinesis create-stream AWS CLI コマンドを使用して、1 つ目のストリーム (ExampleInputStream) を作成します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

入力ストリームへのサンプルレコードの書き込み

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、stock.py という名前のファイルを作成します。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
  2. このチュートリアルの後半では、アプリケーションにデータを送信する stock.py スクリプトを実行します。

    $ python stock.py

Apache Flink Streaming Java Code のダウンロードと検証

この例の Java アプリケーションコードは、から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. GettingStarted ディレクトリに移動します。

アプリケーションコードは CustomSinkStreamingJob.java ファイルと CloudWatchLogSink.java ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

アプリケーションコードのコンパイル

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、演習を完了するための前提条件を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。

  • プロジェクトオブジェクトモデル (pom.xml) ファイル。このファイルには、Kinesis Data Analytics for Flink Applications ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。

  • アプリケーションのロジックを含む main メソッド。

注記

次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、Apache Flink ドキュメントで説明されているように構築する必要があります。

アプリケーションコードを作成してコンパイルするには

  1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。

  2. StreamingJob.java という名前のファイルに対して次のコードを使用します。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* if you would like to use runtime configuration properties, uncomment the lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* if you would like to use runtime configuration properties, uncomment the lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    前述のコード例については、以下の点に注意してください。

    • このファイルには、アプリケーションの機能を定義する main メソッドが含まれています。

    • アプリケーションでは、ソースおよびシンクコネクタを作成し、StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスします。

    • アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、createSourceFromApplicationProperties および createSinkFromApplicationProperties メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

  3. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。

    • Maven コマンドラインツールを使用します。pom.xml ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

      mvn package
    • 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

    パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。AWS CLI を使用してアプリケーションを作成する場合は、コードのコンテンツタイプ (JAR または ZIP) を指定します。

  4. コンパイル中にエラーが発生した場合は、JAVA_HOME 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

target/java-getting-started-1.0.jar

Apache Flink Streaming Java Code のアップロード

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

アプリケーションコードをアップロードするには

  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. [バケットを作成する] を選択します。

  3. [Bucket name (バケット名)] フィールドにka-app-code-<username>と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[Next] (次へ) を選択します。

  4. 設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。

  5. アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。

  6. バケットの作成 を選択します。

  7. Amazon S3 コンソールで、ka-app-code- <username>バケットを選択し、[アップロード] を選択します。

  8. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した java-getting-started-1.0.jar ファイルに移動します。[Next] (次へ) を選択します。

  9. アクセス許可の設定のステップでは、設定をそのままにします。[Next] (次へ) を選択します。

  10. プロパティの設定のステップでは、設定をそのままにします。[Upload] (アップロード) を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Kinesis Data Analytics アプリケーションを作成して実行する

コンソールまたは AWS CLI を使用して Flink アプリケーション用 Kinesis Data Analytics を作成および実行できます。

注記

コンソールを使用してアプリケーションを作成する場合は、AWS Identity and Access Management (IAM) と Amazon CloudWatch Logs のリソースが作成されます。AWS CLI を使用してアプリケーションを作成する場合は、これらのリソースを個別に作成します。

アプリケーションの作成と実行 (コンソール)

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. Amazon Kinesis ダッシュボードで、[分析アプリケーションを作成する] を選択します。

  3. [Kinesis Analytics - アプリケーションの作成] ページで、次のようにアプリケーションの詳細を指定します。

    • [アプリケーション名] にはMyApplicationと入力します。

    • [Description (説明)] にMy java test appと入力します。

    • [ランタイム] には、[Apache Flink 1.6] を選択します。

  4. [アクセス許可] には、[IAM ロールの作成 / 更新kinesis-analytics-MyApplication-us-west-2] を選択します。

  5. [Create application] を選択します。

注記

コンソールを使用して Flink アプリケーション用 Kinesis Data Analytics を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーの編集

IAM ポリシーを編集し、Kinesis Data Streams にアクセスするための許可を追加します。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [Policies] (ポリシー) を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションの設定

  1. MyApplicationページで [設定] を選択します。

  2. [Configure application] ページで、[Code location] を次のように指定します。

    • [Amazon S3 バケット] で、ka-app-code-<username>と入力します。

    • [Amazon S3 オブジェクトへのパス] で、java-getting-started-1.0.jarと入力します。

  3. [Access to application resources] の [Access permissions] では、[Create / update IAM rolekinesis-analytics-MyApplication-us-west-2] を選択します。

  4. [Properties] の [Group ID] には、ProducerConfigPropertiesと入力します。

  5. 次のアプリケーションのプロパティと値を入力します。

    キー
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  7. CloudWatch ロギングするには、「有効にする」チェックボックスを選択します。

  8. [Update] (更新) を選択します。

注記

CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

  1. MyApplicationページで [実行] を選択します。アクションを確認します。

  2. アプリケーションが実行されたら、ページを更新します。コンソールには [Application graph] が示されます。

アプリケーションの停止

MyApplicationページで [停止] を選択します。アクションを確認します。

アプリケーションの更新

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

MyApplicationページで [設定] を選択します。アプリケーションの設定を更新し、[更新] を選択します。

アプリケーションの作成と実行 (AWS CLI)

このセクションでは、AWS CLI を使用して、Kinesis Data Analytics アプリケーションを作成および実行します。Flink アプリケーション用の Kinesis Data Analytics では、kinesisanalyticsv2 AWS CLI コマンドを使用して、Kinesis Data Analytics アプリケーションを作成して操作します。

アクセス許可ポリシーを作成する

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの read アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの write アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、Kinesis Data Analytics がこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要な許可がサービスに付与されます。

次のコードを使用して KAReadSourceStreamWriteSinkStream アクセス許可ポリシーを作成します。username を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

step-by-step アクセス権限ポリシーを作成する手順については、「チュートリアル」を参照してください。IAM ユーザーガイドで初めてのカスタマー管理ポリシーを作成して添付してください

注記

その他の AWS サービスにアクセスするには、AWS SDK for Java を使用します。Kinesis Data Analytics は、アプリケーションに関連付けられているサービス実行 IAM ロールの認証情報に SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。

IAM ロールを作成します。

このセクションでは Kinesis Data Analytics for Flink application がソースストリームを読み取り、シンクストリームに書き込むことができる IAM ロールを作成します。

許可なしに Kinesis Data Analytics がストリームにアクセスすることはできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受ける Kinesis Data Analytics 許可を付与し、ロールを引き受けた後に Kinesis Data Analytics が実行できる操作は許可ポリシーによって決定されます。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

IAM ロールを作成するには

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションペインで [Roles (ロール)]、[Create Role (ロールの作成)] の順に選択します。

  3. [信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。[ユースケースの選択] で、[Kinesis Analytics ] を選択します。

    [Next: (次へ:)] を選択します アクセス許可.

  4. アクセス権限ポリシーをアタッチ」ページで、「次へ」を選択します。確認. ロールを作成した後に、アクセス許可ポリシーをアタッチします。

  5. [Create role (ロールの作成)] ページで、ロールの名前KA-stream-rw-role を入力します。[ロールの作成] を選択します。

    これで、KA-stream-rw-role と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

  6. アクセス許可ポリシーをロールにアタッチします。

    注記

    この演習では、Kinesis Data Analytics は Kinesis Data Streams (ソース) からのデータの読み込みと別の Kinesis Data Streams への出力の書き込みの両方に対してこのロールを引き受けます。このため、前のステップで作成したポリシー、アクセス許可ポリシーを作成する をアタッチします。

    1. [概要] ページで、[アクセス許可] タブを選択します。

    2. [Attach Policies (ポリシーのアタッチ)] を選択します。

    3. 検索ボックスにKAReadSourceStreamWriteSinkStream(前のセクションで作成したポリシー) と入力します。

    4. KAReadInputStreamWriteOutputStream ポリシーを選択し、[ポリシーのアタッチ] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

step-by-step ロールを作成する手順については、IAM ユーザーガイドの「IAM ロールの作成 (コンソール)」を参照してください

Kinesis Data Analytics アプリケーションを作成する

  1. 次の JSON コードを create_request.json という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (username) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 前述のリクエストを指定して CreateApplication アクションを実行し、アプリケーションを作成します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

アプリケーションの起動

このセクションでは、StartApplication アクションを使用してアプリケーションを起動します。

アプリケーションを起動するには

  1. 次の JSON コードを start_request.json という名前のファイルに保存します。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 前述のリクエストを指定して StartApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

アプリケーションが実行されます。Amazon CloudWatch Console で Kinesis Data Analytics メトリクスをチェックして、アプリケーションが動作していることを確認します。

アプリケーションの停止

このセクションでは、StopApplication アクションを使用してアプリケーションを停止します。

アプリケーションを停止するには

  1. 次の JSON コードを stop_request.json という名前のファイルに保存します。

    {"ApplicationName": "test" }
  2. 次のリクエストを指定して StopApplication アクションを実行し、アプリケーションを停止します。

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

アプリケーションが停止します。