Demo zur gemeinsamen Nutzung von CoreMQTT-Verbindungen - FreeRTOS

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Demo zur gemeinsamen Nutzung von CoreMQTT-Verbindungen

Wichtig

Dies ist eine archivierte Version des FreeRTOS-Benutzerhandbuchs zur Verwendung mit der FreeRTOS-Version 202012.00. Die neueste Version dieses Dokuments finden Sie im FreeRTOS-Benutzerhandbuch.

Einführung

Das Demo-Projekt CoreMQTT Connection Sharing zeigt Ihnen, wie Sie mit einer Multithread-Anwendung eine Verbindung zum AWS MQTT-Broker mithilfe von TLS mit gegenseitiger Authentifizierung zwischen dem Client und dem Server herstellen können. Diese Demo verwendet eine MbedTLS-basierte Transportschnittstellenimplementierung, um eine server- und clientauthentifizierte TLS-Verbindung herzustellen, und demonstriert den Subscribe-Publish-Workflow von MQTT auf QoS 1-Ebene. Die Demo abonniert einen Themenfilter, veröffentlicht Themen, die dem Filter entsprechen, und wartet dann auf den Empfang dieser Nachrichten vom Server auf QoS 1-Ebene. Dieser Zyklus, bei dem es um die Veröffentlichung auf dem Broker und den Empfang derselben Nachricht vom Broker geht, wird auf unbestimmte Zeit wiederholt. Nachrichten in dieser Demo werden mit QoS 1 gesendet, was mindestens eine Lieferung gemäß der MQTT-Spezifikation garantiert.

Anmerkung

Folgen Sie den Schritten unter, um die FreeRTOS-Demos einzurichten und auszuführen. Erste Schritte mit FreeRTOS

Diese Demo verwendet eine Thread-sichere Warteschlange, in der Befehle für die Interaktion mit der MQTT-API gespeichert werden. In dieser Demo gibt es vier Aufgaben, die Sie beachten sollten.

  • Eine Befehlsaufgabe (Hauptaufgabe) nimmt Befehle aus der Befehlswarteschlange entgegen und verarbeitet sie. Die anderen Aufgaben platzieren Befehle zur Verarbeitung in diese Warteschlange. Diese Aufgabe tritt in eine Schleife ein, in der sie die Befehle verarbeitet. Wenn ein Terminierungsbefehl eingeht, wird diese Aufgabe aus der Schleife ausbrechen.

  • Eine synchrone Publisher-Task erstellt eine Reihe von Veröffentlichungsvorgängen und verschiebt sie in die Befehlswarteschlange. Diese Operationen werden dann von der Befehlstask ausgeführt. Diese Aufgabe verwendet synchrones Publizieren, was bedeutet, dass diese Aufgabe wartet, bis jeder Veröffentlichungsvorgang abgeschlossen ist, bevor der nächste geplant wird.

  • Eine asynchrone Publisher-Task erstellt eine Reihe von Veröffentlichungsvorgängen und verschiebt sie in die Befehlswarteschlange. Diese Operationen werden dann von der Befehlstask ausgeführt. Der Unterschied zwischen dieser Aufgabe und der vorherigen besteht darin, dass nicht auf den Abschluss eines Veröffentlichungsvorgangs gewartet wird, bevor der nächste geplant wird. Sie überprüft den Status jedes Veröffentlichungsvorgangs, nachdem alle Veröffentlichungsvorgänge der Warteschlange hinzugefügt wurden. Beachten Sie, dass der Unterschied zwischen synchronem und asynchronem Publizieren nur im Verhalten dieser Aufgaben besteht. Bei den eigentlichen Veröffentlichungsbefehlen gibt es keine Unterschiede.

  • Eine Abonnentenaufgabe erstellt ein MQTT-Abonnement für einen Themenfilter, der den Themen aller Nachrichten entspricht, die die beiden Publisher-Aufgaben veröffentlichen. Diese Aufgabe geht in eine Schleife über und wartet darauf, die Nachrichten zurückzuerhalten, die von den anderen Aufgaben veröffentlicht wurden.

Aufgaben können Warteschlangen für empfangene Nachrichten haben. Die Befehlsaufgabe verschiebt eingehende Nachrichten in die Warteschlange aller Aufgaben, die das eingehende Thema abonniert haben.

In dieser Demo wird eine TLS-Verbindung mit gegenseitiger Authentifizierung für die Verbindung AWS verwendet. Wenn das Netzwerk während der Demo unerwartet unterbrochen wird, versucht der Client, mithilfe der exponentiellen Backoff-Logik erneut eine Verbindung herzustellen. Wenn der Client die Verbindung erfolgreich wiederherstellt, der Broker die vorherige Sitzung jedoch nicht fortsetzen kann, abonniert der Client dieselben Themen wie in der vorherigen Sitzung erneut.

Einzelthread oder Multithread

Es gibt zwei CoreMQTT-Nutzungsmodelle: Singlethread und Multithread (Multitasking). Diese Demo zeigt Ihnen, wie Sie Ihr eigenes Multithreading-Schema erstellen können. Es gibt auch ein anderes Multithread-Beispiel, bei dem das MQTT-Protokoll im Hintergrund innerhalb einer Agenten- (oder Daemon-) Aufgabe ausgeführt wird. Weitere Informationen finden Sie unter MWTT-Agent und Demos mit CoreMQTT. Wenn Sie das MQTT-Protokoll in einer Agententask ausführen, ist es nicht erforderlich, einen MQTT-Status explizit zu verwalten oder die Funktion aufzurufen. MQTT_ProcessLoop Wenn Sie eine Agententask verwenden, können sich außerdem mehrere Anwendungsaufgaben eine einzige MQTT-Verbindung teilen, ohne dass Synchronisationsprimitive wie Mutexe erforderlich sind.

Quellcode

Die Demo-Quelldatei ist benannt mqtt_demo_connection_sharing.c und befindet sich im freertos/demos/coreMQTT/ Verzeichnis und auf der Website. GitHub

Funktionalität

Diese Demo erstellt insgesamt vier Aufgaben: drei, die MQTT-API-Aufrufe anfordern, und eine, die diese Anfragen verarbeitet, was die Hauptaufgabe ist. In dieser Demo geht die Hauptaufgabe in eine Schleife über, die die drei Unteraufgaben erstellt, die Verarbeitungsschleife aufruft und anschließend aufräumt. Die primäre Aufgabe stellt eine einzelne MQTT-Verbindung zum Broker her, die von den Unteraufgaben gemeinsam genutzt wird. Zwei der Unteraufgaben veröffentlichen Nachrichten an den Broker, und die dritte empfängt die Nachrichten zurück. Dabei wird ein MQTT-Abonnement für einen Themenfilter verwendet, der allen Themen der veröffentlichten Nachrichten entspricht.

Typdefinitionen

Die Demo definiert die folgenden Strukturen, Aufzählungen und Funktionszeiger.

Befehle

Anstatt die MQTT-API-Aufrufe direkt durchzuführen, verwenden die Aufgaben Command_t Strukturen, um Befehle zu erstellen, die die Hauptaufgabe anweisen, die entsprechende API-Operation für sie aufzurufen. Befehle haben die folgenden Typen:

  • PROCESSLOOP

  • PUBLISH

  • SUBSCRIBE

  • UNSUBSCRIBE

  • PING

  • DISCONNECT

  • RECONNECT

  • TERMINATE

Der TERMINATE Befehl hat keine entsprechende MQTT-API-Operation. Er wird in der Demo verwendet, um die Hauptaufgabe anzuweisen, die Verarbeitung von Befehlen zu beenden und mit den Bereinigungsvorgängen zu beginnen. Da für einige MQTT-Befehle wie, und MQTT_Unsubscribe zusätzliche Informationen erforderlich sind, z. B. Informationen zum Veröffentlichen oder Abonnieren MQTT_PublishMQTT_Subscribe, verwenden wir das Feld. CommandContext_t Dieses Feld ist für diese drei Befehle erforderlich und für die anderen optional.

Da dieser Kontext für diese Befehle erforderlich ist, ändern Sie diesen Wert nicht, sobald der Befehl in die Warteschlange gestellt wurde, bis der Befehl abgeschlossen ist. Wenn ein Befehl abgeschlossen ist, kann ein optionaler Callback aufgerufen werden. In dieser Demo verwenden wir einen Callback, der eine Aufgabenbenachrichtigung erstellt, um die aufrufende Aufgabe darüber zu informieren, dass der Befehl abgeschlossen wurde. Für MQTT-Operationen, die Bestätigungen erfordern (Abonnierungen, Abmeldungen und Veröffentlichungen mit QoS größer als 0), gilt der Befehl als abgeschlossen, sobald die Bestätigung empfangen wurde. Andernfalls ist der Befehl abgeschlossen, sobald der entsprechende MQTT-API-Aufruf zurückgekehrt ist.

Die folgenden Definitionen finden Sie in der mqtt_demo_connection_sharing.c Datei:

Mitwirkende

Da einige MQTT-Operationen eine Bestätigung erfordern, verwenden sie ein Array von AckInfo_t, das die Paket-ID der erwarteten Bestätigung und den ursprünglichen Befehl, der sie erwartet, enthält, sodass ihr Abschluss-Callback aufgerufen werden kann.

Subscriptions (Abonnements)

In dieser Demo können Abonnements für jede Aufgabe nachverfolgt werden. Zu diesem Zweck muss jede Aufgabe, die ein Abonnement anfordert, eine Nachrichtenwarteschlange bereitstellen, in der sie die veröffentlichten Nachrichten zurückempfängt (SubscriptionElement_t). Für mehrere Aufgaben kann derselbe Themenfilter abonniert werden, da davon ausgegangen wird, dass sie separate Antwortwarteschlangen verwenden.

Veröffentlichte Nachrichten erhalten

Da Aufgaben parallel zur Hauptaufgabe ausgeführt werden, wäre es für die Hauptaufgabe schwierig und zeitaufwändig, darauf warten zu müssen, dass jede abonnierte Aufgabe eine empfangene veröffentlichte Nachricht liest. Daher wird jede empfangene Nachricht in die Antwortwarteschlange aller Aufgaben kopiert, die das Thema der veröffentlichten Nachricht abonniert haben (PublishElement_t). Da vom MQTT-Client empfangene Veröffentlichungspakete Zeiger auf den Netzwerkpuffer des Clients enthalten, werden die Nutzlast und der Themenname der eingehenden Nachricht in separate Puffer kopiert, bevor sie in eine Antwortwarteschlange eingefügt werden. Auf diese Weise kann die abonnierte Aufgabe die empfangenen Informationen immer noch lesen, nachdem der MQTT-Client seinen Netzwerkpuffer geleert hat.

Hauptaufgabe

Die Hauptaufgabe der Anwendung RunCoreMQTTConnectionSharingDemo, richtet eine persistente MQTT-Sitzung ein, erstellt drei Unteraufgaben und führt die Verarbeitungsschleife aus, bis ein Abschlussbefehl empfangen wird. Es wird eine persistente Sitzung verwendet. Wenn also die Netzwerkverbindung unerwartet unterbrochen wird, stellt die Demo im Hintergrund wieder eine Verbindung zum Broker her, ohne dass Abonnements oder eingehende veröffentlichte Nachrichten vom Broker verloren gehen. Um für jeden Lauf eine neue persistente Sitzung zu erstellen, stellt die Demo mit dem Broker eine Verbindung mit dem Broker her, trennt dann die Verbindung und stellt die Verbindung wieder her, wenn das clean session Flag nicht gesetzt ist. Nach Abschluss der Verarbeitungsschleife wird die Verbindung zum Broker getrennt und an dem Punkt, an dem die Netzwerkverbindung wiederhergestellt wurde, wieder aufgenommen.

Nach erfolgreichem Abschluss der Demo wird eine Ausgabe generiert, die der folgenden Abbildung ähnelt.

Demo-Terminalausgabe zur gemeinsamen Nutzung der MQTT-Verbindung bei erfolgreichem Abschluss
Befehlsschleife

Die Befehlsschleife, prvCommandLoop, wartet darauf, dass Befehle in die Befehlswarteschlange gestellt werden, und ruft dann die entsprechende MQTT-API auf. Alle Befehle außer DISCONNECT und TERMINATE führen dazu, MQTT_ProcessLoop dass sie ebenfalls aufgerufen werden. In dieser Demo wird ein Socket-Wakeup-Callback eingerichtet, um einen PROCESSLOOP Befehl zur Warteschlange hinzuzufügen, wenn Daten auf dem Socket verfügbar sind. Zu diesem Zeitpunkt befinden sich jedoch möglicherweise viele Befehle in der Warteschlange vor ihm. Um sicherzustellen, dass wir eingehende Daten nicht vernachlässigen, während andere Befehle verarbeitet werden, MQTT_ProcessLoop wird nach jedem Befehl eine einzige Iteration aufgerufen.

Befehle werden verarbeitet

Sehen Sie sich die prvProcessCommandFunktion an.

Synchrone Publisher-Aufgabe

Die synchrone prvSyncPublishPublisher-Task, Task, erstellt PUBLISH Operationen synchron und wartet, bis jeder Vorgang abgeschlossen ist, bevor der nächste geplant wird. Diese Demo verwendet QoS 1, um Nachrichten zu veröffentlichen, was bedeutet, dass diese Operationen erst als abgeschlossen gelten, wenn das Bestätigungs-Paket für die Veröffentlichung empfangen wurde.

Asynchrone Publisher-Aufgabe

Die asynchrone prvAsyncPublishPublisher-Aufgabe, Task, wartet nicht, bis eine Veröffentlichung abgeschlossen ist, bevor sie die nächste in die Warteschlange stellt. Dies zeigt, dass es nicht immer notwendig ist, dass eine Aufgabe auf den Abschluss eines MQTT-Vorgangs warten muss, bevor er wieder aufgenommen werden kann. Da für jeden Veröffentlichungsbefehl eine eigene Kontextstruktur erforderlich ist, kann diese Aufgabe nicht wie bei der synchronen Publisher-Aufgabe eine einzelne Kontextstruktur wiederverwenden, da sie für einen vorherigen Befehl möglicherweise noch benötigt wird. Daher weist er jeder Kontextstruktur Speicher zu und wartet dann, bis der gesamte zugewiesene Speicher freigegeben wird, nachdem alle zu veröffentlichenden Nachrichten in die Warteschlange gestellt wurden.

Aufgabe für Abonnenten

Die Abonnentenaufgabe, prvSubscribeTask, abonniert einen Themenfilter, der allen Themen der Nachrichten entspricht, die im Rahmen der synchronen und asynchronen Aufgaben veröffentlicht wurden. Anschließend wartet sie auf den Empfang all dieser veröffentlichten Nachrichten, bevor sie sich abmeldet. Diese Aufgabe ist auch für die Erstellung der TERMINATE Operation verantwortlich, die der Hauptaufgabe signalisiert, die Befehlsschleife zu beenden.