Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
本主题包含Managed Service for Apache Flink操作的示例请求块。
要用JSON作 AWS Command Line Interface (AWS CLI) 操作的输入,请将请求保存在JSON文件中。然后,使用 --cli-input-json
参数将文件名传递给该操作。
以下示例演示如何使用带有操作的JSON文件。
$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json
有关JSON与一起使用的更多信息 AWS CLI,请参阅《AWS Command Line Interface 用户指南》中的 “生成CLI骨架和CLI输入JSON参数”。
主题
AddApplicationCloudWatchLoggingOption
以下AddApplicationCloudWatchLoggingOption操作的示例请求代码为适用于 Apache Flink 的托管服务应用程序添加了 Amazon CloudWatch 日志选项:
{
"ApplicationName": "MyApplication",
"CloudWatchLoggingOption": {
"LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream"
},
"CurrentApplicationVersionId": 2
}
AddApplicationInput
以下AddApplicationInput操作的示例请求代码将应用程序输入添加到 Apache Flink 托管服务应用程序中:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 2,
"Input": {
"InputParallelism": {
"Count": 2
},
"InputSchema": {
"RecordColumns": [
{
"Mapping": "$.TICKER",
"Name": "TICKER_SYMBOL",
"SqlType": "VARCHAR(50)"
},
{
"SqlType": "REAL",
"Name": "PRICE",
"Mapping": "$.PRICE"
}
],
"RecordEncoding": "UTF-8",
"RecordFormat": {
"MappingParameters": {
"JSONMappingParameters": {
"RecordRowPath": "$"
}
},
"RecordFormatType": "JSON"
}
},
"KinesisStreamsInput": {
"ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
}
}
}
AddApplicationInputProcessingConfiguration
以下AddApplicationInputProcessingConfiguration操作的示例请求代码将应用程序输入处理配置添加到适用于 Apache Flink 的托管服务应用程序:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 2,
"InputId": "2.1",
"InputProcessingConfiguration": {
"InputLambdaProcessor": {
"ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction"
}
}
}
AddApplicationOutput
以下AddApplicationOutput操作的示例请求代码将 Kinesis 数据流作为应用程序输出添加到 Apache Flink 托管服务应用程序中:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 2,
"Output": {
"DestinationSchema": {
"RecordFormatType": "JSON"
},
"KinesisStreamsOutput": {
"ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
},
"Name": "DESTINATION_SQL_STREAM"
}
}
AddApplicationReferenceDataSource
以下AddApplicationReferenceDataSource操作的示例请求代码将CSV应用程序参考数据源添加到 Apache Flink 托管服务应用程序中:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 5,
"ReferenceDataSource": {
"ReferenceSchema": {
"RecordColumns": [
{
"Mapping": "$.TICKER",
"Name": "TICKER",
"SqlType": "VARCHAR(4)"
},
{
"Mapping": "$.COMPANYNAME",
"Name": "COMPANY_NAME",
"SqlType": "VARCHAR(40)"
},
],
"RecordEncoding": "UTF-8",
"RecordFormat": {
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": " ",
"RecordRowDelimiter": "\r\n"
}
},
"RecordFormatType": "CSV"
}
},
"S3ReferenceDataSource": {
"BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
"FileKey": "TickerReference.csv"
},
"TableName": "string"
}
}
AddApplicationVpcConfiguration
以下AddApplicationVpcConfiguration操作的示例请求代码为现有应用程序添加VPC配置:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 9,
"VpcConfiguration": {
"SecurityGroupIds": [ "sg-0123456789abcdef0" ],
"SubnetIds": [ "subnet-0123456789abcdef0" ]
}
}
CreateApplication
以下CreateApplication操作的示例请求代码为 Apache Flink 应用程序创建了一个托管服务:
{
"ApplicationName":"MyApplication",
"ApplicationDescription":"My-Application-Description",
"RuntimeEnvironment":"FLINK-1_15",
"ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
"CloudWatchLoggingOptions":[
{
"LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream"
}
],
"ApplicationConfiguration": {
"EnvironmentProperties":
{"PropertyGroups":
[
{"PropertyGroupId": "ConsumerConfigProperties",
"PropertyMap":
{"aws.region": "us-east-1",
"flink.stream.initpos": "LATEST"}
},
{"PropertyGroupId": "ProducerConfigProperties",
"PropertyMap":
{"aws.region": "us-east-1"}
},
]
},
"ApplicationCodeConfiguration":{
"CodeContent":{
"S3ContentLocation":{
"BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
"FileKey":"myflink.jar",
"ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
}
},
"CodeContentType":"ZIPFILE"
},
"FlinkApplicationConfiguration":{
"ParallelismConfiguration":{
"ConfigurationType":"CUSTOM",
"Parallelism":2,
"ParallelismPerKPU":1,
"AutoScalingEnabled":true
}
}
}
}
CreateApplicationSnapshot
以下CreateApplicationSnapshot操作的示例请求代码创建了应用程序状态的快照:
{
"ApplicationName": "MyApplication",
"SnapshotName": "MySnapshot"
}
DeleteApplication
以下DeleteApplication操作请求代码示例删除了 Apache Flink 应用程序的托管服务:
{"ApplicationName": "MyApplication",
"CreateTimestamp": 12345678912}
DeleteApplicationCloudWatchLoggingOption
以下DeleteApplicationCloudWatchLoggingOption操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除亚马逊 CloudWatch日志选项:
{
"ApplicationName": "MyApplication",
"CloudWatchLoggingOptionId": "3.1"
"CurrentApplicationVersionId": 3
}
DeleteApplicationInputProcessingConfiguration
以下DeleteApplicationInputProcessingConfiguration操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除了输入处理配置:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 4,
"InputId": "2.1"
}
DeleteApplicationOutput
以下DeleteApplicationOutput操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除应用程序输出:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 4,
"OutputId": "4.1"
}
DeleteApplicationReferenceDataSource
以下DeleteApplicationReferenceDataSource操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中移除应用程序参考数据源:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 5,
"ReferenceId": "5.1"
}
DeleteApplicationSnapshot
以下DeleteApplicationSnapshot操作的示例请求代码删除了应用程序状态的快照:
{
"ApplicationName": "MyApplication",
"SnapshotCreationTimestamp": 12345678912,
"SnapshotName": "MySnapshot"
}
DeleteApplicationVpcConfiguration
以下DeleteApplicationVpcConfiguration操作的示例请求代码将从应用程序中移除现有VPC配置:
{
"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 9,
"VpcConfigurationId": "1.1"
}
DescribeApplication
以下DescribeApplication操作的示例请求代码返回有关适用于 Apache Flink 的托管服务应用程序的详细信息:
{"ApplicationName": "MyApplication"}
DescribeApplicationSnapshot
以下DescribeApplicationSnapshot操作的示例请求代码返回有关应用程序状态快照的详细信息:
{
"ApplicationName": "MyApplication",
"SnapshotName": "MySnapshot"
}
DiscoverInputSchema
以下DiscoverInputSchema操作的示例请求代码从流媒体源生成架构:
{
"InputProcessingConfiguration": {
"InputLambdaProcessor": {
"ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction"
}
},
"InputStartingPositionConfiguration": {
"InputStartingPosition": "NOW"
},
"ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream",
"S3Configuration": {
"BucketARN": "string",
"FileKey": "string"
},
"ServiceExecutionRole": "string"
}
以下DiscoverInputSchema操作的示例请求代码从参考来源生成架构:
{
"S3Configuration": {
"BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
"FileKey": "TickerReference.csv"
},
"ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole"
}
ListApplications
以下ListApplications操作的示例请求代码会返回您账户中适用于 Apache Flink 应用程序的托管服务列表:
{
"ExclusiveStartApplicationName": "MyApplication",
"Limit": 50
}
ListApplicationSnapshots
以下ListApplicationSnapshots操作的示例请求代码返回了应用程序状态快照的列表:
{"ApplicationName": "MyApplication",
"Limit": 50,
"NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123"
}
StartApplication
以下StartApplication操作请求代码示例启动适用于 Apache Flink 的托管服务应用程序,并从最新的快照(如果有)加载应用程序状态:
{
"ApplicationName": "MyApplication",
"RunConfiguration": {
"ApplicationRestoreConfiguration": {
"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
}
}
}
StopApplication
以下API_StopApplication操作的示例请求代码会停止 Apache Flink 托管服务应用程序:
{"ApplicationName": "MyApplication"}
UpdateApplication
以下UpdateApplication操作的示例请求代码更新了适用于 Apache Flink 的托管服务应用程序,以更改应用程序代码的位置:
{"ApplicationName": "MyApplication",
"CurrentApplicationVersionId": 1,
"ApplicationConfigurationUpdate": {
"ApplicationCodeConfigurationUpdate": {
"CodeContentTypeUpdate": "ZIPFILE",
"CodeContentUpdate": {
"S3ContentLocationUpdate": {
"BucketARNUpdate": "arn:aws:s3:::amzn-s3-demo-bucket
",
"FileKeyUpdate": "my_new_code.zip
",
"ObjectVersionUpdate": "2"
}
}
}
}