アプリケーションの作成と実行 (CLI) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

アプリケーションの作成と実行 (CLI)

このセクションでは、 を使用して Managed Service for Apache Flink アプリケーション AWS Command Line Interface を作成して実行します。kinesisanalyticsv2 AWS CLI コマンドを使用して、Managed Service for Apache Flink アプリケーションを作成して操作します。

許可ポリシーを作成する

注記

アプリケーションのアクセス権限ポリシーとロールを作成する必要があります。これらの IAM リソースを作成しない場合、アプリケーションはそのデータストリームやログストリームにアクセスできません。

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの読み取りアクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの書き込みアクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して AKReadSourceStreamWriteSinkStream アクセス許可ポリシーを作成します。username を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) (012345678901)のアカウント ID を自分のアカウント ID に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アクセス許可ポリシーを作成する step-by-step 手順については、IAM ユーザーガイドの「チュートリアル: 最初のカスタマー管理ポリシーの作成とアタッチ」を参照してください。

IAM ポリシーを作成する

このセクションでは、Managed Service for Apache Flink アプリケーションがソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。

Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

IAM ロールを作成するには
  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションペインで [ロール] を選択し、続いて [ロールを作成] を選択します。

  3. [信頼されるエンティティの種類を選択] で、[AWS のサービス] を選択します。

  4. [このロールを使用するサービスを選択] で、[Kinesis Analytics] を選択します。

  5. [ユースケースの選択]で、[Managed Service for Apache Flink]を選択します。

  6. [Next: Permissions] (次のステップ: 許可) を選択します。

  7. [アクセス権限ポリシーをアタッチする] ページで、[Next: Review] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。

  8. [Create role (ロールの作成)] ページで、ロールの名前MF-stream-rw-role を入力します。[ロールの作成] を選択します。

    これで、MF-stream-rw-role と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

  9. アクセス許可ポリシーをロールにアタッチします。

    注記

    この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。前のステップである「許可ポリシーの作成」で作成したロールを添付します。

    1. [概要] ページで、[アクセス許可] タブを選択します。

    2. [Attach Policies (ポリシーのアタッチ)] を選択します。

    3. 検索ボックスにAKReadSourceStreamWriteSinkStream(前のセクションで作成したポリシー) と入力します。

    4. AKReadSourceStreamWriteSinkStreamポリシーを選択し、[ポリシーを添付]を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

ロールを作成する step-by-step 手順については、IAM ユーザーガイドの「IAM ロールの作成 (コンソール)」を参照してください。

アプリケーションの作成

次の JSON コードを create_request.json という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (ユーザー名) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (012345678901) を、自分のアカウント ID に置き換えます。

{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

次のリクエストCreateApplicationで を実行してアプリケーションを作成します。

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

アプリケーションを起動する

このセクションでは、 StartApplicationアクションを使用してアプリケーションを起動します。

アプリケーションを起動するには
  1. 次の JSON コードを start_request.json という名前のファイルに保存します。

    { "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 前述のリクエストを指定して StartApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが動作していることを確認できます。

アプリケーションの停止

このセクションでは、 StopApplicationアクションを使用してアプリケーションを停止します。

アプリケーションを停止するには
  1. 次の JSON コードを stop_request.json という名前のファイルに保存します。

    { "ApplicationName": "s3_sink" }
  2. 前述のリクエストを指定して StopApplication アクションを実行し、アプリケーションを起動します。

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

アプリケーションが停止します。

CloudWatch ログ記録オプションを追加する

を使用して AWS CLI 、Amazon CloudWatch ログストリームをアプリケーションに追加できます。アプリケーションで CloudWatch ログを使用する方法については、「アプリケーションログ記録のセットアップ」を参照してください。

環境プロパティを更新する

このセクションでは、 UpdateApplicationアクションを使用して、アプリケーションコードを再コンパイルせずにアプリケーションの環境プロパティを変更します。この例では、ソースストリームおよびレプリケート先ストリームのリージョンを変更します。

アプリケーションの環境プロパティを更新します
  1. 次の JSON コードを update_properties_request.json という名前のファイルに保存します。

    { "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
  2. 前述のリクエストでUpdateApplicationアクションを実行し、環境プロパティを更新します。

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

アプリケーションコードの更新

アプリケーションコードを新しいバージョンのコードパッケージで更新する必要がある場合は、 CLI UpdateApplication アクションを使用します。

注記

同じファイル名のアプリケーションコードの新しいバージョンをロードするには、新しいオブジェクトバージョンを指定する必要があります。Amazon S3 オブジェクトバージョンを使用する方法の詳細については、「バージョニングの有効化または無効化」を参照してください。

を使用するには AWS CLI、Amazon S3 バケットから以前のコードパッケージを削除し、新しいバージョンをアップロードして、同じ Amazon S3 バケットとオブジェクト名、および新しいオブジェクトバージョンUpdateApplicationを指定して を呼び出します。アプリケーションは新しいコードパッケージで再起動します。

以下の UpdateApplication アクションのサンプル・リクエストは、アプリケーション・コードを再読み込 み、アプリケーションを再起動します。CurrentApplicationVersionId を現在のアプリケーションバージョンに更新します。ListApplications または DescribeApplication アクションを使用して、現在のアプリケーションバージョンを確認できます。バケット名のサフィックス (「<username>」) を、 依存リソースを作成する セクションで選択したサフィックスで更新します。

{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }