创建并运行应用程序 (CLI) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行应用程序 (CLI)

在本节中,您将使用创建和运行适用 AWS Command Line Interface 于 Apache Flink 的托管服务应用程序。使用 k inesisanalyticsv2 AWS CLI 命令为 Apache Flink 应用程序创建托管服务并与之交互。

创建权限策略

注意

您必须为应用程序创建一个权限策略和角色。如果您不创建这些IAM资源,则您的应用程序将无法访问其数据和日志流。

首先,使用两个语句创建权限策略:一个语句授予对源流执行读取操作的权限,另一个语句授予对接收器流执行写入操作的权限。然后,将该策略附加到一个IAM角色(将在下一节中创建)。因此,在 Managed Service for Apache Flink代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 AKReadSourceStreamWriteSinkStream 权限策略。将 username 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARNs) 中的账户 ID (012345678901) 替换为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

有关创建权限策略的 step-by-step 说明,请参阅IAM用户指南中的教程:创建并附加您的第一个客户托管策略

创建IAM策略

在本节中,您将创建一个IAM角色,适用于 Apache Flink 的托管服务应用程序可以代入该角色来读取源流并写入接收流。

权限不足时,Managed Service for Apache Flink 无法访问您的串流。您可以通过IAM角色授予这些权限。每个IAM角色都有两个附加的策略。此信任策略授予 Managed Service for Apache Flink代入该角色的权限,权限策略确定 Managed Service for Apache Flink代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 在导航窗格中选择角色,然后选择创建角色

  3. 选择受信任实体的类型 下,选择 AWS 服务

  4. 选择将使用此角色的服务 下,选择 Kinesis

  5. 选择您的用例部分,选择Managed Service for Apache Flink

  6. 选择下一步: 权限

  7. 附加权限策略 页面上,选择 下一步: 审核。在创建角色后,您可以附加权限策略。

  8. 创建角色 页面上,输入MF-stream-rw-role作为角色名称。选择 Create role(创建角色)。

    现在,您已经创建了一个名为的新IAM角色MF-stream-rw-role。接下来,您更新角色的信任和权限策略。

  9. 将权限策略附加到角色。

    注意

    对于本练习,Managed Service for Apache Flink代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步创建权限策略中创建的策略。

    1. 摘要 页上,选择 权限 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 AKReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择AKReadSourceStreamWriteSinkStream策略,然后选择附加策略

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下ARN新角色。

有关创建角色的 step-by-step 说明,请参阅《IAM用户指南》中的创建IAM角色(控制台)

创建应用程序

将以下JSON代码保存到名为的文件中create_request.json。将示例角色ARN替换为您之前创建的角色所ARN对应的角色。将存储桶ARN后缀(用户名)替换为您在上一节中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

CreateApplication使用以下请求执行以创建应用程序:

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您将使用StartApplication操作来启动应用程序。

启动应用程序
  1. 将以下JSON代码保存到名为的文件中start_request.json

    { "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您将使用StopApplication操作来停止应用程序。

停止应用程序
  1. 将以下JSON代码保存到名为的文件中stop_request.json

    { "ApplicationName": "s3_sink" }
  2. 使用上述请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 日志选项

您可以使用将 Amazon CloudWatch 日志流 AWS CLI 添加到您的应用程序中。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志记录

更新环境属性

在本节中,您将使用UpdateApplication操作来更改应用程序的环境属性,而无需重新编译应用程序代码。在该示例中,您更改源流和目标流的区域。

更新应用程序的环境属性
  1. 将以下JSON代码保存到名为的文件中update_properties_request.json

    { "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
  2. 使用前面的请求执行 UpdateApplication 操作以更新环境属性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新应用程序代码

当您需要使用新版本的代码包更新应用程序代码时,可以使用UpdateApplicationCLI操作。

注意

要使用相同的文件名加载新版本的应用程序代码,您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制

要使用 AWS CLI,请从 Amazon S3 存储桶中删除之前的代码包,上传新版本,然后调用UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在创建依赖资源一节中选择的后缀。

{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }