翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Flink アプリケーション用 Managed Serviceを作成して実行する
この演習では、データストリームをソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。
このセクションには、以下のステップが含まれています。
2 つの Amazon Kinesis Data Streams を作成する
この演習でAmazon Managed Service for Apache Flink を作成する前に、2 つの Kinesis データストリーム (ExampleInputStream
と ExampleOutputStream
) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。
これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、データストリームの作成および更新を参照してください。
データストリームを作成するには (AWS CLI)
-
最初のストリーム (
ExampleInputStream
) を作成するには、次の Amazon Kinesiscreate-stream
AWS CLI コマンドを使用します。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を
ExampleOutputStream
に変更して同じコマンドを実行します。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
入力ストリームへのサンプルレコードの書き込み
このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。
注記
このセクションでは AWS SDK for Python (Boto)
-
次の内容で、
stock.py
という名前のファイルを作成します。import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
このチュートリアルの後半では、アプリケーションにデータを送信する
stock.py
スクリプトを実行します。$ python stock.py
Apache Flink Streaming Java Code のダウンロードと検証
この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。
-
次のコマンドを使用してリモートリポジトリのクローンを作成します。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
GettingStarted
ディレクトリに移動します。
アプリケーションコードは CustomSinkStreamingJob.java
ファイルと CloudWatchLogSink.java
ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
-
アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
アプリケーションコードのコンパイル
このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、演習を完了するための前提条件を参照してください。
Java アプリケーションには、次のコンポーネントが必要です。
-
プロジェクトオブジェクトモデル (pom.xml)
ファイル。ファイルには、Amazon Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。 -
アプリケーションのロジックを含む
main
メソッド。
注記
次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、Apache Flink ドキュメント
アプリケーションコードを作成してコンパイルするには
-
Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。
-
StreamingJob.java
という名前のファイルに対して次のコードを使用します。package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
前述のコード例については、以下の点に注意してください。
-
このファイルには、アプリケーションの機能を定義する
main
メソッドが含まれています。 -
アプリケーションでは、ソースおよびシンクコネクタを作成し、
StreamExecutionEnvironment
オブジェクトを使用して外部リソースにアクセスします。 -
アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、
createSourceFromApplicationProperties
およびcreateSinkFromApplicationProperties
メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。
-
-
アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。
-
Maven コマンドラインツールを使用します。
pom.xml
ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。mvn package
-
開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。
パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。を使用してアプリケーションを作成する場合は AWS CLI、コードコンテンツタイプ (JAR または ZIP) を指定します。
-
-
コンパイル中にエラーが発生した場合は、
JAVA_HOME
環境変数が正しく設定されていることを確認します。
アプリケーションのコンパイルに成功すると、次のファイルが作成されます。
target/java-getting-started-1.0.jar
Apache Flink Streaming Java Code のアップロードしてください
このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。
アプリケーションコードをアップロードするには
Amazon S3 コンソール (https://console.aws.amazon.com/s3/
) を開きます。 -
[バケットを作成] を選択します。
-
[Bucket name (バケット名)] フィールドに
ka-app-code-
と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[Next (次へ)] を選択します。<username>
-
設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。
-
アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。
-
[バケットを作成] を選択します。
-
Amazon S3 コンソールで ka-app-code-
<username>
バケットを選択し、[アップロード] を選択します。 -
ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した
java-getting-started-1.0.jar
ファイルに移動します。[Next (次へ)] を選択します。 -
アクセス許可の設定のステップでは、設定をそのままにします。[Next (次へ)] を選択します。
-
プロパティの設定のステップでは、設定をそのままにします。[アップロード] を選択します。
アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。
Managed Service for Apache Flink アプリケーションを作成して実行する
コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。
注記
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) と Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。
アプリケーションを作成して実行する (コンソール)
以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。
アプリケーションの作成
Kinesis コンソール (https://console.aws.amazon.com/kinesis
) を開きます。 -
Amazon Kinesis ダッシュボードで、[分析アプリケーションを作成する] を選択します。
-
[Kinesis Analytics - アプリケーションの作成] ページで、次のようにアプリケーションの詳細を指定します。
-
[アプリケーション名] には
MyApplication
と入力します。 -
[Description (説明)] に
My java test app
と入力します。 -
[ランタイム] には、[Apache Flink 1.6] を選択します。
-
-
[アクセス許可] には、[IAM ロールの作成 / 更新
kinesis-analytics-MyApplication-us-west-2
] を選択します。 -
[Create application] を選択します。
注記
コンソールを使用して Amazon Managed Service for Apache Flink を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。
-
ポリシー:
kinesis-analytics-service-
MyApplication
-us-west-2
-
ロール:
kinesis-analytics-
MyApplication
-us-west-2
IAM ポリシーを編集する
IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。
IAM コンソール (https://console.aws.amazon.com/iam/
) を開きます。 -
[ポリシー] を選択します。前のセクションでコンソールによって作成された
kinesis-analytics-service-MyApplication-us-west-2
ポリシーを選択します。 -
[概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。
-
次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (
012345678901
) を自分のアカウント ID に置き換えます。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
アプリケーションを設定する
-
[MyApplication] ページで、[Congirue] を選択します。
-
[Configure application] ページで、[Code location] を次のように指定します。
-
[Amazon S3 バケット] で、
ka-app-code-
と入力します。<username>
-
[Amazon S3 オブジェクトへのパス] で、
java-getting-started-1.0.jar
と入力します。
-
-
[Access to application resources] の [Access permissions] では、[Create / update IAM role
kinesis-analytics-MyApplication-us-west-2
] を選択します。 -
[Properties] の [Group ID] には、
ProducerConfigProperties
と入力します。 -
次のアプリケーションのプロパティと値を入力します。
キー 値 flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
[Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。
-
[CloudWatch logging] では、[Enable] チェックボックスをオンにします。
-
[Update] (更新) を選択します。
注記
CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。
-
ロググループ:
/aws/kinesis-analytics/MyApplication
-
ログストリーム:
kinesis-analytics-log-stream
アプリケーションを実行する
-
[MyApplication] ページで、[Run] を選択します。アクションを確認します。
-
アプリケーションが実行されたら、ページを更新します。コンソールには [Application graph] が示されます。
アプリケーションを停止する
[MyApplication] ページで、[Stop] を選択します。アクションを確認します。
アプリケーションの更新
コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。
[MyApplication] ページで、[Congirue] を選択します。アプリケーションの設定を更新し、[更新] を選択します。
アプリケーションを作成して実行する (AWS CLI)
このセクションでは、 を使用して Managed Service for Apache Flink アプリケーション AWS CLI を作成して実行します。Managed Service for Apache Flink は、 kinesisanalyticsv2
AWS CLI コマンドを使用して Managed Service for Apache Flink アプリケーションを作成して操作します。
アクセス許可ポリシーを作成する
まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの read
アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの write
アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。
次のコードを使用して KAReadSourceStreamWriteSinkStream
アクセス許可ポリシーを作成します。
を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (username
) を自分のアカウント ID に置き換えます。012345678901
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-
username
", "arn:aws:s3:::ka-app-code-username
/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
許可ポリシーを作成する詳しい手順については、IAM ユーザーガイドのチュートリアル: はじめてのカスタマーマネージドポリシーの作成とアタッチを参照してください。
注記
他の AWS サービスにアクセスするには、 を使用できます AWS SDK for Java。Managed Service for Apache Flink は、アプリケーションに関連付けられているサービス実行 IAM ロールに、SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。
IAM ロールを作成します。
このセクションでは、Managed Service for Apache Flink がソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。
Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。
前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。
IAM ロールを作成するには
IAM コンソール (https://console.aws.amazon.com/iam/
) を開きます。 -
ナビゲーションペインで [Roles (ロール)]、[Create Role (ロールの作成)] の順に選択します。
-
[信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。[ユースケースの選択] で、[Kinesis Analytics ] を選択します。
[Next: Permissions] (次へ: アクセス許可) を選択します。
-
[アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。
-
[Create role (ロールの作成)] ページで、ロールの名前に
KA-stream-rw-role
を入力します。[ロールの作成] を選択します。これで、
KA-stream-rw-role
と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。 -
アクセス許可ポリシーをロールにアタッチします。
注記
この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。このため、前のステップで作成したポリシー、アクセス許可ポリシーを作成する をアタッチします。
-
[概要] ページで、[アクセス許可] タブを選択します。
-
[Attach Policies (ポリシーのアタッチ)] を選択します。
-
検索ボックスに
KAReadSourceStreamWriteSinkStream
(前のセクションで作成したポリシー) と入力します。 -
[KAReadInputStreamWriteOutputStream] ポリシーを選択し、[ポリシーのアタッチ] を選択します。
-
これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。
ロールを作成する手順については、IAM ユーザーガイドの IAM ロールの作成 (コンソール)を参照してください。
Apache Flink アプリケーション用 Managed Serviceの作成
-
次の JSON コードを
create_request.json
という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (
) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (username
) を、自分のアカウント ID に置き換えます。012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
前述のリクエストを指定して
CreateApplication
アクションを実行し、アプリケーションを作成します。aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。
アプリケーションの起動
このセクションでは、StartApplication
アクションを使用してアプリケーションを起動します。
アプリケーションを起動するには
-
次の JSON コードを
start_request.json
という名前のファイルに保存します。{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
前述のリクエストを指定して
StartApplication
アクションを実行し、アプリケーションを起動します。aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。
アプリケーションの停止
このセクションでは、StopApplication
アクションを使用してアプリケーションを停止します。
アプリケーションを停止するには
-
次の JSON コードを
stop_request.json
という名前のファイルに保存します。{"ApplicationName": "test" }
-
次のリクエストを指定して
StopApplication
アクションを実行し、アプリケーションを停止します。aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
アプリケーションが停止します。