对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:准备数据
在此部分中,您创建一个 Kinesis 数据流,然后在该流上填充订单和交易记录。此式源将用于下一步创建的应用程序。
步骤 1.1:创建流式传输源
可以使用控制台或 AWS CLI创建一个 Kinesis 数据流。本示例采用 OrdersAndTradesStream
作为流名称。
-
使用控制台 — 登录 AWS Management Console 并打开 Kinesis 控制台,网址为 https://console.aws.amazon.com/kinesis。
选择 Data Streams,然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流。 -
使用 AWS CLI— 使用以下 Kinesis
create-stream
AWS CLI 命令创建直播:$ aws kinesis create-stream \ --stream-name
OrdersAndTradesStream
\ --shard-count 1 \ --region us-east-1 \ --profile adminuser
步骤 1.2:填充流式传输源
运行以下 Python 脚本以便在 OrdersAndTradesStream
中填充示例记录。如果您使用其他名称创建了流,请相应更新 Python 代码。
-
安装 Python 和
pip
。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。代码中的
put-record
命令将 JSON 记录写入到流。import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))