Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行应用程序 (CLI)
在本节中,您将使用创建和运行适用 AWS Command Line Interface 于 Apache Flink 的托管服务应用程序。使用 k inesisanalyticsv2 AWS CLI 命令为 Apache Flink 应用程序创建托管服务并与之交互。
创建权限策略
注意
您必须为应用程序创建一个权限策略和角色。如果您不创建这些IAM资源,则您的应用程序将无法访问其数据和日志流。
首先,使用两个语句创建权限策略:一个语句授予对源流执行读取操作的权限,另一个语句授予对接收器流执行写入操作的权限。然后,将该策略附加到一个IAM角色(将在下一节中创建)。因此,在 Managed Service for Apache Flink代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。
使用以下代码创建 AKReadSourceStreamWriteSinkStream
权限策略。将 username
替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARNs) 中的账户 ID (012345678901)
替换为您的账户 ID。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
有关创建权限策略的 step-by-step 说明,请参阅IAM用户指南中的教程:创建并附加您的第一个客户托管策略。
创建IAM策略
在本节中,您将创建一个IAM角色,适用于 Apache Flink 的托管服务应用程序可以代入该角色来读取源流并写入接收流。
权限不足时,Managed Service for Apache Flink 无法访问您的串流。您可以通过IAM角色授予这些权限。每个IAM角色都有两个附加的策略。此信任策略授予 Managed Service for Apache Flink代入该角色的权限,权限策略确定 Managed Service for Apache Flink代入这个角色后可以执行的操作。
您将在上一部分中创建的权限策略附加到此角色。
创建 IAM 角色
打开IAM控制台,网址为https://console.aws.amazon.com/iam/
。 在导航窗格中选择角色,然后选择创建角色。
在 选择受信任实体的类型 下,选择 AWS 服务。
在 选择将使用此角色的服务 下,选择 Kinesis。
在选择您的用例部分,选择Managed Service for Apache Flink。
选择下一步: 权限。
在 附加权限策略 页面上,选择 下一步: 审核。在创建角色后,您可以附加权限策略。
在 创建角色 页面上,输入
MF-stream-rw-role
作为角色名称。选择 Create role(创建角色)。现在,您已经创建了一个名为的新IAM角色
MF-stream-rw-role
。接下来,您更新角色的信任和权限策略。将权限策略附加到角色。
注意
对于本练习,Managed Service for Apache Flink代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步创建权限策略中创建的策略。
在 摘要 页上,选择 权限 选项卡。
选择附加策略。
在搜索框中,输入
AKReadSourceStreamWriteSinkStream
(您在上一部分中创建的策略)。选择
AKReadSourceStreamWriteSinkStream
策略,然后选择附加策略。
现在,您已经创建了应用程序用来访问资源的服务执行角色。记下ARN新角色。
有关创建角色的 step-by-step 说明,请参阅《IAM用户指南》中的创建IAM角色(控制台)。
创建应用程序
将以下JSON代码保存到名为的文件中create_request.json
。将示例角色ARN替换为您之前创建的角色所ARN对应的角色。将存储桶ARN后缀(用户名)替换为您在上一节中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。
{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-
username
", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901
:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }
CreateApplication使用以下请求执行以创建应用程序:
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
应用程序现已创建。您在下一步中启动应用程序。
启动应用程序
在本节中,您将使用StartApplication操作来启动应用程序。
启动应用程序
将以下JSON代码保存到名为的文件中
start_request.json
。{ "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
使用上述请求执行
StartApplication
操作来启动应用程序:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。
停止应用程序
在本节中,您将使用StopApplication操作来停止应用程序。
停止应用程序
将以下JSON代码保存到名为的文件中
stop_request.json
。{ "ApplicationName": "s3_sink" }
使用上述请求执行
StopApplication
操作来停止应用程序:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
应用程序现已停止。
添加 CloudWatch 日志选项
您可以使用将 Amazon CloudWatch 日志流 AWS CLI 添加到您的应用程序中。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志记录。
更新环境属性
在本节中,您将使用UpdateApplication操作来更改应用程序的环境属性,而无需重新编译应用程序代码。在该示例中,您更改源流和目标流的区域。
更新应用程序的环境属性
将以下JSON代码保存到名为的文件中
update_properties_request.json
。{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
使用前面的请求执行
UpdateApplication
操作以更新环境属性:aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
更新应用程序代码
当您需要使用新版本的代码包更新应用程序代码时,可以使用UpdateApplicationCLI操作。
注意
要使用相同的文件名加载新版本的应用程序代码,您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制。
要使用 AWS CLI,请从 Amazon S3 存储桶中删除之前的代码包,上传新版本,然后调用UpdateApplication
,指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。
以下示例 UpdateApplication
操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId
更新为当前的应用程序版本。您可以使用 ListApplications
或 DescribeApplication
操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在创建依赖资源一节中选择的后缀。
{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }