对于新项目,我们建议您使用适用于 Apache Flink Studio 的新托管服务,而不是应用程序版 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
配置应用程序输入
您的 Amazon Kinesis Data Analytics 应用程序可以从单个流式传输源中接收输入,并且可以选择使用一个引用数据源。有关更多信息,请参阅 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics:工作原理。本主题的此部分介绍了应用程序输入源。
主题
配置流式传输源
当您创建应用程序时,可以指定流式传输源。您还可以在创建应用程序后修改输入。Amazon Kinesis Data Analytics 支持应用程序的以下流式源:
-
Kinesis 数据流
-
Firehose 的传送直播
注意
2023 年 9 月 12 日之后,如果您尚未使用适用于 SQL 的 Kinesis Data Analytics,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。使用 KinesisFirehoseInput
对 Kinesis Data Analytics for SQL 应用程序进行操作的现有客户可以继续使用 KinesisFirehoseInput
在使用 Kinesis Data Analytics 的现有账户内添加应用程序。如果您是现有客户,并希望使用 KinesisFirehoseInput
在适用于 SQL 应用程序的 Kinesis Data Analytics 中创建一个新的账户,则可以通过服务限制增加表创建案例。有关更多信息,请参阅 AWS Support 中心
注意
如果 Kinesis 数据流是加密的,Kinesis Data Analytics 会无缝地访问加密流中的数据,无需进一步配置。Kinesis Data Analytics 不存储从 Kinesis 数据流读取的未加密数据。有关更多信息,请参阅什么是 Kinesis 数据流的服务器端加密?。
Kinesis Data Analytics 持续轮询流式传输源以查找新数据,并根据输入配置在应用程序内部流中提取该数据。
注意
添加 Kinesis 流作为应用程序的输入不会影响流中的数据。如果其他资源(例如 Firehose 交付流)也访问了相同的 Kinesis 流,则 Firehose 交付流和 Kinesis Data Analytics 应用程序都将收到相同的数据。但是,吞吐量和限制可能会受到影响。
您的应用程序代码可以查询应用程序内部流。作为输入配置的一部分,您需要提供以下内容:
-
流式传输源 – 您提供流的 Amazon 资源名称 (ARN) 以及 Kinesis Data Analytics 可担任的 IAM 角色以代表您访问流。
-
应用程序内部流名称前缀 – 在启动应用程序时,Kinesis Data Analytics 创建指定的应用程序内部流。在您的应用程序代码中,可以使用此名称访问应用程序内部流。
您可以选择将一个流式传输源映射到多个应用程序内部流。有关更多信息,请参阅Limits。在这种情况下,Amazon Kinesis Data Analytics 使用如下名称创建指定数量的应用程序内部流:
prefix
_001
、prefix
_002
和prefix
_003
。默认情况下,Kinesis Data Analytics 将流式传输源映射到一个名为prefix
_001
的应用程序内部流。在应用程序内部流中插入行时有速度限制。因此,Kinesis Data Analytics 支持多个此类应用程序内部流,以便以快得多的速度将记录添加到应用程序中。如果发现应用程序无法及时处理流式传输源中的数据,您可以添加并行度单元以提高性能。
-
映射架构 – 您描述流式传输源上的记录格式 (JSON、CSV)。您还描述流上的每个记录如何映射到创建的应用程序内部流中的列。您可以在此处提供列名和数据类型。
注意
在创建输入应用程序内部流时,Kinesis Data Analytics 使用引号将标识符 (流名称和列名称) 引起来。在查询该流和列时,您必须在引号内使用相同的大小写指定它们 (小写和大写字母完全匹配)。有关标识符的更多信息,请参阅Amazon Managed Service for Apache Flink SQL 参考中的标识符。
您可以通过 Amazon Kinesis Data Analytics 控制台创建一个应用程序并配置输入。然后,控制台执行必需的 API 调用。创建新应用程序 API 或者将输入配置添加到现有应用程序时,可以配置应用程序输入。有关更多信息,请参阅 CreateApplication 和 AddApplicationInput。下面是 Createapplication
API 请求正文的输入配置部分:
"Inputs": [ { "InputSchema": { "RecordColumns": [ { "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "KinesisFirehoseInput": { "ResourceARN": "string", "RoleARN": "string" }, "KinesisStreamsInput": { "ResourceARN": "string", "RoleARN": "string" }, "Name": "string" } ]
配置引用源
您还可以选择将引用数据源添加到现有应用程序中,以便扩充来自流式传输源的数据。您必须将引用数据存储为 Amazon S3 存储桶中的对象。在应用程序启动时,Amazon Kinesis Data Analytics 读取 Amazon S3 对象并创建应用程序内部引用表。然后,您的应用程序代码可以将其与应用程序内部流联接。
您可以使用支持的格式 (CSV、JSON) 在 Amazon S3 对象中存储引用数据。例如,假设您的应用程序对股票订单执行分析。对流式传输源采用以下记录格式:
Ticker, SalePrice, OrderId AMZN $700 1003 XYZ $250 1004 ...
在这种情况下,您可能考虑维护引用数据源,以提供有关每个股票行情机的详细信息,如公司名称。
Ticker, Company AMZN, Amazon XYZ, SomeCompany ...
您可以使用 API 或控制台添加应用程序引用数据源。Amazon Kinesis Data Analytics 提供以下 API 操作来管理引用数据源:
有关使用控制台添加引用数据的信息,请参阅示例:在 应用程序中添加引用数据。
请注意以下几点:
-
如果应用程序正在运行,则 Kinesis Data Analytics 创建一个应用程序内部引用表,然后立即加载引用数据。
-
如果应用程序未运行 (例如,处于就绪状态),则 Kinesis Data Analytics 仅保存更新的输入配置。在应用程序开始运行时,Kinesis Data Analytics 在应用程序中将引用数据作为表进行加载。
假设您希望在 Kinesis Data Analytics 创建应用程序内部引用表后刷新数据。您可能更新了 Amazon S3 对象,或者要使用不同的 Amazon S3 对象。在这种情况下,您可以显式调用 UpdateApplication,或在控制台中依次选择操作、Synchronize reference data table (同步引用数据表)。Kinesis Data Analytics 不会自动刷新应用程序内部引用表。
可作为引用数据源创建的 Amazon S3 对象具有大小限制。有关更多信息,请参阅Limits。如果对象大小超出该限制,则 Kinesis Data Analytics 无法加载数据。应用程序状态显示为正在运行,但是不读取数据。
当添加引用数据源时,您需要提供以下信息:
-
S3 存储桶和对象键名称 – 除了存储桶名称和对象键以外,您还需要提供 Kinesis Data Analytics 可担任的 IAM 角色以代表您读取对象。
-
应用程序内部引用表名称 – Kinesis Data Analytics 创建该应用程序内部表并读取 Amazon S3 对象以填充该表。这是您在应用程序代码中指定的表名称。
-
映射架构 - 您描述记录格式 (JSON、CSV),即 Amazon S3 对象中存储的数据的编码。您还可以描述每个对象元素如何映射到应用程序内部引用表中的列。
下面显示 AddApplicationReferenceDataSource
API 请求中的请求正文。
{ "applicationName": "string", "CurrentapplicationVersionId": number, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "IsDropped": boolean, "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "S3ReferenceDataSource": { "BucketARN": "string", "FileKey": "string", "ReferenceRoleARN": "string" }, "TableName": "string" } }