Publier/souscrire AWS IoT Core des messages MQTT - AWS IoT Greengrass

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Publier/souscrire AWS IoT Core des messages MQTT

Le service IPC de messagerie AWS IoT Core MQTT vous permet d'envoyer et de recevoir des messages MQTT depuis et vers. AWS IoT Core Les composants peuvent publier des messages AWS IoT Core et s'abonner à des sujets pour agir sur les messages MQTT provenant d'autres sources. Pour plus d'informations sur l' AWS IoT Core implémentation de MQTT, consultez MQTT dans le Guide du AWS IoT Core développeur.

Note

Ce service IPC de messagerie MQTT vous permet d'échanger des messages avec. AWS IoT Core Pour plus d'informations sur la façon d'échanger des messages entre les composants, consultezPublier/souscrire des messages locaux.

Versions minimales du SDK

Le tableau suivant répertorie les versions minimales Kit SDK des appareils AWS IoT que vous devez utiliser pour publier et vous abonner à des messages MQTT en provenance AWS IoT Core et à destination.

Autorisation

Pour utiliser la messagerie AWS IoT Core MQTT dans un composant personnalisé, vous devez définir des politiques d'autorisation qui permettent à votre composant d'envoyer et de recevoir des messages sur des sujets. Pour plus d'informations sur la définition des politiques d'autorisation, consultezAutoriser les composants à effectuer des opérations IPC.

Les politiques d'autorisation pour AWS IoT Core la messagerie MQTT présentent les propriétés suivantes.

Identifiant du service IPC : aws.greengrass.ipc.mqttproxy

Opération Description Ressources

aws.greengrass#PublishToIoTCore

Permet à un composant de publier des messages AWS IoT Core sur les sujets MQTT que vous spécifiez.

Une chaîne de rubrique, telle quetest/topic, ou * pour autoriser l'accès à toutes les rubriques. Vous pouvez utiliser les caractères génériques des rubriques MQTT (#et+) pour associer plusieurs ressources.

aws.greengrass#SubscribeToIoTCore

Permet à un composant de s'abonner à des messages provenant AWS IoT Core des sujets que vous spécifiez.

Une chaîne de rubrique, telle quetest/topic, ou * pour autoriser l'accès à toutes les rubriques. Vous pouvez utiliser les caractères génériques des rubriques MQTT (#et+) pour associer plusieurs ressources.

*

Permet à un composant de publier des messages AWS IoT Core MQTT et de s'y abonner pour les sujets que vous spécifiez.

Une chaîne de rubrique, telle quetest/topic, ou * pour autoriser l'accès à toutes les rubriques. Vous pouvez utiliser les caractères génériques des rubriques MQTT (#et+) pour associer plusieurs ressources.

Caractères génériques MQTT dans les politiques d'autorisation AWS IoT Core MQTT

Vous pouvez utiliser des caractères génériques MQTT dans les politiques d'autorisation AWS IoT Core MQTT IPC. Les composants peuvent publier et s'abonner à des rubriques qui correspondent au filtre de rubrique que vous autorisez dans une politique d'autorisation. Par exemple, si la politique d'autorisation d'un composant accorde l'accès àtest/topic/#, le composant peut s'abonner àtest/topic/#, publier et s'abonner àtest/topic/filter.

Variables de recette dans les AWS IoT Core politiques d'autorisation MQTT

Si vous utilisez la version 2.6.0 ou ultérieure du noyau Greengrass, vous pouvez utiliser la variable de {iot:thingName} recette dans les politiques d'autorisation. Cette fonctionnalité vous permet de configurer une politique d'autorisation unique pour un groupe de périphériques principaux, chaque périphérique principal ne pouvant accéder qu'aux rubriques contenant son propre nom. Par exemple, vous pouvez autoriser un composant à accéder à la ressource thématique suivante.

devices/{iot:thingName}/messages

Pour plus d’informations, consultez Variables de recette et Utiliser des variables de recette dans les mises à jour de fusion.

Exemples de politiques d'autorisation

Vous pouvez consulter les exemples de politiques d'autorisation suivants pour vous aider à configurer les politiques d'autorisation pour vos composants.

Exemple de politique d'autorisation avec accès illimité

L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à toutes les rubriques.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: com.example.MyIoTCorePubSubComponent:mqttproxy:1: policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - "*"
Exemple de politique d'autorisation avec accès limité

L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à deux rubriques nommées factory/1/events etfactory/1/actions.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to factory 1 topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/actions", "factory/1/events" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to factory 1 topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/actions - factory/1/events
Exemple de politique d'autorisation pour un groupe de périphériques principaux
Important

Cet exemple utilise une fonctionnalité disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Greengrass nucleus v2.6.0 ajoute la prise en charge de la plupart des variables de recette, notamment dans les configurations de composants{iot:thingName}.

L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à une rubrique contenant le nom du périphérique principal qui exécute le composant.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/devices/{iot:thingName}/controls" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/devices/{iot:thingName}/controls

PublishToIoTCore

Publie un message MQTT AWS IoT Core sur un sujet.

Lorsque vous publiez des messages MQTT sur AWS IoT Core, il existe un quota de 100 transactions par seconde. Si vous dépassez ce quota, les messages sont mis en file d'attente pour traitement sur l'appareil Greengrass. Il existe également un quota de 512 Ko de données par seconde et un quota de 20 000 publications par seconde à l'échelle du compte (2 000 dans certains Régions AWS cas). Pour plus d'informations sur les limites du courtier de messages MQTT dans AWS IoT Core, consultez les sections Limites et quotas du courtier de AWS IoT Core messages et du protocole.

Si vous dépassez ces quotas, l'appareil Greengrass limite la publication de messages à. AWS IoT Core Les messages sont stockés dans un spouleur en mémoire. Par défaut, la mémoire allouée au spouleur est de 2,5 Mo. Si le spouleur se remplit, les nouveaux messages sont rejetés. Vous pouvez augmenter la taille du spouleur. Pour plus d'informations, consultez la section Configurationdans la documentation Noyau de Greengrass. Pour éviter de remplir le spouleur et d'avoir à augmenter la mémoire allouée, limitez les demandes de publication à un maximum de 100 demandes par seconde.

Lorsque votre application doit envoyer des messages à un débit plus élevé ou des messages plus volumineux, pensez à utiliser le Gestionnaire de flux pour envoyer des messages à Kinesis Data Streams. Le composant du gestionnaire de flux est conçu pour transférer de gros volumes de données vers le AWS Cloud. Pour plus d’informations, consultez Gérez les flux de données sur les appareils principaux de Greengrass.

Demande

La demande de cette opération comporte les paramètres suivants :

topicName(Python :topic_name)

Sujet dans lequel le message doit être publié.

qos

Les QoS MQTT à utiliser. Cette énumération possède QOS les valeurs suivantes :

  • AT_MOST_ONCE— QoS 0. Le message MQTT est délivré au plus une fois.

  • AT_LEAST_ONCE— QoS 1. Le message MQTT est délivré au moins une fois.

payload

(Facultatif) La charge utile du message sous forme de blob.

Les fonctionnalités suivantes sont disponibles pour la version 2.10.0 et les versions ultérieures Noyau de Greengrass lors de l'utilisation de MQTT 5. Ces fonctionnalités sont ignorées lorsque vous utilisez MQTT 3.1.1. Le tableau suivant répertorie la version minimale du SDK de l' AWS IoT appareil que vous devez utiliser pour accéder à ces fonctionnalités.

payloadFormat

(Facultatif) Format de la charge utile du message. Si vous ne définissez pas lepayloadFormat, le type est supposé êtreBYTES. L'enum possède les valeurs suivantes :

  • BYTES— Le contenu de la charge utile est un blob binaire.

  • UTF8— Le contenu de la charge utile est une chaîne de caractères UTF8.

retain

(Facultatif) Indique s'il faut définir l'option de conservation MQTT sur true lors de la publication.

userProperties

(Facultatif) Liste des UserProperty objets spécifiques à l'application à envoyer. L'UserPropertyobjet est défini comme suit :

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Facultatif) Nombre de secondes avant que le message n'expire et ne soit supprimé par le serveur. Si cette valeur n'est pas définie, le message n'expire pas.

correlationData

(Facultatif) Informations ajoutées à la demande qui peuvent être utilisées pour associer une demande à une réponse.

responseTopic

(Facultatif) Rubrique à utiliser pour le message de réponse.

contentType

(Facultatif) Identifiant spécifique à l'application du type de contenu du message.

Réponse

Cette opération ne fournit aucune information dans sa réponse.

Exemples

Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.

Java (IPC client V2)
Exemple : publier un message
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.QOS; import java.nio.charset.StandardCharsets; public class PublishToIoTCore { public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest() .withTopicName(topic) .withPayload(message.getBytes(StandardCharsets.UTF_8)) .withQos(qos)); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
Exemple : publier un message
Note

Cet exemple suppose que vous utilisez la version 1.5.4 ou ultérieure de Kit SDK des appareils AWS IoT for Python v2.

import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' payload = 'Hello, World' ipc_client = clientV2.GreengrassCoreIPCClientV2() resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload) ipc_client.close()
Java (IPC client V1)
Exemple : publier un message
Note

Cet exemple utilise une IPCUtils classe pour créer une connexion au service AWS IoT Greengrass Core IPC. Pour plus d’informations, consultez Connectez-vous au service AWS IoT Greengrass Core IPC.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PublishToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); PublishToIoTCoreResponseHandler responseHandler = PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos); CompletableFuture<PublishToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) { PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()); } }
Python (IPC client V1)
Exemple : publier un message
Note

Cet exemple suppose que vous utilisez la version 1.5.4 ou ultérieure de Kit SDK des appareils AWS IoT for Python v2.

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT)
C++
Exemple : publier un message
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
Exemple : publier un message
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.publishToIoTCore().then(r => console.log("Started workflow")); } private async publishToIoTCore() { try { const request: PublishToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); await this.ipcClient.publishToIoTCore(request); } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToIoTCore = new PublishToIoTCore();

SubscribeToIoTCore

Abonnez-vous aux messages MQTT à partir d'un AWS IoT Core sujet ou d'un filtre de sujet. Le logiciel AWS IoT Greengrass Core supprime les abonnements lorsque le composant atteint la fin de son cycle de vie.

Il s'agit d'une opération d'abonnement dans le cadre de laquelle vous vous abonnez à un flux de messages d'événements. Pour utiliser cette opération, définissez un gestionnaire de réponse au flux avec des fonctions qui gèrent les messages d'événements, les erreurs et la fermeture du flux. Pour plus d’informations, consultez Abonnez-vous aux diffusions d'événements IPC.

Type de message d'événement : IoTCoreMessage

Demande

La demande de cette opération comporte les paramètres suivants :

topicName(Python :topic_name)

Rubrique à laquelle vous souhaitez vous abonner. Vous pouvez utiliser les caractères génériques des rubriques MQTT (#et+) pour vous abonner à plusieurs rubriques.

qos

Les QoS MQTT à utiliser. Cette énumération possède QOS les valeurs suivantes :

  • AT_MOST_ONCE— QoS 0. Le message MQTT est délivré au plus une fois.

  • AT_LEAST_ONCE— QoS 1. Le message MQTT est délivré au moins une fois.

Réponse

La réponse de cette opération contient les informations suivantes :

messages

Le flux de messages MQTT. Cet objet contient IoTCoreMessage les informations suivantes :

message

Le message MQTT. Cet objet contient MQTTMessage les informations suivantes :

topicName(Python :topic_name)

Sujet dans lequel le message a été publié.

payload

(Facultatif) La charge utile du message sous forme de blob.

Les fonctionnalités suivantes sont disponibles pour la version 2.10.0 et les versions ultérieures Noyau de Greengrass lors de l'utilisation de MQTT 5. Ces fonctionnalités sont ignorées lorsque vous utilisez MQTT 3.1.1. Le tableau suivant répertorie la version minimale du SDK de l' AWS IoT appareil que vous devez utiliser pour accéder à ces fonctionnalités.

payloadFormat

(Facultatif) Format de la charge utile du message. Si vous ne définissez pas lepayloadFormat, le type est supposé êtreBYTES. L'enum possède les valeurs suivantes :

  • BYTES— Le contenu de la charge utile est un blob binaire.

  • UTF8— Le contenu de la charge utile est une chaîne de caractères UTF8.

retain

(Facultatif) Indique s'il faut définir l'option de conservation MQTT sur true lors de la publication.

userProperties

(Facultatif) Liste des UserProperty objets spécifiques à l'application à envoyer. L'UserPropertyobjet est défini comme suit :

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Facultatif) Nombre de secondes avant que le message n'expire et ne soit supprimé par le serveur. Si cette valeur n'est pas définie, le message n'expire pas.

correlationData

(Facultatif) Informations ajoutées à la demande qui peuvent être utilisées pour associer une demande à une réponse.

responseTopic

(Facultatif) Rubrique à utiliser pour le message de réponse.

contentType

(Facultatif) Identifiant spécifique à l'application du type de contenu du message.

Exemples

Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.

Java (IPC client V2)
Exemple : s'abonner à des messages
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; public class SubscribeToIoTCore { public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage -> System.out.printf("Received new message on topic %s: %s%n", ioTCoreMessage.getMessage().getTopicName(), new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8)); Optional<Function<Throwable, Boolean>> onStreamError = Optional.of(e -> { System.err.println("Received a stream error."); e.printStackTrace(); return false; }); Optional<Runnable> onStreamClosed = Optional.of(() -> System.out.println("Subscribe to IoT Core stream closed.")); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest() .withTopicName(topic) .withQos(qos); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler> streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed); streamingResponse.getResponse(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. while (true) { Thread.sleep(10000); } // To stop subscribing, close the stream. streamingResponse.getHandler().closeStream(); } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
Exemple : s'abonner à des messages
Note

Cet exemple suppose que vous utilisez la version 1.5.4 ou ultérieure de Kit SDK des appareils AWS IoT for Python v2.

import threading import traceback import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' def on_stream_event(event): try: topic_name = event.message.topic_name message = str(event.message.payload, 'utf-8') print(f'Received new message on topic {topic_name}: {message}') except: traceback.print_exc() def on_stream_error(error): # Return True to close stream, False to keep stream open. return True def on_stream_closed(): pass ipc_client = clientV2.GreengrassCoreIPCClientV2() resp, operation = ipc_client.subscribe_to_iot_core( topic_name=topic, qos=qos, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed ) # Keep the main thread alive, or the process will exit. event = threading.Event() event.wait() # To stop subscribing, close the operation stream. operation.close() ipc_client.close()
Java (IPC client V1)
Exemple : s'abonner à des messages
Note

Cet exemple utilise une IPCUtils classe pour créer une connexion au service AWS IoT Greengrass Core IPC. Pour plus d’informations, consultez Connectez-vous au service AWS IoT Greengrass Core IPC.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new SubscriptionResponseHandler(); SubscribeToIoTCoreResponseHandler responseHandler = SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos, streamResponseHandler); CompletableFuture<SubscribeToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) { SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String topic = ioTCoreMessage.getMessage().getTopicName(); String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; } @Override public void onStreamClosed() { System.out.println("Subscribe to IoT Core stream closed."); } } }
Python (IPC client V1)
Exemple : s'abonner à des messages
Note

Cet exemple suppose que vous utilisez la version 1.5.4 ou ultérieure de Kit SDK des appareils AWS IoT for Python v2.

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
Exemple : s'abonner à des messages
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string topicName = message.value().GetTopicName().value().c_str(); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Exemple : s'abonner à des messages
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToIoTCore().then(r => console.log("Started workflow")); } private async subscribeToIoTCore() { try { const request: SubscribeToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); const streamingOperation = this.ipcClient.subscribeToIoTCore(request); streamingOperation.on('message', (message: IoTCoreMessage) => { // parse the message depending on your use cases, e.g. if (message.message && message.message.payload) { const receivedMessage = message.message.payload.toString(); } }); streamingOperation.on('streamError', (error : RpcError) => { // define your own error handling logic }); streamingOperation.on('ended', () => { // define your own logic }); await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToIoTCore = new SubscribeToIoTCore();

Exemples

Utilisez les exemples suivants pour apprendre à utiliser le service AWS IoT Core MQTT IPC dans vos composants.

L'exemple de recette suivant permet au composant de publier dans toutes les rubriques.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

L'exemple d'application C++ suivant montre comment utiliser le service AWS IoT Core MQTT IPC pour publier des messages sur. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

L'exemple de recette suivant permet au composant de s'abonner à toutes les rubriques.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

L'exemple d'application C++ suivant montre comment utiliser le service AWS IoT Core MQTT IPC pour s'abonner à des messages provenant de. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }