本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用物联网 Greengrass 经济实惠地将物联网数据直接摄取到亚马逊 S3 AWS
由塞巴斯蒂安·维维亚尼 (AWS) 和 Rizwan Syed () 创作 AWS
摘要
此模式向您展示了如何使用物联网 G AWS reengrass 版本 2 设备经济高效地将物联网 (IoT) 数据直接摄取到亚马逊简单存储服务 (Amazon S3) 存储桶中。设备运行自定义组件,用于读取物联网数据,并将数据保存在永久存储(即本地磁盘或卷)中。然后,设备将物联网数据压缩为 Apache Parquet 文件,并定期将数据上传至 S3 存储桶。
您采集的物联网数据的数量和速度仅受边缘硬件功能以及网络带宽的限制。您可使用 Amazon Athena 经济高效地分析您摄取的数据。Athena 支持使用 Amazon Managed Grafana 进行压缩 Apache Parquet 文件和数据可视化。
先决条件和限制
先决条件
一个活跃的AWS账户
在 AWSIoT Greengrass 版本 2 上运行并从传感器收集数据的边缘网关(数据源和数据收集过程超出了此模式的范围,但您几乎可以使用任何类型的传感器数据。 此模式使用带有传感器或网关的本地MQTT
代理,可在本地发布数据。) 用于将数据上传至 S3 存储桶的流管理器组件
AWSSDK对于 Java
JavaScript、AWSSDKf or 或 AWSSDKPython (Boto3) 运行 APIs
限制
这种模式中的数据不会实时上传至 S3 存储桶。有延迟期,您可配置延迟时间。数据在边缘设备中临时缓冲,然后在到期后上传。
SDK仅在 Java、Node.js 和 Python 中可用。
架构
目标技术堆栈
Amazon S3
AWS IoT Greengrass
MQTT经纪人
流管理器组件
目标架构
下图显示了一种架构,该架构旨在摄取物联网传感器数据,并将该数据存储至 S3 存储桶。
图表显示了以下工作流:
多个传感器(例如温度和阀门)更新会发布给当地MQTT代理商。
订阅这些传感器的 Parquet 文件压缩器会更新主题并接收这些更新。
Parquet 文件压缩器将更新项存储在本地。
期限过后,存储的文件被压缩为 Parquet 文件,然后传递至流管理器,以上传到指定的 S3 存储桶。
流管理器会将 Parquet 文件上传至 S3 存储桶。
注意
直播管理器 (StreamManager
) 是一个托管组件。有关如何将数据导出到 Amazon S3 的示例,请参阅 AWS IoT G reengrass 文档中的流管理器。你可以使用本地MQTT经纪商作为组件,也可以使用其他经纪商,比如 Eclipse Mos
工具
AWS工具
Amazon Athena 是一项交互式查询服务,可帮助您使用标准直接在 Amazon S3 中分析数据。SQL
Amazon Simple Storage Service (Amazon S3) 是一项基于云的对象存储服务,可帮助您存储、保护和检索任意数量的数据。
AWSIoT Greengrass 是一款开源物联网边缘运行时和云服务,可帮助您在设备上构建、部署和管理物联网应用程序。
其他工具
Apache Parquet
是一种专为存储和检索而设计的开源列式数据文件格式。 MQTT(消息队列遥测传输)是一种轻量级消息协议,专为受限的设备而设计。
最佳实践
对上传的数据使用正确分区格式
对 S3 存储桶中的根前缀名称没有具体要求(例如 "myAwesomeDataSet/"
或 "dataFromSource"
),但我们建议您使用有意义的分区和前缀,以便于理解数据集的用途。
我们还建议您在 Amazon S3 中使用正确分区,以便查询在数据集上以最佳方式运行。在以下示例中,按HIVE格式对数据进行了分区,以便优化每个 Athena 查询扫描的数据量。这可以提高性能并降低成本。
s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet
操作说明
任务 | 描述 | 所需技能 |
---|---|---|
创建 S3 存储桶。 |
| 应用程序开发人员 |
向 S3 存储桶添加IAM权限。 | 要向用户授予您之前创建的 S3 存储桶和前缀的写入权限,请将以下IAM策略添加到您的 AWS IoT Greengrass 角色中:
有关更多信息,请参阅 Aurora 文档中的创建访问 Amazon S3 资源的IAM策略。 接下来,更新 S3 存储桶的资源策略(如果需要),以允许使用正确的AWS委托人进行写入访问。 | 应用程序开发人员 |
任务 | 描述 | 所需技能 |
---|---|---|
更新组件的配方。 |
| 应用程序开发人员 |
创建组件。 | 请执行以下操作之一:
| 应用程序开发人员 |
更新MQTT客户端。 | 示例代码不使用身份验证,因为该组件在本地连接至代理。如果您的情况有所不同,请根据需要更新MQTT客户端部分。此外,执行下列操作:
| 应用程序开发人员 |
任务 | 描述 | 所需技能 |
---|---|---|
更新核心设备部署。 | 如果AWS物联网 Greengrass 第 2 版核心设备的部署已经存在,请修改部署。如果部署不存在,请创建新部署。 要为组件指定正确的名称,请根据以下内容更新新组件的日志管理器配置(如果需要):
最后,完成AWS物联网 Greengrass 核心设备部署的修订。 | 应用程序开发人员 |
任务 | 描述 | 所需技能 |
---|---|---|
查看日志,了解AWS物联网 Greengrass 音量。 | 检查以下各项:
| 应用程序开发人员 |
检查 S3 存储桶。 | 验证数据是否正在上传至 S3 存储桶。您可以看到每个时间段在上传的文件。 您还可以通过查询下一部分中的数据,验证数据是否已上传至 S3 存储桶。 | 应用程序开发人员 |
任务 | 描述 | 所需技能 |
---|---|---|
创建数据库和表。 |
| 应用程序开发人员 |
授予 Athena 数据访问权限。 |
| 应用程序开发人员 |
故障排除
事务 | 解决方案 |
---|---|
MQTT客户端连接失败 |
|
MQTT客户订阅失败 | 验证MQTT代理的权限。如果你有来自的MQTT经纪商AWS,请参阅 MQTT3.1.1 经纪商 (Moquette) 和 MQTT5 个经纪商 (EMQX)。 |
无法创建 Parquet 文件 |
|
对象未上传至 S3 存储桶 |
|
相关资源
DataFrame
(Pandas 文档) Apache Parquet 文档
(Parquet 文档) 开发AWS物联网 Greengrass 组件(AWS物联网 Greengr ass 开发者指南,版本 2)
将AWS物联网 Greengrass 组件部署到设备(物AWS联网 Greengr ass 开发者指南,版本 2)
与本地物联网设备互动(I AWS oT Greengrass 开发者指南,版本 2)
MQTT3.1.1 经纪商(Moquette)(物联网 AWS Greengrass 开发者指南,版本 2)
MQTT5 经纪商 (EMQX)(AWSIoT Greengrass 开发者指南,版本 2)
其他信息
成本分析
以下成本分析情景演示了这种模式中涵盖的数据摄取方法如何影响云中的数据摄取成本。AWS此场景中的定价示例基于发布价格。价格可能会发生变化。此外,您的费用可能会有所不同,具体取决于您AWS所在的地区、AWS服务配额以及与您的云环境相关的其他因素。
输入信号集
该分析使用以下一组输入信号为基础,将物联网摄取成本与其他可用替代方案进行比较。
信号数量 | Frequency | 每个信号的数据 |
125 | 25 Hz | 8 字节 |
在此情况下,系统接收 125 个信号。每个信号为 8 字节,每 40 毫秒 (25 Hz) 会出现一次。这些信号可单独发出,也可以分组至公共有效载荷中。您可根据需要选择拆分和打包这些信号。您还可确定延迟。延迟由接收、累积和摄取数据时间段组成。
为便于比较,此场景的摄取操作以区域为基础。us-east-1
AWS成本比较仅适用于AWS服务。硬件或连接等其他成本未计入分析。
成本比较
下表显示了每种摄取方法的每月成本(美元USD)。
方法 | 月度成本 |
AWS物联网 SiteWise * | 331.77 USD |
AWS带有数据处理包的 IoT SiteWise Edge(将所有数据保存在边缘) | 200 USD |
AWS用于访问原始数据的物联网核心和 Amazon S3 规则 | 84.54 USD |
边缘 Parquet 文件压缩并上传至 Amazon S3 | 0.5 USD |
*必须对数据进行缩减采样,才能符合服务限额。这意味着使用此方法会丢失数据。
替代方法
本节显示了以下替代方法等效成本:
AWS物联网 SiteWise — 每个信号都必须以单独的消息形式上传。因此,每月的消息总数为 125×25×3600×24×30,相当于每月 81 亿条消息。但是,AWS物联网每个属性每秒 SiteWise 只能处理 10 个数据点。假设将数据缩减到 10 Hz,则每月的消息数量将减少到125×10×3600×24×30,即 32.4 亿条。如果您使用的发布者组件将测量结果打包成一组 10 个(USD每百万封邮件 1 个),则每月的费用USD为每月 324 个。假设每条消息为 8 字节 (1 Kb/125),则为 25.92 Gb 的数据存储空间。这增加了每月7.77 USD 的费用。第一个月的总费用为331.77,USD并且每月增加7.77 USD。
AWS带有数据处理包的 IoT SiteWise Edge,包括在边缘完全处理的所有模型和信号(即无需云端接入)— 您可以使用数据处理包作为替代方案,以降低成本并配置在边缘计算的所有模型。即使没有进行实际计算,也可以仅用于存储和可视化。在这种情况下,必须为边缘网关使用强大硬件。每月的固定费用为200 USD 美元。
通过将原始数据存储到 Amazon S3 中的物联网核心MQTT以及将原始数据存储在 Amazon S3 中的物联网规则 — 假设所有信号都发布在公共负载中,则发布到 Io AWS T Core 的消息总数为 25×3600×24×30,即每月 6,480 万条。AWSUSD按每百万条消息计算,每月费用为64.8 USD 条。USD按每百万条规则激活0.15次计算,每条消息只有一条规则,则每月增加19.44 USD 的费用。在 Amazon S3 中,USD按每 Gb 存储0.023的成本计算,USD每月再增加1.5英镑(每月增加以反映新数据)。第一个月的总费用为84.54USD,每月增加1. USD 5%。
在边缘压缩 Parquet 文件中的数据并上传至 Amazon S3(建议的方法)— 压缩率取决于数据的类型。使用相同的工业数据进行测试MQTT,整个月的总产出数据为1.2 Gb。这笔费用为USD每月0.03美元。其他基准测试中描述的压缩率(使用随机数据)约为 66%(更接近最坏的情况)。总数据量为 21 Gb,USD每月花费 0.5 Gb。
Parquet 文件生成器
以下代码示例显示了用 Python 编写的 Parquet 文件生成器结构。该代码示例仅用于说明,如果粘贴到您的环境中则不起作用。
import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)