ステップ 3: Flink アプリケーションの Kinesis Data Analytics の作成と実行 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 3: Flink アプリケーションの Kinesis Data Analytics の作成と実行

この演習では、データストリームを含む Kinesis Data Analytics for Flink アプリケーションを作成します。

Amazon Kinesis Data Streams を作成する

この演習用の Kinesis Data Analytics for Flink アプリケーションを作成する前に 2 つの Kinesis データストリーム (ExampleInputStreamおよびExampleOutputStream). アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次のを使用して作成できます。AWS CLIコマンド。コンソールを使用した手順については、「データストリームの作成および更新」を参照してください。

データストリームを作成するには (AWS CLI)

  1. 最初のストリーム (ExampleInputStream) を使用するには、次の Amazon Kinesis を使用します。create-stream AWS CLIコマンド。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

入力ストリームへのサンプルレコードの書き込み

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、stock.py というファイルを作成します。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
  2. このチュートリアルの後半では、アプリケーションにデータを送信する stock.py スクリプトを実行します。

    $ python stock.py

Apache Flink Streaming Java Code のダウンロードと検証

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. GettingStarted ディレクトリに移動します。

アプリケーションコードは CustomSinkStreamingJob.java ファイルと CloudWatchLogSink.java ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションでは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

アプリケーションコードのコンパイル

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、「演習を完了するための前提条件」を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。

  • プロジェクトオブジェクトモデル (pom.xml) ファイル。このファイルには、Flink アプリケーション用の Kinesis Data Analytics for Flink アプリケーションライブラリを含む、アプリケーションの構成と依存関係に関する情報が含まれています。

  • アプリケーションのロジックを含む main メソッド。

注記

次のアプリケーションでは、Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、Apache Flink ドキュメント

アプリケーションコードを作成してコンパイルするには

  1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。

  2. StreamingJob.java という名前のファイルに対して次のコードを使用します。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* if you would like to use runtime configuration properties, uncomment the lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* if you would like to use runtime configuration properties, uncomment the lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    前述のコード例については、以下の点に注意してください。

    • このファイルには、アプリケーションの機能を定義する main メソッドが含まれています。

    • アプリケーションでは、ソースおよびシンクコネクタを作成し、StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスします。

    • アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、createSourceFromApplicationProperties および createSinkFromApplicationProperties メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

  3. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。

    • Maven コマンドラインツールを使用します。pom.xml ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

      mvn package
    • 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

    パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。AWS CLI を使用してアプリケーションを作成する場合は、コードのコンテンツタイプ (JAR または ZIP) を指定します。

  4. コンパイル中にエラーが発生した場合は、JAVA_HOME 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

target/java-getting-started-1.0.jar

Apache Flink Streaming Java Code のアップロード

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

アプリケーションコードをアップロードするには

  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. [バケットを作成する] を選択します。

  3. Enterka-app-code-<username>()バケット名 フィールド。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[Next] (次へ) をクリックします。

  4. 設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。

  5. アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。

  6. [バケットを作成する] を選択します。

  7. Amazon S3 コンソールで、[] を選択します。ka-app-code<username>バケットを選択し、アップロード

  8. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した java-getting-started-1.0.jar ファイルに移動します。[Next] (次へ) をクリックします。

  9. アクセス許可の設定のステップでは、設定をそのままにします。[Next] (次へ) をクリックします。

  10. プロパティの設定のステップでは、設定をそのままにします。[Upload (アップロード)] を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Kinesis Data Analytics アプリケーションを作成して実行する

Flink 用 Kinesis Data Analytics for Flink アプリケーションは、コンソールまたはAWS CLI。

注記

コンソールを使用してアプリケーションを作成する場合は、コンソールを使用してAWS Identity and Access Management(IAM) と Amazon CloudWatch ログのリソースが作成されます。AWS CLI を使用してアプリケーションを作成する場合は、これらのリソースを個別に作成します。

アプリケーションの作成と実行 (コンソール)

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. Kinesis コンソールを開きます。https://console.aws.amazon.com/kinesis

  2. Amazon Kinesis ダッシュボードで、分析アプリケーションの作成

  3. [Kinesis Analytics - アプリケーションの作成] ページで、次のようにアプリケーションの詳細を指定します。

    • [アプリケーション名] には「MyApplication」と入力します。

    • [説明] に、「My java test app」を入力します。

    • [ランタイム] には、[Apache Flink 1.6] を選択します。

  4. [アクセス許可] には、[IAM ロールの作成 / 更新kinesis-analytics-MyApplication-us-west-2] を選択します。

    
                                    アプリケーションの作成ページの設定を示しているコンソールのスクリーンショット。
  5. [Create application] を選択します。

注記

コンソールを使用して Kinesis Data Analytics for Flink アプリケーションでは、アプリケーション用の IAM ロールとポリシーを作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーの編集

IAM ポリシーを編集し、Kinesis データストリームにアクセスするためのアクセス権限を追加します。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [ポリシー] を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションの設定

  1. [MyApplication] ページで、[Congirue] を選択します。

    
                                    MyApplication ページおよび設定ボタンと実行ボタンを示しているスクリーンショット。
  2. [Configure application] ページで、[Code location] を次のように指定します。

    • を使用する場合Amazon S3 バケット。に、と入力します。ka-app-code-<username>

    • を使用する場合Amazon S3 オブジェクトへのパスに、と入力します。java-getting-started-1.0.jar

  3. [Access to application resources] の [Access permissions] では、[Create / update IAM rolekinesis-analytics-MyApplication-us-west-2] を選択します。

  4. [Properties] の [Group ID] には、「ProducerConfigProperties」と入力します。

  5. 次のアプリケーションのプロパティと値を入力します。

    キー
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  7. [CloudWatch logging] では、[Enable] チェックボックスをオンにします。

  8. [Update (更新)] を選択します。

    
                                    この手順で説明されている設定を含む [Configure application] ページのスクリーンショット。
注記

CloudWatch ログ記録を有効にすることを選択すると、Kinesis Data Analytics によってロググループとログストリームが自動的に作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

  1. [MyApplication] ページで、[Run] を選択します。アクションを確認します。

    
                                    [MyApplication] ページと実行ボタンのスクリーンショット。
  2. アプリケーションが実行されたら、ページを更新します。コンソールには [Application graph] が示されます。


                            Application graph のスクリーンショット。

アプリケーションの停止

[MyApplication] ページで、[Stop] を選択します。アクションを確認します。


                            [MyApplication] ページと停止ボタンのスクリーンショット。

アプリケーションの更新

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

[MyApplication] ページで、[Congirue] を選択します。アプリケーションの設定を更新し、[更新] を選択します。

アプリケーションの作成と実行 (AWS CLI)

このセクションでは、使用するAWS CLIKinesis Data Analytics アプリケーションを作成して実行します。Flink アプリケーションの Kinesis データ分析では、kinesisanalyticsv2 AWS CLIコマンドを使用して、Kinesis データ分析アプリケーションを作成して操作できます。

アクセス許可ポリシーを作成する

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの read アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの write アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。したがって、Kinesis Data Analytics がロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して KAReadSourceStreamWriteSinkStream アクセス許可ポリシーを作成します。置換usernameを Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アクセス権限ポリシーを作成する詳しい手順については、「」を参照してください。チュートリアル: 最初のカスタマー管理ポリシーの作成とアタッチ()IAM ユーザーガイド

注記

その他にアクセスするにはAWSのサービスを終了する場合は、AWS SDK for Java。Kinesis Data Analytics では、アプリケーションに関連付けられているサービス実行 IAM ロールの認証情報に SDK が必要とする認証情報が自動的に設定されます。追加の手順は必要ありません。

IAM ロールを作成します。

このセクションでは、Kinesis Data Analytics for Flink アプリケーションでは、ソースストリームを読み取り、シンクストリームに書き込むことができる IAM ロールを作成します。

Kinesis Data Analytics では、アクセス許可なしにがストリームにアクセスすることはできません これらのアクセス権限は、IAM ロールを使用して付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、Kinesis Data Analytics にロールを引き受けるアクセス許可を付与し、ロールを引き受けた後に Kinesis Data Analytics が実行できる操作はアクセス許可ポリシーによって決定されます。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

IAM ロールを作成するには

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションペインで [ロール]、[ロールの作成] の順に選択します。

  3. []信頼されたアイデンティティの種類を選択] で、AWSサービス。[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。[ユースケースの選択] で、[Kinesis Analytics ] を選択します。

    [Next: (次へ:)] を選択します アクセス許可.

  4. リポジトリの []アクセス許可ポリシーをアタッチするページで [] を選択します。次へ: 確認. ロールを作成した後に、アクセス許可ポリシーをアタッチします。

  5. リポジトリの []ロールの作成ページで、KA-stream-rw-role向けのロール名。[ロールの作成] を選択します。

    これで、と呼ばれる、新しい IAM ロールが作成されました。KA-stream-rw-role。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

  6. アクセス許可ポリシーをロールにアタッチします。

    注記

    この演習では、Kinesis Data Analytics では、Kinesis データストリーム (ソース) からのデータの読み取りと別の Kinesis データストリームへの出力の書き込みの両方に対してこのロールを引き受けます。このため、前のステップで作成したポリシー、アクセス許可ポリシーを作成する をアタッチします。

    1. [概要] ページで、[アクセス許可] タブを選択します。

    2. [Attach Policies (ポリシーのアタッチ)] を選択します。

    3. 検索ボックスに「KAReadSourceStreamWriteSinkStream」(前のセクションで作成したポリシー) と入力します。

    4. [KAReadInputStreamWriteOutputStream] ポリシーを選択し、[ポリシーのアタッチ] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

ロールの作成手順については、「」を参照してください。IAM ロールの作成 (コンソール)()IAM ユーザーガイド

Kinesis Data Analytics アプリケーションを作成する

  1. 次の JSON コードを create_request.json という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (username) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 前述のリクエストを指定して CreateApplication アクションを実行し、アプリケーションを作成します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

アプリケーションの起動

このセクションでは、StartApplication アクションを使用してアプリケーションを起動します。

アプリケーションを起動するには

  1. 次の JSON コードを start_request.json という名前のファイルに保存します。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 前述のリクエストを指定して StartApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

アプリケーションが実行されます。Amazon CloudWatch コンソールで Kinesis Data Analytics メトリックスをチェックして、アプリケーションが動作していることを確認します。

アプリケーションの停止

このセクションでは、StopApplication アクションを使用してアプリケーションを停止します。

アプリケーションを停止するには

  1. 次の JSON コードを stop_request.json という名前のファイルに保存します。

    {"ApplicationName": "test" }
  2. 次のリクエストを指定して StopApplication アクションを実行し、アプリケーションを停止します。

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

アプリケーションが停止します。