Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
このセクションでは、 を使用して Managed Service for Apache Flink アプリケーション AWS Command Line Interface を作成して実行します。kinesisanalyticsv2 AWS CLI コマンドを使用して、Managed Service for Apache Flink アプリケーションを作成して操作します。
許可ポリシーを作成する
注記
アプリケーションのアクセス権限ポリシーとロールを作成する必要があります。これらの IAM リソースを作成しない場合、アプリケーションはそのデータストリームやログストリームにアクセスできません。
まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの読み取りアクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの書き込みアクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。
次のコードを使用して AKReadSourceStreamWriteSinkStream
アクセス許可ポリシーを作成します。username
を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) (012345678901)
のアカウント ID を自分のアカウント ID に置き換えます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
許可ポリシーを作成する詳しい手順については、IAM ユーザーガイドのチュートリアル: はじめてのカスタマー管理ポリシーの作成とアタッチを参照してください。
IAM ポリシーを作成する
このセクションでは、Managed Service for Apache Flink アプリケーションがソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。
Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。
前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。
IAM ロールを作成するには
IAM コンソール (https://console.aws.amazon.com/iam/
) を開きます。 ナビゲーションペインで [ロール] を選択し、続いて [ロールを作成] を選択します。
[信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。
[このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。
[ユースケースの選択]で、[Managed Service for Apache Flink]を選択します。
[Next: Permissions] (次へ: アクセス許可) を選択します。
[アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。
[Create role (ロールの作成)] ページで、ロールの名前に
MF-stream-rw-role
を入力します。[ロールの作成] を選択します。これで、
MF-stream-rw-role
と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。アクセス許可ポリシーをロールにアタッチします。
注記
この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。前のステップである「許可ポリシーの作成」で作成したロールを添付します。
[概要] ページで、[アクセス許可] タブを選択します。
[Attach Policies (ポリシーのアタッチ)] を選択します。
検索ボックスに
AKReadSourceStreamWriteSinkStream
(前のセクションで作成したポリシー) と入力します。AKReadSourceStreamWriteSinkStream
ポリシーを選択し、[ポリシーを添付]を選択します。
これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。
ロールを作成する手順については、IAM ユーザーガイドの IAM ロールの作成 (コンソール)を参照してください。
アプリケーションの作成
次の JSON コードを create_request.json
という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (ユーザー名) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。
{
"ApplicationName": "getting_started",
"ApplicationDescription": "Scala getting started application",
"RuntimeEnvironment": "FLINK-1_19",
"ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
"ApplicationConfiguration": {
"ApplicationCodeConfiguration": {
"CodeContent": {
"S3ContentLocation": {
"BucketARN": "arn:aws:s3:::ka-app-code-username
",
"FileKey": "getting-started-scala-1.0.jar"
}
},
"CodeContentType": "ZIPFILE"
},
"EnvironmentProperties": {
"PropertyGroups": [
{
"PropertyGroupId": "ConsumerConfigProperties",
"PropertyMap" : {
"aws.region" : "us-west-2",
"stream.name" : "ExampleInputStream",
"flink.stream.initpos" : "LATEST"
}
},
{
"PropertyGroupId": "ProducerConfigProperties",
"PropertyMap" : {
"aws.region" : "us-west-2",
"stream.name" : "ExampleOutputStream"
}
}
]
}
},
"CloudWatchLoggingOptions": [
{
"LogStreamARN": "arn:aws:logs:us-west-2:012345678901
:log-group:MyApplication:log-stream:kinesis-analytics-log-stream"
}
]
}
次のリクエストによって CreateApplication を実行して、アプリケーションを作成します。
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。
アプリケーションを起動する
このセクションでは、 StartApplicationアクションを使用してアプリケーションを起動します。
アプリケーションを起動するには
次の JSON コードを
start_request.json
という名前のファイルに保存します。{ "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
前述のリクエストを指定して
StartApplication
アクションを実行し、アプリケーションを起動します。aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。
アプリケーションを停止する
このセクションでは、StopApplicationアクションを使用してアプリケーションを停止します。
アプリケーションを停止するには
次の JSON コードを
stop_request.json
という名前のファイルに保存します。{ "ApplicationName": "s3_sink" }
前述のリクエストを指定して
StopApplication
アクションを実行し、アプリケーションを起動します。aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
アプリケーションが停止します。
CloudWatch ログ記録オプションを追加する
を使用して AWS CLI 、Amazon CloudWatch ログストリームをアプリケーションに追加できます。アプリケーションでCloudWatch ログを使用する情報については、「アプリケーションロギングの設定」を参照してください。
環境プロパティを更新する
このセクションでは、UpdateApplicationアクションを使用して、アプリケーションコードを再コンパイルせずにアプリケーションの環境プロパティを変更します。この例では、ソースストリームおよびレプリケート先ストリームのリージョンを変更します。
アプリケーションの環境プロパティを更新します
次の JSON コードを
update_properties_request.json
という名前のファイルに保存します。{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
前述のリクエストで
UpdateApplication
アクションを実行し、環境プロパティを更新します。aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
アプリケーションコードの更新
アプリケーションコードを新しいバージョンのコードパッケージで更新する必要がある場合は、「UpdateApplication」CLI アクションを使用します。
注記
同じファイル名のアプリケーションコードの新しいバージョンをロードするには、新しいオブジェクトバージョンを指定する必要があります。Amazon S3 オブジェクトバージョンを使用する方法の詳細については、「バージョニングの有効化または無効化」を参照してください。
を使用するには AWS CLI、Amazon S3 バケットから以前のコードパッケージを削除し、新しいバージョンをアップロードして を呼び出しUpdateApplication
、同じ Amazon S3 バケットとオブジェクト名、および新しいオブジェクトバージョンを指定します。アプリケーションは新しいコードパッケージで再起動します。
以下の UpdateApplication
アクションのサンプル・リクエストは、アプリケーション・コードを再読み込 み、アプリケーションを再起動します。CurrentApplicationVersionId
を現在のアプリケーションバージョンに更新します。ListApplications
または DescribeApplication
アクションを使用して、現在のアプリケーションバージョンを確認できます。バケット名のサフィックス (「<username>」) を、 依存リソースを作成する セクションで選択したサフィックスで更新します。
{{
"ApplicationName": "getting_started",
"CurrentApplicationVersionId": 1,
"ApplicationConfigurationUpdate": {
"ApplicationCodeConfigurationUpdate": {
"CodeContentUpdate": {
"S3ContentLocationUpdate": {
"BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>",
"FileKeyUpdate": "getting-started-scala-1.0.jar",
"ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU"
}
}
}
}
}