本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:通过 MQTT 与本地物联网设备交互
您可以完成本教程,将核心设备配置为与通过 MQTT 连接到核心设备的本地 IoT 设备(称为客户端设备)进行交互。在本教程中,AWS IoT您将配置为使用云发现作为客户端设备连接到核心设备。配置云发现时,客户端设备可以向AWS IoT Greengrass云服务发送请求以发现核心设备。来自的响应AWS IoT Greengrass包括您配置客户端设备以发现的核心设备的连接信息和证书。然后,客户端设备可以使用此信息连接到可用的核心设备,在那里它可以通过 MQTT 进行通信。
在本教程中,您将执行以下操作:
-
如有必要,请查看并更新核心设备的权限。
-
将客户端设备与核心设备关联,以便他们可以使用云发现来发现核心设备。
-
将 Greengrass 组件部署到核心设备以启用客户端设备支持。
-
将客户端设备连接到核心设备并测试与AWS IoT Core云服务的通信。
-
开发一个与客户端设备通信的自定义 Greengrass 组件。
-
开发一个与客户端设备的AWS IoT设备影子交互的自定义组件。
本教程使用单核设备和单个客户端设备。您也可以按照教程连接和测试多个客户端设备。
在本教程中,预计花费 30-60 分钟。
先决条件
要完成本教程,您需要:
-
AWS 账户。如果没有,请参阅设置一个 AWS 账户。
-
具有管理员权限的 AWS Identity and Access Management (IAM) 用户。
-
一款 Greengrass 核心设备。有关如何设置核心设备的更多信息,请参阅设置 AWS IoT Greengrass 核心设备。
-
核心设备必须运行 Greengrass nucleus v2.6.0 或更高版本。此版本包括对本地发布/订阅通信中的通配符的支持以及对客户端设备影子的支持。
注意
客户端设备支持需要 Greengrass nucleus v2.0 或更高版本。但是,本教程探讨了较新的功能,例如在本地发布/订阅中支持 MQTT 通配符以及支持客户端设备影子。这些功能需要 Greengrass nucleus v2.6.0 或更高版本。
-
核心设备必须与客户端设备位于同一个网络上才能连接。
-
(可选)要完成开发自定义 Greengrass 组件的模块,核心设备必须运行 Greengrass CLI。有关更多信息,请参阅 安装 Greengrass CLI。
-
-
在本教程中,可以作为客户端设备进行连接。AWS IoT有关更多信息,请参阅《AWS IoT Core开发人员指南》中的创建AWS IoT资源。
-
客户端设备的AWS IoT策略必须允许该
greengrass:Discover
权限。有关更多信息,请参阅 客户端设备的最低AWS IoT政策。 -
客户端设备必须与核心设备位于同一个网络上。
-
客户端设备必须运行 Python 3
。 -
客户端设备必须运行 Git
。
-
步骤 1:查看并更新核心设备AWS IoT政策
要支持客户端设备,核心设备的AWS IoT策略必须允许以下权限:
-
greengrass:PutCertificateAuthorities
-
greengrass:VerifyClientDeviceIdentity
-
greengrass:VerifyClientDeviceIoTCertificateAssociation
-
greengrass:GetConnectivityInfo
-
greengrass:UpdateConnectivityInfo
—(可选)使用 IP 检测器组件需要此权限,该组件将核心设备的网络连接信息报告给AWS IoT Greengrass云服务。
有关核心设备的这些权限和AWS IoT策略的更多信息,请参阅数据层面操作的 AWS IoT 策略和支持客户端设备的最低AWS IoT政策。
在本节中,您将查看核心设备的AWS IoT策略并添加缺少的所有必需权限。如果您使用AWS IoT Greengrass核心软件安装程序来配置资源,则您的核心设备具有允许访问所有AWS IoT Greengrass操作的AWS IoT策略(greengrass:*
)。在这种情况下,只有当您计划将影子管理器组件配置为与其同步设备影子时,才必须更新AWS IoT策略AWS IoT Core。否则,你可以跳过本节。
查看和更新核心设备的AWS IoT政策
-
在AWS IoT Greengrass控制台
导航菜单中,选择核心设备。 -
在核心设备页面上,选择要更新的核心设备。
-
在核心设备详细信息页面上,选择指向核心设备的 Thin g 的链接。此链接可在AWS IoT控制台中打开事物详细信息页面。
-
在事物详细信息页面上,选择证书。
-
在 “证书” 选项卡中,选择事物的有效证书。
-
在证书详细信息页面上,选择策略。
-
在 “策略” 选项卡中,选择要查看和更新的AWS IoT策略。您可以向附加到核心设备活动证书的任何策略添加所需的权限。
注意
如果您使用AWS IoT Greengrass核心软件安装程序来配置资源,则有两个AWS IoT策略。我们建议您选择名为GreengrassV2IoTThingPolicy的策略(如果存在)。默认情况下,使用快速安装程序创建的核心设备使用此策略名称。如果您向此策略添加权限,则也将这些权限授予使用此策略的其他核心设备。
-
在策略概述中,选择编辑活动版本。
-
查看策略以获取所需权限,然后添加缺少的所有必需权限。
-
要将新的策略版本设置为活动版本,请在策略版本状态下,选择将编辑后的版本设置为该策略的活动版本。
-
选择另存为新版本。
步骤 2:启用客户端设备支持
要使客户端设备使用云发现连接到核心设备,必须关联这些设备。当您将客户端设备与核心设备关联时,可以使该客户端设备检索核心设备的 IP 地址和证书以用于连接。
要使客户端设备能够安全地连接到核心设备并与 Greengrass 组件通信,请将以下 Greengrass 组件AWS IoT Core部署到核心设备:
-
客户端设备身份验证 (
aws.greengrass.clientdevices.Auth
)部署客户端设备身份验证组件以对客户端设备进行身份验证并授权客户端设备操作。这个组件允许你的AWS IoT东西连接到核心设备。
此组件需要一些配置才能使用。您必须指定客户端设备组以及每个组有权执行的操作,例如通过 MQTT 进行连接和通信。有关更多信息,请参阅客户端设备身份验证组件配置。
-
MQTT 3.1.1 经纪商 (Moquette) (
aws.greengrass.clientdevices.mqtt.Moquette
)部署 Moquette MQTT 代理组件来运行轻量级 MQTT 代理。Moquette MQTT 代理符合 MQTT 3.1.1,包括对 QoS 0、QoS 1、QoS 2、保留消息、最后遗嘱消息和永久订阅的本地支持。
您无需配置此组件即可使用它。但是,您可以配置此组件运行 MQTT 代理的端口。默认情况下,它使用端口 8883。
-
MQTT 网桥 (
aws.greengrass.clientdevices.mqtt.Bridge
)(可选)部署 MQTT 桥接组件,以便在客户端设备(本地 MQTT)、本地发布/订阅和 MQTT 之间中继消息。AWS IoT Core将此组件配置为与 Greengrass 组件中的客户端设备同步AWS IoT Core并与客户端设备进行交互。
此组件需要配置才能使用。您必须指定此组件中继消息的主题映射。有关更多信息,请参阅 MQTT 网桥组件配置。
-
IP 探测器 (
aws.greengrass.clientdevices.IPDetector
)(可选)部署 IP 检测器组件,自动向AWS IoT Greengrass云服务报告核心设备的 MQTT 代理端点。如果您的网络设置很复杂,例如路由器将 MQTT 代理端口转发到核心设备的网络设置,则无法使用此组件。
您无需配置此组件即可使用它。
在本节中,您将使用AWS IoT Greengrass控制台关联客户端设备并将客户端设备组件部署到核心设备。
启用客户端设备支持
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择要在其中启用客户端设备支持的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在 “客户端设备” 选项卡上,选择 “配置云发现”。
将打开 “配置核心设备” 发现页面。在此页面上,您可以将客户端设备与核心设备关联并部署客户端设备组件。本页在步骤 1:选择目标核心设备中为您选择核心设备。
注意
您也可以使用此页为事物组配置核心设备发现。如果选择此选项,则可以将客户端设备组件部署到事物组中的所有核心设备。但是,如果选择此选项,则必须在创建部署后手动将客户端设备与每台核心设备关联起来。在本教程中,您将配置单核设备。
-
在步骤 2:关联客户端设备中,将客户端设备的设备AWS IoT与核心设备相关联。这使客户端设备能够使用云发现来检索核心设备的连接信息和证书。执行以下操作:
-
选择 “关联客户端设备”。
-
在 “将客户端设备与核心设备关联” 模式中,输入要关联AWS IoT的事物的名称。
-
选择添加。
-
选择关联。
-
-
在步骤 3:配置和部署 Greengrass 组件中,部署组件以启用客户端设备支持。如果目标核心设备以前部署过,则此页面将修改该部署。否则,此页面将为核心设备创建新的部署。要配置和部署客户端设备组件,请执行以下操作:
-
核心设备必须运行 Greengrass nucleus v2.6.0 或更高版本才能完成本教程。如果核心设备运行的是早期版本,请执行以下操作:
-
选中该框以部署组aws.greengrass.Nucleus件。
-
对于组aws.greengrass.Nucleus件,选择编辑配置。
-
对于组件版本,请选择版本 2.6.0 或更高版本。
-
选择确认。
注意
如果您将 Greengrass nucleus 从较早的次要版本升级,并且核心设备AWS运行依赖于该核的组件,则还必须将提供的组件更新到较新的版本。AWS在本教程的后面部分中查看部署时,可以配置这些组件的版本。有关更多信息,请参阅 更新AWS IoT Greengrass核心软件 (OTA)。
-
-
对于组aws.greengrass.clientdevices.Auth件,选择编辑配置。
-
在客户端设备身份验证组件的编辑配置模式中,配置授权策略,允许客户端设备在核心设备上发布和订阅 MQTT 代理。执行以下操作:
-
在 “配置” 下的 “合并代码配置” 块中,输入以下配置,其中包含客户端设备授权策略。每个设备组授权策略都指定了一组操作以及客户端设备可以用来执行这些操作的资源。
-
此策略允许名称
MyClientDevice
以开头的客户端设备就所有 MQTT 主题进行连接和通信。将MyClientDevice*
替换为要作为客户端设备连接AWS IoT的事物的名称。您也可以使用*
通配符指定与客户端设备名称相匹配的名称。通*
配符必须位于名称的末尾。如果您要连接第二台客户端设备,请将
MyOtherClientDevice*
替换为该客户端设备的名称或与该客户端设备名称匹配的通配符模式。否则,您可以删除或保留选择规则的这一部分,该部分允许名称匹配的客户端设备MyOtherClientDevice*
进行连接和通信。 -
此策略还使用
OR
运算符来允许名称MyOtherClientDevice
以开头的客户端设备就所有 MQTT 主题进行连接和通信。您可以删除选择规则中的此子句,也可以对其进行修改以匹配要连接的客户端设备。 -
此政策允许客户端设备发布和订阅所有 MQTT 主题。要遵循最佳安全实践,请将
mqtt:publish
和mqtt:subscribe
操作限制在客户端设备用于通信的最少一组话题上。
{ "deviceGroups": { "formatVersion": "2021-03-05", "definitions": { "MyDeviceGroup": { "selectionRule": "thingName:
MyClientDevice*
OR thingName:MyOtherClientDevice*
", "policyName": "MyClientDevicePolicy" } }, "policies": { "MyClientDevicePolicy": { "AllowConnect": { "statementDescription": "Allow client devices to connect.", "operations": [ "mqtt:connect" ], "resources": [ "*" ] }, "AllowPublish": { "statementDescription": "Allow client devices to publish to all topics.", "operations": [ "mqtt:publish" ], "resources": [ "*" ] }, "AllowSubscribe": { "statementDescription": "Allow client devices to subscribe to all topics.", "operations": [ "mqtt:subscribe" ], "resources": [ "*" ] } } } } }有关更多信息,请参阅客户端设备身份验证组件配置。
-
-
选择确认。
-
-
对于组aws.greengrass.clientdevices.mqtt.Bridge件,选择编辑配置。
-
在 MQTT 网桥组件的编辑配置模式中,配置主题映射,将 MQTT 消息从客户端设备中继到。AWS IoT Core执行以下操作:
-
在 “配置” 下的 “合并代码配置” 块中,输入以下配置。此配置指定将
clients/+/hello/world
主题过滤器上的 MQTT 消息从客户端设备中继到AWS IoT Core云服务。例如,此主题筛选条件与clients/MyClientDevice1/hello/world
主题匹配。{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" } } }
有关更多信息,请参阅 MQTT 网桥组件配置。
-
选择确认。
-
-
-
选择 “查看并部署” 以查看此页为您创建的部署。
-
如果您之前未在此区域设置过 Greengrass 服务角色,则控制台会打开一个模式来为您设置服务角色。客户端设备身份验证组件使用此服务角色来验证客户端设备的身份,IP 检测器组件使用此服务角色来管理核心设备的连接信息。选择 Grant permissions(授予权限)。
-
在 “查看” 页面上,选择 “部署” 以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择 Targ et。有关更多信息,请参阅下列内容:
步骤 3:Connect 客户端设备
客户端设备可以使用AWS IoT Device SDK来发现、连接核心设备并与之通信。客户端设备必须是一个AWS IoT东西。有关更多信息,请参阅《AWS IoT Core开发人员指南》中的创建事物对象。
在本节中,您将安装适用于 Python 的 AWS IoT Device SDK v2
注意
AWS IoT Device SDK还提供其他编程语言版本。本教程使用适用于 Python 的 AWS IoT Device SDK v2,但你可以根据自己的用例探索其他 SDK。有关更多信息,请参阅AWS IoT Core 开发人员指南中的 AWS IoT Device 软件开发工具包。
将客户端设备连接到核心设备
-
下载适用于 Python 的 AWS IoT Device SDK v2
并将其安装到要作为客户端设备连接的设备上。AWS IoT 在客户端设备上,执行以下操作:
-
克隆适用于 Python 的 AWS IoT Device SDK v2 版本库进行下载。
git clone https://github.com/aws/aws-iot-device-sdk-python-v2.git
-
安装适用于 Python 的 AWS IoT Device SDK v2。
python3 -m pip install --user ./aws-iot-device-sdk-python-v2
-
-
在 python 版 AWS IoT Device SDK v2 中切换到示例文件夹。
cd aws-iot-device-sdk-python-v2/samples
-
运行示例 Greengrass 发现应用程序。此应用程序需要指定客户端设备事物名称、要使用的 MQTT 主题和消息以及用于验证和保护连接的证书的参数。以下示例向
clients/
主题发送了 Hello World 消息。MyClientDevice1
/hello/world注意
本主题与您配置 MQTT 网桥以将消息中继到AWS IoT Core之前的主题相匹配。
-
将
MyClientDevice1
替换为客户端设备的事物名称。 -
将
~/certs/ AmazonRoot ca1.pem 替
换为客户端设备上亚马逊根 CA 证书的路径。 -
将
~/certs/device.pem.crt 替换为客户端设备
上设备证书的路径。 -
将
~/certs/private.pem.key 替换为客户端设备上私钥
文件的路径。 -
将
us-east-1
替换为您的客户端设备和核心设备运行的AWS区域。
python3 basic_discovery.py \\ --thing_name
MyClientDevice1
\\ --topic 'clients/MyClientDevice1
/hello/world' \\ --message 'Hello World!' \\ --ca_file~/certs/AmazonRootCA1.pem
\\ --cert~/certs/device.pem.crt
\\ --key~/certs/private.pem.key
\\ --regionus-east-1
\\ --verbosity Warn发现示例应用程序发送消息 10 次并断开连接。它还订阅发布消息的同一主题。如果输出显示应用程序收到了有关该主题的 MQTT 消息,则客户端设备可以成功地与核心设备通信。
Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\
MIICiT...EXAMPLE=
\ -----END CERTIFICATE-----\ '])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 0} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 0}' Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 1} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 1}'...
Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 9} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 9}'如果应用程序改为输出错误,请参阅 Greengrass 发现问题疑难解答。
您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接和发送消息。有关更多信息,请参阅 监控AWS IoT Greengrass日志。
-
-
验证 MQTT 网桥是否将来自客户端设备的消息中继到。AWS IoT Core您可以在AWS IoT Core控制台中使用 MQTT 测试客户端订阅 MQTT 主题筛选器。执行以下操作:
-
导航到 AWS IoT 控制台
。 -
在左侧导航菜单的测试下,选择 MQTT 测试客户端。
-
在 “订阅主题” 选项卡上,在 “主题筛选器” 中,输入
clients/+/hello/world
订阅来自核心设备的客户端设备消息。 -
选择订阅。
-
再次在客户端设备上运行发布/订阅应用程序。
MQTT 测试客户端显示您从客户端设备发送的与该主题过滤器匹配的主题的消息。
-
步骤 4:开发可与客户端设备通信的组件
您可以开发与客户端设备通信的 Greengrass 组件。组件使用进程间通信 (IPC) 和本地发布/订阅接口在核心设备上进行通信。要与客户端设备交互,请将 MQTT 桥接组件配置为在客户端设备和本地发布/订阅接口之间中继消息。
在本节中,您将更新 MQTT 桥接组件,以将来自客户端设备的消息中继到本地发布/订阅接口。然后,你开发一个订阅这些消息并在收到消息时打印这些消息的组件。
开发可与客户端设备通信的组件
-
修改核心设备的部署,并配置 MQTT 网桥组件以将来自客户端设备的消息中继到本地发布/订阅。执行以下操作:
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择本教程中使用的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在 “客户端设备” 选项卡上,选择 “配置云发现”。
将打开 “配置核心设备” 发现页面。在此页面上,您可以更改或配置部署到核心设备的客户端设备组件。
-
在步骤 3 中,对于组aws.greengrass.clientdevices.mqtt.Bridge件,选择编辑配置。
-
在 MQTT 网桥组件的编辑配置模式中,配置主题映射,将 MQTT 消息从客户端设备中继到本地发布/订阅接口。执行以下操作:
-
在 “配置” 下的 “合并代码配置” 块中,输入以下配置。此配置指定将与主题过滤器匹配的主题上的 MQTT 消息从客户端设备中继到AWS IoT Core云服务和
clients/+/hello/world
本地 Greengrass 发布/订阅代理。{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" } } }
有关更多信息,请参阅 MQTT 网桥组件配置。
-
选择确认。
-
-
选择 “查看并部署” 以查看此页为您创建的部署。
-
在 “查看” 页面上,选择 “部署” 以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择 Targ et。有关更多信息,请参阅下列内容:
-
-
开发和部署一个 Greengrass 组件,用于订阅来自客户端设备的 Hello World 消息。执行以下操作:
-
在核心设备上为配方和工件创建文件夹。
重要
必须使用以下格式作为对象文件夹路径。包括您在配方中指定的组件名称和版本。
artifacts/
componentName
/componentVersion
/ -
使用文本编辑器创建包含以下内容的组件配方。此配方指定安装适用于 Python 的 AWS IoT Device SDK v2 并运行订阅主题并打印消息的脚本。
例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano recipes/com.example.clientdevices.MyHelloWorldSubscriber-1.0.0.json
将以下配方复制到文件中。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MyHelloWorldSubscriber", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to Hello World messages from client devices.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MyHelloWorldSubscriber:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "run": "python3 -u {artifacts:path}/hello_world_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "run": "py -3 -u {artifacts:path}/hello_world_subscriber.py" } } ] }
-
使用文本编辑器创建名
hello_world_subscriber.py
为以下内容的 Python 脚本构件。此应用程序使用发布/订阅 IPC 服务来订阅clients/+/hello/world
主题并打印它收到的消息。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/hello_world_subscriber.py
将以下 Python 代码复制到文件中。
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 CLIENT_DEVICE_HELLO_WORLD_TOPIC = 'clients/+/hello/world' TIMEOUT = 10 def on_hello_world_message(event): try: message = str(event.binary_message.message, 'utf-8') print('Received new message: %s' % message) except: traceback.print_exc() try: ipc_client = GreengrassCoreIPCClientV2() # SubscribeToTopic returns a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic( topic=CLIENT_DEVICE_HELLO_WORLD_TOPIC, on_stream_event=on_hello_world_message) print('Successfully subscribed to topic: %s' % CLIENT_DEVICE_HELLO_WORLD_TOPIC) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') operation.close() except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
注意
此组件使用AWS IoT Device SDK适用于 Python 的 v2 中的 IPC 客户端 V2 与核心
软件进行通信。AWS IoT Greengrass与最初的 IPC 客户端相比,IPC 客户端 V2 减少了在自定义组件中使用 IPC 所需编写的代码量。 -
使用 Greengrass CLI 部署该组件。
-
-
查看组件日志,验证组件是否成功安装并订阅了该主题。
您可以保持日志源处于打开状态,以验证核心设备是否收到消息。
-
在客户端设备上,再次运行示例 Greengrass 发现应用程序,向核心设备发送消息。
python3 basic_discovery.py \\ --thing_name
MyClientDevice1
\\ --topic 'clients/MyClientDevice1
/hello/world' \\ --message 'Hello World!' \\ --ca_file~/certs/AmazonRootCA1.pem
\\ --cert~/certs/device.pem.crt
\\ --key~/certs/private.pem.key
\\ --regionus-east-1
\\ --verbosity Warn -
再次查看组件日志,以验证该组件是否接收并打印了来自客户端设备的消息。
步骤 5:开发与客户端设备影子交互的组件
您可以开发与客户端设备的设备影子交互的 Greengrass 组件。AWS IoT影子是一个 JSON 文档,用于存储诸如客户端设备AWS IoT之类的当前或所需状态信息。即使客户端设备未连接到,自定义组件也可以访问客户端设备的影子以AWS IoT管理其状态。每AWS IoT件事物都有一个未命名的阴影,你也可以为每个事物创建多个命名的阴影。
在本节中,您将部署影子管理器组件来管理核心设备上的阴影。您还可以更新 MQTT 网桥组件,以便在客户端设备和影子管理器组件之间中继影子消息。然后,开发一个用于更新客户端设备影子的组件,并在客户端设备上运行一个示例应用程序,以响应来自该组件的影子更新。该组件代表智能灯光管理应用程序,其中核心设备管理作为客户端设备连接到它的智能灯的颜色状态。
开发与客户端设备影子交互的组件
-
修改核心设备的部署以部署影子管理器组件,并配置 MQTT 网桥组件以在客户端设备和影子管理器通信的本地发布/订阅之间中继影子消息。执行以下操作:
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择本教程中使用的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在 “客户端设备” 选项卡上,选择 “配置云发现”。
将打开 “配置核心设备” 发现页面。在此页面上,您可以更改或配置部署到核心设备的客户端设备组件。
-
在步骤 3 中,对于组aws.greengrass.clientdevices.mqtt.Bridge件,选择编辑配置。
-
在 MQTT bridge 组件的编辑配置模式中,配置主题映射,以便在客户端设备和本地发布/订阅接口之间中继设备影子主题上的 MQTT 消息。您还要确认部署指定了兼容的 MQTT 桥接版本。客户端设备影子支持需要 MQTT 网桥 v2.2.0 或更高版本。执行以下操作:
-
对于组件版本,请选择版本 2.2.0 或更高版本。
-
在 “配置” 下的 “合并代码配置” 块中,输入以下配置。此配置指定在影子主题上中继 MQTT 消息。
{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsLocalMqttToPubsub": { "topic": "$aws/things/+/shadow/#", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsPubsubToLocalMqtt": { "topic": "$aws/things/+/shadow/#", "source": "Pubsub", "target": "LocalMqtt" } } }
有关更多信息,请参阅 MQTT 网桥组件配置。
-
选择确认。
-
-
在步骤 3 中,选择要部署的aws.greengrass.ShadowManager组件。
-
选择 “查看并部署” 以查看此页为您创建的部署。
-
在 “查看” 页面上,选择 “部署” 以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择 Targ et。有关更多信息,请参阅下列内容:
-
-
开发和部署用于管理智能轻型客户端设备的 Greengrass 组件。执行以下操作:
-
在核心设备上为组件的工件创建一个文件夹。
重要
必须使用以下格式作为对象文件夹路径。包括您在配方中指定的组件名称和版本。
artifacts/
componentName
/componentVersion
/ -
使用文本编辑器创建包含以下内容的组件配方。此配方指定安装适用于 Python 的 AWS IoT Device SDK v2 并运行一个脚本,该脚本与智能灯光客户端设备的阴影交互以管理其颜色。
例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano recipes/com.example.clientdevices.MySmartLightManager-1.0.0.json
将以下配方复制到文件中。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MySmartLightManager", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that interacts with smart light client devices.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.Nucleus": { "VersionRequirement": "^2.6.0" }, "aws.greengrass.ShadowManager": { "VersionRequirement": "^2.2.0" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "VersionRequirement": "^2.2.0" } }, "ComponentConfiguration": { "DefaultConfiguration": { "smartLightDeviceNames": [], "accessControl": { "aws.greengrass.ShadowManager": { "com.example.clientdevices.MySmartLightManager:shadow:1": { "policyDescription": "Allows access to client devices' unnamed shadows", "operations": [ "aws.greengrass#GetThingShadow", "aws.greengrass#UpdateThingShadow" ], "resources": [ "$aws/things/MyClientDevice*/shadow" ] } }, "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MySmartLightManager:pubsub:1": { "policyDescription": "Allows access to client devices' unnamed shadow updates", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "$aws/things/+/shadow/update/accepted" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "run": "python3 -u {artifacts:path}/smart_light_manager.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "run": "py -3 -u {artifacts:path}/smart_light_manager.py" } } ] }
-
使用文本编辑器创建名
smart_light_manager.py
为以下内容的 Python 脚本构件。此应用程序使用影子 IPC 服务来获取和更新客户端设备影子,使用本地发布/订阅 IPC 服务来接收报告的影子更新。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano artifacts/com.example.clientdevices.MySmartLightManager/1.0.0/smart_light_manager.py
将以下 Python 代码复制到文件中。
import json import random import sys import time import traceback from uuid import uuid4 from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ResourceNotFoundError SHADOW_COLOR_PROPERTY = 'color' CONFIGURATION_CLIENT_DEVICE_NAMES = 'smartLightDeviceNames' COLORS = ['red', 'orange', 'yellow', 'green', 'blue', 'purple'] SHADOW_UPDATE_TOPIC = '$aws/things/+/shadow/update/accepted' SET_COLOR_INTERVAL = 15 class SmartLightDevice(): def __init__(self, client_device_name: str, reported_color: str = None): self.name = client_device_name self.reported_color = reported_color self.desired_color = None class SmartLightDeviceManager(): def __init__(self, ipc_client: GreengrassCoreIPCClientV2): self.ipc_client = ipc_client self.devices = {} self.client_tokens = set() self.shadow_update_accepted_subscription_operation = None self.client_device_names_configuration_subscription_operation = None self.update_smart_light_device_list() def update_smart_light_device_list(self): # Update the device list from the component configuration. response = self.ipc_client.get_configuration( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES]) # Identify the difference between the configuration and the currently tracked devices. current_device_names = self.devices.keys() updated_device_names = response.value[CONFIGURATION_CLIENT_DEVICE_NAMES] added_device_names = set(updated_device_names) - set(current_device_names) removed_device_names = set(current_device_names) - set(updated_device_names) # Stop tracking any smart light devices that are no longer in the configuration. for name in removed_device_names: print('Removing %s from smart light device manager' % name) self.devices.pop(name) # Start tracking any new smart light devices that are in the configuration. for name in added_device_names: print('Adding %s to smart light device manager' % name) device = SmartLightDevice(name) device.reported_color = self.get_device_reported_color(device) self.devices[name] = device print('Current color for %s is %s' % (name, device.reported_color)) def get_device_reported_color(self, smart_light_device): try: response = self.ipc_client.get_thing_shadow( thing_name=smart_light_device.name, shadow_name='') shadow = json.loads(str(response.payload, 'utf-8')) if 'reported' in shadow['state']: return shadow['state']['reported'].get(SHADOW_COLOR_PROPERTY) return None except ResourceNotFoundError: return None def request_device_color_change(self, smart_light_device, color): # Generate and track a client token for the request. client_token = str(uuid4()) self.client_tokens.add(client_token) # Create a shadow payload, which must be a blob. payload_json = { 'state': { 'desired': { SHADOW_COLOR_PROPERTY: color } }, 'clientToken': client_token } payload = bytes(json.dumps(payload_json), 'utf-8') self.ipc_client.update_thing_shadow( thing_name=smart_light_device.name, shadow_name='', payload=payload) smart_light_device.desired_color = color def subscribe_to_shadow_update_accepted_events(self): if self.shadow_update_accepted_subscription_operation == None: # SubscribeToTopic returns a tuple with the response and the operation. _, self.shadow_update_accepted_subscription_operation = self.ipc_client.subscribe_to_topic( topic=SHADOW_UPDATE_TOPIC, on_stream_event=self.on_shadow_update_accepted_event) print('Successfully subscribed to shadow update accepted topic') def close_shadow_update_accepted_subscription(self): if self.shadow_update_accepted_subscription_operation is not None: self.shadow_update_accepted_subscription_operation.close() def on_shadow_update_accepted_event(self, event): try: message = str(event.binary_message.message, 'utf-8') accepted_payload = json.loads(message) # Check for reported states from smart light devices and ignore desired states from components. if 'reported' in accepted_payload['state']: # Process this update only if it uses a client token created by this component. client_token = accepted_payload.get('clientToken') if client_token is not None and client_token in self.client_tokens: self.client_tokens.remove(client_token) shadow_state = accepted_payload['state']['reported'] if SHADOW_COLOR_PROPERTY in shadow_state: reported_color = shadow_state[SHADOW_COLOR_PROPERTY] topic = event.binary_message.context.topic client_device_name = topic.split('/')[2] if client_device_name in self.devices: # Set the reported color for the smart light device. self.devices[client_device_name].reported_color = reported_color print( 'Received shadow update confirmation from client device: %s' % client_device_name) else: print("Shadow update doesn't specify color") except: traceback.print_exc() def subscribe_to_client_device_name_configuration_updates(self): if self.client_device_names_configuration_subscription_operation == None: # SubscribeToConfigurationUpdate returns a tuple with the response and the operation. _, self.client_device_names_configuration_subscription_operation = self.ipc_client.subscribe_to_configuration_update( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES], on_stream_event=self.on_client_device_names_configuration_update_event) print( 'Successfully subscribed to configuration updates for smart light device names') def close_client_device_names_configuration_subscription(self): if self.client_device_names_configuration_subscription_operation is not None: self.client_device_names_configuration_subscription_operation.close() def on_client_device_names_configuration_update_event(self, event): try: if CONFIGURATION_CLIENT_DEVICE_NAMES in event.configuration_update_event.key_path: print('Received configuration update for list of client devices') self.update_smart_light_device_list() except: traceback.print_exc() def choose_random_color(): return random.choice(COLORS) def main(): try: # Create an IPC client and a smart light device manager. ipc_client = GreengrassCoreIPCClientV2() smart_light_manager = SmartLightDeviceManager(ipc_client) smart_light_manager.subscribe_to_shadow_update_accepted_events() smart_light_manager.subscribe_to_client_device_name_configuration_updates() try: # Keep the main thread alive, or the process will exit. while True: # Set each smart light device to a random color at a regular interval. for device_name in smart_light_manager.devices: device = smart_light_manager.devices[device_name] desired_color = choose_random_color() print('Chose random color (%s) for %s' % (desired_color, device_name)) if desired_color == device.desired_color: print('Desired color for %s is already %s' % (device_name, desired_color)) elif desired_color == device.reported_color: print('Reported color for %s is already %s' % (device_name, desired_color)) else: smart_light_manager.request_device_color_change( device, desired_color) print('Requested color change for %s to %s' % (device_name, desired_color)) time.sleep(SET_COLOR_INTERVAL) except InterruptedError: print('Application interrupted') smart_light_manager.close_shadow_update_accepted_subscription() smart_light_manager.close_client_device_names_configuration_subscription() except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) if __name__ == '__main__': main()
这个 Python 应用程序执行以下操作:
-
读取组件的配置以获取要管理的智能轻型客户端设备列表。
-
使用 SubscribeToConfigurationUpdate IPC 操作订阅配置更新通知。每次组件的配置发生变化时,C AWS IoT Greengrass ore 软件都会发送通知。当组件收到配置更新通知时,它会更新其管理的智能轻型客户端设备列表。
-
获取每台智能灯光客户端设备的阴影以获取其初始颜色状态。
-
每隔 15 秒将每台智能灯客户端设备的颜色设置为随机颜色。该组件更新客户端设备的事物阴影以更改其颜色。此操作通过 MQTT 向客户端设备发送阴影增量事件。
-
使用 IPC 操作在本地发布/订阅接口上订阅影子更新已接受的SubscribeToTopic消息。该组件接收这些消息以跟踪每台智能灯客户端设备的颜色。当智能轻型客户端设备收到影子更新时,它会发送 MQTT 消息以确认已收到更新。MQTT 网桥将此消息中继到本地发布/订阅接口。
-
-
使用 Greengrass CLI 部署该组件。部署此组件时,需要指定由其管理影子的客户端设备列表。
smartLightDeviceNames
将MyClientDevice1
替换为客户端设备的事物名称。
-
-
查看组件日志,验证组件是否成功安装和运行。
该组件发送更改智能灯客户端设备颜色的请求。影子管理器接收请求并设置影子的
desired
状态。但是,智能灯客户端设备尚未运行,因此阴影的reported
状态不会改变。该组件的日志包含以下消息。2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}
您可以将日志源保持打开状态,以查看组件何时打印消息。
-
下载并运行使用 Greengrass 发现功能并订阅设备影子更新的示例应用程序。在客户端设备上,执行以下操作:
-
在 python 版 AWS IoT Device SDK v2 中切换到示例文件夹。此示例应用程序使用示例文件夹中的命令行解析模块。
cd aws-iot-device-sdk-python-v2/samples
-
使用文本编辑器创建名为的 Python 脚本
basic_discovery_shadow.py
,其内容如下。此应用程序使用 Greengrass 发现和阴影来保持客户端设备和核心设备之间的属性同步。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano basic_discovery_shadow.py
将以下 Python 代码复制到文件中。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. from awscrt import io from awscrt import mqtt from awsiot import iotshadow from awsiot.greengrass_discovery import DiscoveryClient from awsiot import mqtt_connection_builder from concurrent.futures import Future import sys import threading import traceback from uuid import uuid4 # Parse arguments import utils.command_line_utils; cmdUtils = utils.command_line_utils.CommandLineUtils("Basic Discovery - Greengrass discovery example with device shadows.") cmdUtils.add_common_mqtt_commands() cmdUtils.add_common_topic_message_commands() cmdUtils.add_common_logging_commands() cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str) cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str) cmdUtils.remove_command("endpoint") cmdUtils.register_command("thing_name", "<str>", "The name assigned to your IoT Thing", required=True) cmdUtils.register_command("region", "<str>", "The region to connect through.", required=True) cmdUtils.register_command("shadow_property", "<str>", "The name of the shadow property you want to change (optional, default='color'", default="color") # Needs to be called so the command utils parse the commands cmdUtils.get_args() # Using globals to simplify sample code is_sample_done = threading.Event() mqtt_connection = None shadow_thing_name = cmdUtils.get_command_required("thing_name") shadow_property = cmdUtils.get_command("shadow_property") SHADOW_VALUE_DEFAULT = "off" class LockedData: def __init__(self): self.lock = threading.Lock() self.shadow_value = None self.disconnect_called = False self.request_tokens = set() locked_data = LockedData() def on_connection_interupted(connection, error, **kwargs): print('connection interrupted with error {}'.format(error)) def on_connection_resumed(connection, return_code, session_present, **kwargs): print('connection resumed with return code {}, session present {}'.format(return_code, session_present)) # Try IoT endpoints until we find one that works def try_iot_endpoints(): for gg_group in discover_response.gg_groups: for gg_core in gg_group.cores: for connectivity_info in gg_core.connectivity: try: print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port)) mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=connectivity_info.host_address, port=connectivity_info.port, cert_filepath=cmdUtils.get_command_required("cert"), pri_key_filepath=cmdUtils.get_command_required("key"), ca_bytes=gg_group.certificate_authorities[0].encode('utf-8'), on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed, client_id=cmdUtils.get_command_required("thing_name"), clean_session=False, keep_alive_secs=30) connect_future = mqtt_connection.connect() connect_future.result() print('Connected!') return mqtt_connection except Exception as e: print('Connection failed with exception {}'.format(e)) continue exit('All connection attempts failed') # Function for gracefully quitting this sample def exit(msg_or_exception): if isinstance(msg_or_exception, Exception): print("Exiting sample due to exception.") traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) else: print("Exiting sample:", msg_or_exception) with locked_data.lock: if not locked_data.disconnect_called: print("Disconnecting...") locked_data.disconnect_called = True future = mqtt_connection.disconnect() future.add_done_callback(on_disconnected) def on_disconnected(disconnect_future): # type: (Future) -> None print("Disconnected.") # Signal that sample is finished is_sample_done.set() def on_get_shadow_accepted(response): # type: (iotshadow.GetShadowResponse) -> None try: with locked_data.lock: # check that this is a response to a request from this session try: locked_data.request_tokens.remove(response.client_token) except KeyError: return print("Finished getting initial shadow state.") if locked_data.shadow_value is not None: print(" Ignoring initial query because a delta event has already been received.") return if response.state: if response.state.delta: value = response.state.delta.get(shadow_property) if value: print(" Shadow contains delta value '{}'.".format(value)) change_shadow_value(value) return if response.state.reported: value = response.state.reported.get(shadow_property) if value: print(" Shadow contains reported value '{}'.".format(value)) set_local_value_due_to_initial_query(response.state.reported[shadow_property]) return print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return except Exception as e: exit(e) def on_get_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return if error.code == 404: print("Thing has no shadow document. Creating with defaults...") change_shadow_value(SHADOW_VALUE_DEFAULT) else: exit("Get request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def on_shadow_delta_updated(delta): # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None try: print("Received shadow delta event.") if delta.state and (shadow_property in delta.state): value = delta.state[shadow_property] if value is None: print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return else: print(" Delta reports that desired value is '{}'. Changing local value...".format(value)) if (delta.client_token is not None): print (" ClientToken is: " + delta.client_token) change_shadow_value(value, delta.client_token) else: print(" Delta did not report a change in '{}'".format(shadow_property)) except Exception as e: exit(e) def on_publish_update_shadow(future): #type: (Future) -> None try: future.result() print("Update request published.") except Exception as e: print("Failed to publish update request.") exit(e) def on_update_shadow_accepted(response): # type: (iotshadow.UpdateShadowResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(response.client_token) except KeyError: return try: if response.state.reported != None: if shadow_property in response.state.reported: print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore else: print ("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore else: print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None except: exit("Updated shadow is missing the target property") except Exception as e: exit(e) def on_update_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return exit("Update request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def set_local_value_due_to_initial_query(reported_value): with locked_data.lock: locked_data.shadow_value = reported_value def change_shadow_value(value, token=None): with locked_data.lock: if locked_data.shadow_value == value: print("Local value is already '{}'.".format(value)) return print("Changed local shadow value to '{}'.".format(value)) locked_data.shadow_value = value print("Updating reported shadow value to '{}'...".format(value)) reuse_token = token is not None # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics if not reuse_token: token = str(uuid4()) # if the value is "clear shadow" then send a UpdateShadowRequest with None # for both reported and desired to clear the shadow document completely. if value == "clear_shadow": tmp_state = iotshadow.ShadowState(reported=None, desired=None, reported_is_nullable=True, desired_is_nullable=True) request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=tmp_state, client_token=token, ) # Otherwise, send a normal update request else: # if the value is "none" then set it to a Python none object to # clear the individual shadow property if value == "none": value = None request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=iotshadow.ShadowState( reported={ shadow_property: value } ), client_token=token, ) future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE) if not reuse_token: locked_data.request_tokens.add(token) future.add_done_callback(on_publish_update_shadow) if __name__ == '__main__': tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(cmdUtils.get_command_required("cert"), cmdUtils.get_command_required("key")) if cmdUtils.get_command(cmdUtils.m_cmd_ca_file): tls_options.override_default_trust_store_from_path(None, cmdUtils.get_command(cmdUtils.m_cmd_ca_file)) tls_context = io.ClientTlsContext(tls_options) socket_options = io.SocketOptions() print('Performing greengrass discovery...') discovery_client = DiscoveryClient(io.ClientBootstrap.get_or_create_static_default(), socket_options, tls_context, cmdUtils.get_command_required("region")) resp_future = discovery_client.discover(cmdUtils.get_command_required("thing_name")) discover_response = resp_future.result() print(discover_response) if cmdUtils.get_command("print_discover_resp_only"): exit(0) mqtt_connection = try_iot_endpoints() shadow_client = iotshadow.IotShadowClient(mqtt_connection) try: # Subscribe to necessary topics. # Note that is **is** important to wait for "accepted/rejected" subscriptions # to succeed before publishing the corresponding "request". print("Subscribing to Update responses...") update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_accepted) update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_rejected) # Wait for subscriptions to succeed update_accepted_subscribed_future.result() update_rejected_subscribed_future.result() print("Subscribing to Get responses...") get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_accepted) get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_rejected) # Wait for subscriptions to succeed get_accepted_subscribed_future.result() get_rejected_subscribed_future.result() print("Subscribing to Delta events...") delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events( request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_shadow_delta_updated) # Wait for subscription to succeed delta_subscribed_future.result() # The rest of the sample runs asynchronously. # Issue request for shadow's current state. # The response will be received by the on_get_accepted() callback print("Requesting current shadow state...") with locked_data.lock: # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics token = str(uuid4()) publish_get_future = shadow_client.publish_get_shadow( request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token), qos=mqtt.QoS.AT_LEAST_ONCE) locked_data.request_tokens.add(token) # Ensure that publish succeeds publish_get_future.result() except Exception as e: exit(e) # Wait for the sample to finish (user types 'quit', or an error occurs) is_sample_done.wait()
这个 Python 应用程序执行以下操作:
-
使用 Greengrass 发现功能来发现并连接到核心设备。
-
从核心设备请求影子文档以获取属性的初始状态。
-
订阅 shadow delta 事件,当属性的值与其
desired
reported
值不同时,核心设备会发送这些事件。当应用程序收到 shadow delta 事件时,它会更改属性的值并向核心设备发送更新以将新值设置为其reported
值。
该应用程序结合了 Greengrass 的发现和 v2 中的阴影样本。AWS IoT Device SDK
-
-
运行示例应用程序。此应用程序需要指定客户端设备事物名称、要使用的影子属性以及用于验证和保护连接的证书的参数。
-
将
MyClientDevice1
替换为客户端设备的事物名称。 -
将
~/certs/ AmazonRoot ca1.pem 替
换为客户端设备上亚马逊根 CA 证书的路径。 -
将
~/certs/device.pem.crt 替换为客户端设备
上设备证书的路径。 -
将
~/certs/private.pem.key 替换为客户端设备上私钥
文件的路径。 -
将
us-east-1
替换为您的客户端设备和核心设备运行的AWS区域。
python3 basic_discovery_shadow.py \ --thing_name
MyClientDevice1
\ --shadow_property color \ --ca_file~/certs/AmazonRootCA1.pem
\ --cert~/certs/device.pem.crt
\ --key~/certs/private.pem.key
\ --regionus-east-1
\ --verbosity Warn示例应用程序订阅影子主题并等待接收来自核心设备的阴影增量事件。如果输出表明应用程序接收并响应 shadow delta 事件,则客户端设备可以成功地与核心设备上的影子进行交互。
Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\n
MIICiT...EXAMPLE=
\n-----END CERTIFICATE-----\n'])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Subscribing to Update responses... Subscribing to Get responses... Subscribing to Delta events... Requesting current shadow state... Received shadow delta event. Delta reports that desired value is 'purple'. Changing local value... ClientToken is: 3dce4d3f-e336-41ac-aa4f-7882725f0033 Changed local shadow value to 'purple'. Updating reported shadow value to 'purple'... Update request published.如果应用程序改为输出错误,请参阅 Greengrass 发现问题疑难解答。
您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接和发送消息。有关更多信息,请参阅 监控AWS IoT Greengrass日志。
-
-
-
再次查看组件日志,以验证该组件是否收到来自智能轻型客户端设备的阴影更新确认。
该组件记录消息以确认智能轻型客户端设备已更改其颜色。
2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.959Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Received shadow update confirmation from client device: MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}
注意
客户端设备的影子在核心设备和客户端设备之间同步。但是,核心设备不会将客户端设备的影子与同步AWS IoT Core。例如,您可以将影子与同步,AWS IoT Core以查看或修改队列中所有设备的状态。有关如何配置阴影管理器组件以与之同步阴影的更多信息AWS IoT Core,请参阅将本地设备阴影与同步 AWS IoT Core。
你已经完成了本教程。客户端设备连接到核心设备,向和 Greengrass 组件发送 MQTT 消息,AWS IoT Core并接收来自核心设备的影子更新。有关本教程所涵盖主题的更多信息,请参阅以下内容: