Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Publier/souscrire des messages locaux
La messagerie Publish/subscribe (pubsub) vous permet d'envoyer et de recevoir des messages relatifs à des sujets. Les composants peuvent publier des messages dans des rubriques pour envoyer des messages à d'autres composants. Les composants abonnés à cette rubrique peuvent ensuite agir sur les messages qu'ils reçoivent.
Note
Vous ne pouvez pas utiliser ce IPC service de publication/d'abonnement pour publier ou vous abonner à. AWS IoT Core MQTT Pour plus d'informations sur la façon d'échanger des messages avec AWS IoT Core MQTT, consultezPublier/souscrire AWS IoT Core MQTT des messages.
SDKVersions minimales
Le tableau suivant répertorie les versions minimales Kit SDK des appareils AWS IoT que vous devez utiliser pour publier et vous abonner à des messages à destination et en provenance de sujets locaux.
SDK | Version minimale |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Autorisation
Pour utiliser la messagerie locale de publication/d'abonnement dans un composant personnalisé, vous devez définir des politiques d'autorisation qui permettent à votre composant d'envoyer et de recevoir des messages à des sujets. Pour plus d'informations sur la définition des politiques d'autorisation, consultezAutoriser les composants à effectuer des IPC opérations.
Les politiques d'autorisation pour les messages de publiation/d'abonnement présentent les propriétés suivantes.
IPCidentifiant de service : aws.greengrass.ipc.pubsub
Opération | Description | Ressources |
---|---|---|
|
Permet à un composant de publier des messages dans les rubriques que vous spécifiez. |
Une chaîne de rubrique, telle que Cette chaîne de rubrique ne prend pas en charge les caractères génériques de MQTT rubrique ( |
|
Permet à un composant de s'abonner à des messages pour les sujets que vous spécifiez. |
Une chaîne de rubrique, telle que Dans Greengrass nucleus v2.6.0 et versions ultérieures, vous pouvez vous abonner à des MQTT rubriques contenant des caractères génériques (et). |
|
Permet à un composant de publier des messages et de s'y abonner pour les sujets que vous spécifiez. |
Une chaîne de rubrique, telle que Dans Greengrass nucleus v2.6.0 et versions ultérieures, vous pouvez vous abonner à des MQTT rubriques contenant des caractères génériques (et). |
Exemples de politiques d'autorisation
Vous pouvez vous référer à l'exemple de politique d'autorisation suivant pour vous aider à configurer les politiques d'autorisation pour vos composants.
Exemple de politique d'autorisation
L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à toutes les rubriques.
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "
com.example.MyLocalPubSubComponent
:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }
PublishToTopic
Publier un message dans une rubrique
Demande
La demande de cette opération comporte les paramètres suivants :
topic
-
Sujet dans lequel le message doit être publié.
publishMessage
(Python :publish_message
)-
Le message à publier. Cet objet contient
PublishMessage
les informations suivantes. Vous devez spécifier l'une des options suivantes :jsonMessage
etbinaryMessage
.jsonMessage
(Python :json_message
)-
(Facultatif) Un JSON message. Cet objet contient
JsonMessage
les informations suivantes :message
-
Le JSON message en tant qu'objet.
context
-
Le contexte du message, tel que le sujet dans lequel le message a été publié.
Cette fonctionnalité est disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Le tableau suivant répertorie les versions minimales du Kit SDK des appareils AWS IoT que vous devez utiliser pour accéder au contexte du message.
SDK Version minimale v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
Le logiciel AWS IoT Greengrass Core utilise les mêmes objets de message dans les
SubscribeToTopic
opérationsPublishToTopic
et. Le logiciel AWS IoT Greengrass Core définit cet objet de contexte dans les messages lorsque vous vous abonnez et ignore cet objet de contexte dans les messages que vous publiez.Cet objet contient
MessageContext
les informations suivantes :topic
-
Sujet dans lequel le message a été publié.
binaryMessage
(Python :binary_message
)-
(Facultatif) Un message binaire. Cet objet contient
BinaryMessage
les informations suivantes :message
-
Le message binaire sous forme de blob.
context
-
Le contexte du message, tel que le sujet dans lequel le message a été publié.
Cette fonctionnalité est disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Le tableau suivant répertorie les versions minimales du Kit SDK des appareils AWS IoT que vous devez utiliser pour accéder au contexte du message.
SDK Version minimale v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
Le logiciel AWS IoT Greengrass Core utilise les mêmes objets de message dans les
SubscribeToTopic
opérationsPublishToTopic
et. Le logiciel AWS IoT Greengrass Core définit cet objet de contexte dans les messages lorsque vous vous abonnez et ignore cet objet de contexte dans les messages que vous publiez.Cet objet contient
MessageContext
les informations suivantes :topic
-
Sujet dans lequel le message a été publié.
Réponse
Cette opération ne fournit aucune information dans sa réponse.
Exemples
Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.
SubscribeToTopic
Abonnez-vous aux messages sur un sujet.
Il s'agit d'une opération d'abonnement dans le cadre de laquelle vous vous abonnez à un flux de messages d'événements. Pour utiliser cette opération, définissez un gestionnaire de réponse au flux avec des fonctions qui gèrent les messages d'événements, les erreurs et la fermeture du flux. Pour de plus amples informations, veuillez consulter Abonnez-vous aux diffusions IPC d'événements.
Type de message d'événement : SubscriptionResponseMessage
Demande
La demande de cette opération comporte les paramètres suivants :
topic
-
Rubrique à laquelle vous souhaitez vous abonner.
Note
Dans Greengrass nucleus v2.6.0 et versions ultérieures, cette MQTT rubrique prend en charge les caractères génériques (et).
#
+
receiveMode
(Python :receive_mode
)-
(Facultatif) Comportement qui indique si le composant reçoit des messages de lui-même. Vous pouvez modifier ce comportement pour permettre à un composant d'agir sur ses propres messages. Le comportement par défaut dépend de la présence ou non d'un caractère MQTT générique dans le sujet. Sélectionnez parmi les options suivantes :
-
RECEIVE_ALL_MESSAGES
— Recevez tous les messages correspondant au sujet, y compris les messages du composant abonné.Ce mode est l'option par défaut lorsque vous vous abonnez à un sujet qui ne contient pas de MQTT caractère générique.
-
RECEIVE_MESSAGES_FROM_OTHERS
— Recevez tous les messages correspondant au sujet, à l'exception des messages du composant abonné.Ce mode est l'option par défaut lorsque vous vous abonnez à une rubrique contenant un MQTT caractère générique.
Cette fonctionnalité est disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Le tableau suivant répertorie les versions minimales Kit SDK des appareils AWS IoT que vous devez utiliser pour définir le mode de réception.
SDK Version minimale v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
Réponse
La réponse de cette opération contient les informations suivantes :
messages
-
Le flux de messages. Cet objet contient
SubscriptionResponseMessage
les informations suivantes. Chaque message contientjsonMessage
oubinaryMessage
.jsonMessage
(Python :json_message
)-
(Facultatif) Un JSON message. Cet objet contient
JsonMessage
les informations suivantes :message
-
Le JSON message en tant qu'objet.
context
-
Le contexte du message, tel que le sujet dans lequel le message a été publié.
Cette fonctionnalité est disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Le tableau suivant répertorie les versions minimales du Kit SDK des appareils AWS IoT que vous devez utiliser pour accéder au contexte du message.
SDK Version minimale v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
Le logiciel AWS IoT Greengrass Core utilise les mêmes objets de message dans les
SubscribeToTopic
opérationsPublishToTopic
et. Le logiciel AWS IoT Greengrass Core définit cet objet de contexte dans les messages lorsque vous vous abonnez et ignore cet objet de contexte dans les messages que vous publiez.Cet objet contient
MessageContext
les informations suivantes :topic
-
Sujet dans lequel le message a été publié.
binaryMessage
(Python :binary_message
)-
(Facultatif) Un message binaire. Cet objet contient
BinaryMessage
les informations suivantes :message
-
Le message binaire sous forme de blob.
context
-
Le contexte du message, tel que le sujet dans lequel le message a été publié.
Cette fonctionnalité est disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Le tableau suivant répertorie les versions minimales du Kit SDK des appareils AWS IoT que vous devez utiliser pour accéder au contexte du message.
SDK Version minimale v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
Le logiciel AWS IoT Greengrass Core utilise les mêmes objets de message dans les
SubscribeToTopic
opérationsPublishToTopic
et. Le logiciel AWS IoT Greengrass Core définit cet objet de contexte dans les messages lorsque vous vous abonnez et ignore cet objet de contexte dans les messages que vous publiez.Cet objet contient
MessageContext
les informations suivantes :topic
-
Sujet dans lequel le message a été publié.
topicName
(Python :topic_name
)-
Sujet dans lequel le message a été publié.
Note
Cette propriété n'est pas utilisée actuellement. Dans Greengrass nucleus v2.6.0 et versions ultérieures, vous pouvez obtenir la
(jsonMessage|binaryMessage).context.topic
valeur de aSubscriptionResponseMessage
pour obtenir le sujet dans lequel le message a été publié.
Exemples
Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.
Exemples
Utilisez les exemples suivants pour savoir comment utiliser le IPC service de publication/d'abonnement dans vos composants.
L'exemple de recette suivant permet au composant de publier dans toutes les rubriques.
L'exemple d'application Java suivant montre comment utiliser le IPC service de publication/d'abonnement pour publier des messages vers d'autres composants.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }
L'exemple de recette suivant permet au composant de s'abonner à toutes les rubriques.
L'exemple d'application Java suivant montre comment utiliser le IPC service de publication/d'abonnement pour s'abonner à des messages envoyés à d'autres composants.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
L'exemple de recette suivant permet au composant de publier dans toutes les rubriques.
L'exemple d'application Python suivant montre comment utiliser le IPC service de publication/d'abonnement pour publier des messages vers d'autres composants.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
L'exemple de recette suivant permet au composant de s'abonner à toutes les rubriques.
L'exemple d'application Python suivant montre comment utiliser le IPC service de publication/abonnement pour s'abonner à des messages envoyés à d'autres composants.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
L'exemple de recette suivant permet au composant de publier dans toutes les rubriques.
L'exemple d'application C++ suivant montre comment utiliser le IPC service de publication/d'abonnement pour publier des messages vers d'autres composants.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
L'exemple de recette suivant permet au composant de s'abonner à toutes les rubriques.
L'exemple d'application C++ suivant montre comment utiliser le IPC service de publication/d'abonnement pour s'abonner à des messages envoyés à d'autres composants.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }