建立並執行應用程式 (CLI) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

建立並執行應用程式 (CLI)

在本節中,您可 AWS Command Line Interface 以使用建立和執行 Apache Flink 應用程式的受管理服務。使用指 AWS CLI 令為 Ap ache Flink 應用程式建立受管理的服務,並與之互動。

建立許可政策

注意

您必須為應用程式建立許可政策和角色。如果您未建立這些 IAM 資源,應用程式將無法存取其資料和日誌串流。

您會先建立具有兩條陳述式的許可政策:一條陳述式授與來源串流上 read 動作的許可,而另一條則是授與目的地串流上 write 動作的許可。您之後會將政策連接至 IAM 角色 (您會在下一節中建立)。因此,當 Managed Service for Apache Flink 擔任角色時,服務便具有從來源串流讀取並寫入目的地串流的所需許可。

使用以下程式碼來建立 AKReadSourceStreamWriteSinkStream 許可政策。以您用於建立 Amazon S3 儲存貯體 (以儲存應用程式的程式碼) 的使用者名稱來取代 username。使用您的帳戶 ID 取代 Amazon Resource Name (ARN) (012345678901) 中的帳戶 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

如需建立許可政策的指 step-by-step 示,請參閱 IAM 使用指南中的教學課程:建立和附加您的第一個客戶受管政策

建立 IAM 政策

在本節中,您會建立 Managed Service for Apache Flink 應用程式可以擔任的 IAM 角色,以便讀取來源串流與寫入目的地串流。

Managed Service for Apache Flink 沒有許可,無法存取串流。您可以透過 IAM 角色來授與這些許可。各 IAM 角色都有連接兩項政策。信任政策會授與擔任角色的 Managed Service for Apache Flink 許可,而許可政策決定了 Managed Service for Apache Flink 在擔任角色後可以執行的作業。

您會將在上一節中建立的許可政策連接至此角色。

若要建立一個 IAM 角色
  1. 前往 https://console.aws.amazon.com/iam/ 開啟 IAM 主控台。

  2. 在導覽窗格中,選擇角色,然後選擇建立角色

  3. 選取可信身分類型下,選擇 AWS 服務

  4. 選擇將使用此角色的服務下,選擇 Kinesis

  5. 選取使用案例下,選擇 Managed Service for Apache Flink

  6. 選擇下一步:許可

  7. 連接許可政策頁面,選擇下一步:檢閱。您會在建立角色後連接許可政策。

  8. 建立角色頁面,輸入 MF-stream-rw-role 作為角色名稱。選擇建立角色

    現在您已建立新的 IAM 角色,名為 MF-stream-rw-role。您接著會更新角色的信任和許可政策

  9. 將 許可政策連接到角色。

    注意

    在此練習中,Managed Service for Apache Flink 擔任從 Kinesis 資料串流 (來源) 讀取資料並將輸出寫入另一個 Kinesis 資料串流的角色。因此您會連接在上一個步驟建立許可政策中建立的政策。

    1. 摘要頁面,選擇許可標籤。

    2. 選擇連接政策

    3. 在搜尋方塊中,輸入 AKReadSourceStreamWriteSinkStream (您在上一節中建立的政策)。

    4. 選擇 AKReadSourceStreamWriteSinkStream 政策,然後選擇連接政策

您現在已建立應用程式用於存取資源的服務執行角色。請記下新角色的 ARN。

如需建立角色的指 step-by-step 示,請參閱 IAM 使用者指南中的建立 IAM 角色 (主控台)

建立應用程式

將下列 JSON 程式碼複製到名為 create_request.json 的檔案。使用您之前建立之角色的 ARN,取代範例角色 ARN。使用您在上一節中選擇的尾碼取代儲存貯體 ARN 尾碼 (username)。使用您的帳戶 ID 取代服務執行角色中的範例帳戶 ID (012345678901)。

{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

CreateApplication使用以下請求執行以創建應用程序:

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

應用程式現在已建立。您會在下一個步驟中啟動應用程式。

啟動應用程式

在本節中,您將使用StartApplication動作來啟動應用程式。

啟動應用程式
  1. 將下列 JSON 程式碼複製到名為 start_request.json 的檔案。

    { "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 以啟動應用程式的上述請求,執行 StartApplication 動作:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

應用程式現在正在執行。您可以在 Amazon CloudWatch 主控台上查看適用於 Apache Flink 的受管服務指標,以確認應用程式是否正常運作。

停止應用程式

在本節中,您可以使用StopApplication動作來停止應用程式。

停止應用程式
  1. 將下列 JSON 程式碼複製到名為 stop_request.json 的檔案。

    { "ApplicationName": "s3_sink" }
  2. 使用前述請求執行 StopApplication 動作以停止應用程式:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

現在已停止應用程式。

新增記 CloudWatch 錄選項

您可以使用 AWS CLI 將 Amazon CloudWatch 日誌串流新增至您的應用程式。如需搭配應用程式使用記 CloudWatch 錄的相關資訊,請參閱設定應用程式記錄

更新環境屬性

在本節中,您可以使用UpdateApplication動作來變更應用程式的環境屬性,而無需重新編譯應用程式程式碼。在此範例中,您會變更來源和目的地串流的「區域」。

更新應用程式的環境屬性
  1. 將下列 JSON 程式碼複製到名為 update_properties_request.json 的檔案。

    { "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
  2. 使用前述請求執行 UpdateApplication 動作以更新環境屬性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新應用程式的程式碼

當您需要使用新版程式碼套件更新應用程式程式碼時,請使用 UpdateApplicationCLI 動作。

注意

若要載入具有相同檔案名稱的新版應用程式的程式碼,必須指定新的物件版本。如需如何使用 Amazon S3 物件版本的詳細資訊,請參閱啟用或停用版本控制

若要使用 AWS CLI,請從 Amazon S3 儲存貯體刪除先前的程式碼套件、上傳新版本,然後呼叫UpdateApplication,指定相同的 Amazon S3 儲存貯體和物件名稱,以及新的物件版本。應用程式將以新的程式碼套件重新啟動。

UpdateApplication 動作的下列範例請求會重新載入應用程式的程式碼並重新啟動應用程式。將 CurrentApplicationVersionId 更新至目前的應用程式版本。您可以使用 ListApplicationsDescribeApplication 動作來檢查目前的應用程式版本。使用您在建立相依資源一節中選擇的尾碼更新儲存貯體名稱尾碼 (<username>)。

{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }