coreMQTT 连接共享演示 - FreeRTOS

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

coreMQTT 连接共享演示

重要

这是《FreeRTOS 用户指南》的存档版本,可用于 FreeRTOS 版本 202012.00。有关本文档的最新版本,请参阅《FreeRTOS 用户指南》

简介

CoreMQTT 连接共享演示项目向您展示了如何使用多线程应用程序通过 TLS 建立与 AWS MQTT 代理的连接,并在客户端和服务器之间进行相互身份验证。此演示使用基于 mbedTLS 的传输接口实现来建立经过服务器和客户端身份验证的 TLS 连接,并演示了 QoS 1 级别的 MQTT 订阅发布工作流程。该演示订阅主题筛选条件,发布与筛选条件匹配的主题,然后等待从 QoS 1 级别的服务器接收这些消息。这种向代理发布消息并从代理接收相同消息的循环过程会无限期地重复。本演示中的消息按照 QoS 1 发送,这样可以保证至少有一次按照 MQTT 规范传递。

注意

要设置和运行 FreeRTOS 演示,请按照开始使用 FreeRTOS中的步骤操作。

此演示使用线程安全队列来保存与 MQTT API 交互的命令。此演示中有四个任务需要注意。

  • 命令(主)任务从命令队列中获取命令并对其进行处理。其他任务将命令放入该队列进行处理。此任务会进入循环,在此期间它会处理命令。如果收到终止命令,则此任务将退出循环。

  • 同步发布者任务会创建一系列发布操作并将其推送到命令队列。然后,这些操作由命令任务运行。此任务使用同步发布,这意味着任务将等待每个发布操作完成后再安排下一个发布操作。

  • 同步发布者任务会创建一系列发布操作并将其推送到命令队列。然后,这些操作由命令任务运行。此任务与上一个任务的区别在于,它不会等到发布操作完成后才安排下一个任务。它会在所有发布操作都添加到队列中后检查每个发布操作的状态。请注意,同步发布和异步发布的区别仅在于这些任务的行为。实际发布命令并没有区别。

  • 订阅者任务创建对主题筛选条件的 MQTT 订阅,该筛选条件与两个发布者任务发布的所有消息的主题相匹配。此任务会进入循环,等待接收其他任务发布的消息。

任务可以通过队列来保存收到的消息。命令任务会将传入的消息推送到任何订阅了传入主题的任务的队列中。

此演示使用具有双向身份验证的 TLS 连接到 AWS。如果网络在演示过程中意外断开连接,则客户端会尝试使用指数回退逻辑重新连接。如果客户端重新连接成功,但代理无法恢复之前的会话,则客户端将重新订阅与上一个会话相同的主题。

单线程与多线程

coreMQTT 有两种使用模式,即单线程和多线程(多任务处理)。该演示展示了如何创建多线程方案。还有另一个多线程示例,在代理(或进程守护程序)任务中,它在后台运行 MQTT 协议。有关更多信息,请参阅 MWTT 代理和使用 coreMQTT 的演示。如果您在代理任务中运行 MQTT 协议,则无需显式管理任何 MQTT 状态或调用 MQTT_ProcessLoop 函数。此外,如果您使用代理任务,则多个应用程序任务可共享单个 MQTT 连接,而无需同步原语(例如,互斥锁)。

源代码

演示源文件已命名mqtt_demo_connection_sharing.c,可以在freertos/demos/coreMQTT/目录和 GitHub网站上找到。

功能

此演示一共创建四个任务:三个用于请求 MQTT API 调用,一个用于处理这些请求(这是主要任务)。在此演示中,主任务会进入循环,该循环将创建三个子任务,调用处理循环,然后进行清理。主任务创建与代理的单个 MQTT 连接,该连接在子任务之间共享。其中两个子任务向代理发布消息,第三个子任务使用主题筛选条件的 MQTT 订阅接收消息,该筛选条件与已发布消息的所有主题相匹配。

Typedefs

该演示定义了以下结构:枚举和函数指针。

命令

这些任务不是直接调用 MQTT API,而是使用 Command_t 结构来创建命令,指示主任务为它们调用相应的 API 操作。命令包括以下类型:

  • PROCESSLOOP

  • PUBLISH

  • SUBSCRIBE

  • UNSUBSCRIBE

  • PING

  • DISCONNECT

  • RECONNECT

  • TERMINATE

TERMINATE 命令没有相应的 MQTT API 操作。在演示中,它用于指示主任务停止处理命令并开始清理操作。因为某些 MQTT 命令(例如、MQTT_PublishMQTT_SubscribeMQTT_Unsubscribe)需要一些其他信息,例如发布或订阅信息,所以我们使用 CommandContext_t 字段。对于这三个命令,此字段是必需的,对于其他命令则是可选的。

由于这些命令需要此上下文,因此,将命令放入队列后,在完成之前,不要更改此值。命令完成后,可以调用可选的回调。在该演示中,我们使用一个创建任务通知的回调来通知调用任务该命令已完成。对于需要确认的 MQTT 操作(订阅、取消订阅和 QoS 大于 0 的发布),收到确认后,即认为该命令已完成。否则,相应的 MQTT API 调用返回后,命令即告完成。

mqtt_demo_connection_sharing.c 文件中可找到以下定义:

确认

由于某些 MQTT 操作需要确认,因此它们使用包含预期确认的数据包标识符和预期确认的原始命令的 AckInfo_t 数组,以便可以调用其完成回调。

订阅

该演示可跟踪每个任务的订阅。为此,每个请求订阅的任务都必须提供一个消息队列,用于接收已发布的消息 (SubscriptionElement_t)。多个任务可订阅同一个主题筛选条件,因为它们预期使用不同的响应队列。

收到的已发布消息

由于任务与主任务并行运行,因此,主任务必须等待每个订阅的任务才能读取收到的已发布消息,这既困难又耗时。因此,收到的每条消息都将复制到订阅已发布消息主题 (PublishElement_t) 的任何任务的响应队列中。由于从 MQTT 客户端收到的发布数据包包含指向客户端网络缓冲区的指针,因此,在将传入消息的有效负载和主题名称插入响应队列之前,会将其复制到单独的缓冲区中。这样,在 MQTT 客户端清除其网络缓冲区后,订阅的任务仍然可以读取收到的信息。

主任务

主应用程序任务建立持久的 MQTT 会话,创建三个子任务,并运行处理循环,直到收到终止命令。 RunCoreMQTTConnectionSharingDemo因为使用了持久会话,所以,如果网络意外断开连接,演示将在后台重新连接到代理,而不会丢失订阅或来自代理的任何传入的已发布消息。要为每次运行创建一个新的持久会话,演示将连接到设置了 clean session 标记的代理,然后在未设置标记的情况下断开连接并重新连接。当处理循坏终止后,它会断开与代理的连接,然后从重新连接网络的位置重新开始循环。

成功完成演示将生成类似于以下图像的输出。

MQTT 连接共享演示终端在成功完成后的输出
命令循环

命令循环等待命令放入命令队列,然后调用相应的 MQTT API。 prvCommandLoopDISCONNECTTERMINATE 之外的所有命令也会导致调用 MQTT_ProcessLoop。此演示会设置套接字唤醒回调,以便在套接字上有数据可用时向队列中添加 PROCESSLOOP 命令。但是,此时队列中可能有许多命令排在它前面。为了确保在处理其他命令时不会忽略传入的数据,需要在每个命令之后调用 MQTT_ProcessLoop 进行一次迭代。

处理命令

参见 prvProcessCommand函数。

同步发布者任务

同步发布者任务 T prvSyncPublishas k 会同步创建PUBLISH操作,并等待每个操作完成后再安排下一个操作。此演示使用 QoS 1 来发布消息,这意味着在收到发布确认数据包之前,不会认为这些操作已完成。

同步发布者任务

异步发布者 prvAsyncPublish任务 Task 不会等到发布完成后才会将下一个任务放入队列。这表明任务并非总是需要等待 MQTT 操作完成后才能恢复。由于每个发布命令都需要自己的上下文结构,因此,此任务无法像同步发布者任务一样重复使用单个上下文结构,因为之前的命令可能仍然需要该上下文结构。因此,它会为每个上下文结构分配内存,然后等待在所有要发布的消息都放入队列后释放所有分配的内存。

订阅者任务

订阅者任务订阅主题筛选器,该过滤器与同步和异步任务发布的消息的所有主题相匹配。 prvSubscribeTask接着,它会等待收到所有已发布的消息,然后再取消订阅。此任务还负责创建 TERMINATE 操作,从而向主任务发出结束命令循环的信号。