Publish/subscribe local messages
Publish/subscribe (pubsub) messaging enables you to send and receive messages to topics.
Components can publish messages to topics to send messages to other components. Then, components
that are subscribed to that topic can act on the messages that they receive.
You can't use this publish/subscribe IPC service to publish or subscribe to AWS IoT Core
MQTT. For more information about how to exchange messages with AWS IoT Core MQTT, see Publish/subscribe AWS IoT Core MQTT messages.
Minimum SDK versions
The following table lists the minimum versions of the AWS IoT Device SDK that you must use to
publish and subscribe to messages to and from local topics.
Authorization
To use local publish/subscribe messaging in a custom component, you must define
authorization policies that allow your component to send and receive messages to topics. For
information about defining authorization policies, see Authorize components to perform IPC
operations.
Authorization policies for publish/subscribe messaging have the following
properties.
IPC service identifier:
aws.greengrass.ipc.pubsub
Operation |
Description |
Resources |
aws.greengrass#PublishToTopic
|
Allows a component to publish messages to the topics that you specify.
|
A topic string, such as test/topic . Use an * to match
any combination of characters in a topic.
This topic string doesn't support MQTT topic wildcards (# and
+ ).
|
aws.greengrass#SubscribeToTopic
|
Allows a component to subscribe to messages for the topics that you
specify.
|
A topic string, such as test/topic . Use an * to match
any combination of characters in a topic.
In Greengrass nucleus v2.6.0 and
later, you can subscribe to topics that contain MQTT topic wildcards (# and
+ ). This topic string supports MQTT topic wildcards as literal characters. For
example, if a component's authorization policy grants access to test/topic/# , the
component can subscribe to test/topic/# , but it can't subscribe to
test/topic/filter .
|
*
|
Allows a component to publish and subscribe to messages for the topics that you
specify.
|
A topic string, such as test/topic . Use an * to match
any combination of characters in a topic.
In Greengrass nucleus v2.6.0 and
later, you can subscribe to topics that contain MQTT topic wildcards (# and
+ ). This topic string supports MQTT topic wildcards as literal characters. For
example, if a component's authorization policy grants access to test/topic/# , the
component can subscribe to test/topic/# , but it can't subscribe to
test/topic/filter .
|
Authorization policy
examples
You can reference the following authorization policy example to help you configure
authorization policies for your components.
Example authorization policy
The following example authorization policy allows a component to publish and subscribe
to all topics.
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
Publish a message to a topic.
Request
This operation's request has the following parameters:
topic
-
The topic to which to publish the message.
publishMessage
(Python: publish_message
)
-
The message to publish. This object, PublishMessage
, contains the
following information. You must specify one of jsonMessage
and
binaryMessage
.
jsonMessage
(Python: json_message
)
-
(Optional) A JSON message. This object, JsonMessage
, contains the
following information:
message
-
The JSON message as an object.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the
Greengrass nucleus component. The following table lists the minimum versions of the
AWS IoT Device SDK that you must use to access the message context.
The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic
and SubscribeToTopic
operations. The AWS IoT Greengrass Core software sets this context
object in messages when you subscribe, and ignores this context object in messages that
you publish.
This object, MessageContext
, contains the following information:
topic
-
The topic where the message was published.
binaryMessage
(Python: binary_message
)
-
(Optional) A binary message. This object, BinaryMessage
, contains the
following information:
message
-
The binary message as a blob.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the
Greengrass nucleus component. The following table lists the minimum versions of the
AWS IoT Device SDK that you must use to access the message context.
The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic
and SubscribeToTopic
operations. The AWS IoT Greengrass Core software sets this context
object in messages when you subscribe, and ignores this context object in messages that
you publish.
This object, MessageContext
, contains the following information:
topic
-
The topic where the message was published.
Response
This operation doesn't provide any information in its response.
Examples
The following examples demonstrate how to call this operation in custom component code.
- Java (IPC client V2)
-
Example: Publish a binary message
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
- Python (IPC client V2)
-
Example: Publish a binary message
import sys
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
PublishMessage,
BinaryMessage
)
def main():
args = sys.argv[1:]
topic = args[0]
message = args[1]
try:
ipc_client = GreengrassCoreIPCClientV2()
publish_binary_message_to_topic(ipc_client, topic, message)
print('Successfully published to topic: ' + topic)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def publish_binary_message_to_topic(ipc_client, topic, message):
binary_message = BinaryMessage(message=bytes(message, 'utf-8'))
publish_message = PublishMessage(binary_message=binary_message)
return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message)
if __name__ == '__main__':
main()
- Java (IPC client V1)
-
Example: Publish a binary message
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PublishToTopic {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
PublishToTopicResponseHandler responseHandler =
PublishToTopic.publishBinaryMessageToTopic(ipcClient, topic, message);
CompletableFuture<PublishToTopicResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponseHandler publishBinaryMessageToTopic(
GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message) {
PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishToTopicRequest.setPublishMessage(publishMessage);
publishToTopicRequest.setTopic(topic);
return greengrassCoreIPCClient.publishToTopic(publishToTopicRequest, Optional.empty());
}
}
- Python (IPC client V1)
-
Example: Publish a binary message
This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for
Python v2. If you are using version 1.5.3 of the SDK, see Use AWS IoT Device SDK for Python v2 (IPC client V1) for information about connecting to the AWS IoT Greengrass Core IPC service.
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "my/topic"
message = "Hello, World!"
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
- C++
-
Example: Publish a binary message
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
String message("Hello, World!");
int timeout = 10;
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
SubscribeToTopic
Subscribe to messages on a topic.
This operation is a subscription operation where you
subscribe to a stream of event messages. To use this operation, define a stream response handler
with functions that handle event messages, errors, and stream closure. For more information, see
Subscribe to IPC event streams.
Event message type:
SubscriptionResponseMessage
Request
This operation's request has the following parameters:
topic
-
The topic to which to subscribe.
In Greengrass nucleus v2.6.0 and later, this topic supports MQTT topic wildcards
(#
and +
).
receiveMode
(Python: receive_mode
)
-
(Optional) The behavior that specifies whether the component receives messages
from itself. You can change this behavior to allow a component to act on its own
messages. The default behavior depends on whether the topic contains an MQTT wildcard.
Choose from the following options:
-
RECEIVE_ALL_MESSAGES
– Receive all messages that match the
topic, including messages from the component that subscribes.
This mode is the default option when you subscribe to a topic that doesn't
contain an MQTT wildcard.
-
RECEIVE_MESSAGES_FROM_OTHERS
– Receive all messages that
match the topic, except messages from the component that subscribes.
This mode is the default option when you subscribe to a topic that contains an
MQTT wildcard.
This feature is available for v2.6.0 and later of the
Greengrass nucleus component. The following table lists the minimum versions of
the AWS IoT Device SDK that you must use to set the receive mode.
Response
This operation's response has the following information:
messages
-
The stream of messages. This object, SubscriptionResponseMessage
,
contains the following information. Each message contains jsonMessage
or
binaryMessage
.
jsonMessage
(Python: json_message
)
-
(Optional) A JSON message. This object, JsonMessage
, contains the
following information:
message
-
The JSON message as an object.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the
Greengrass nucleus component. The following table lists the minimum versions of the
AWS IoT Device SDK that you must use to access the message context.
The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic
and SubscribeToTopic
operations. The AWS IoT Greengrass Core software sets this context
object in messages when you subscribe, and ignores this context object in messages that
you publish.
This object, MessageContext
, contains the following information:
topic
-
The topic where the message was published.
binaryMessage
(Python: binary_message
)
-
(Optional) A binary message. This object, BinaryMessage
, contains the
following information:
message
-
The binary message as a blob.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the
Greengrass nucleus component. The following table lists the minimum versions of the
AWS IoT Device SDK that you must use to access the message context.
The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic
and SubscribeToTopic
operations. The AWS IoT Greengrass Core software sets this context
object in messages when you subscribe, and ignores this context object in messages that
you publish.
This object, MessageContext
, contains the following information:
topic
-
The topic where the message was published.
topicName
(Python: topic_name
)
-
The topic to which the message was published.
This property isn't currently used. In Greengrass nucleus v2.6.0 and later, you can
get the (jsonMessage|binaryMessage).context.topic
value from a
SubscriptionResponseMessage
to get the topic where the message was
published.
Examples
The following examples demonstrate how to call this operation in custom component code.
- Java (IPC client V2)
-
Example: Subscribe to local publish/subscribe messages
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
Example: Subscribe to local publish/subscribe messages
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- Java (IPC client V1)
-
Example: Subscribe to local publish/subscribe messages
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeToTopic {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler =
new SubscriptionResponseHandler(topic);
SubscribeToTopicResponseHandler responseHandler =
SubscribeToTopic.subscribeToTopic(ipcClient, topic, streamResponseHandler);
CompletableFuture<SubscribeToTopicResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static SubscribeToTopicResponseHandler subscribeToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler) {
SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest();
subscribeToTopicRequest.setTopic(topic);
return greengrassCoreIPCClient.subscribeToTopic(subscribeToTopicRequest,
Optional.of(streamResponseHandler));
}
public static class SubscriptionResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
private final String topic;
public SubscriptionResponseHandler(String topic) {
this.topic = topic;
}
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message =
new String(subscriptionResponseMessage.getBinaryMessage().getMessage(),
StandardCharsets.UTF_8);
System.out.printf("Received new message on topic %s: %s%n", this.topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
- Python (IPC client V1)
-
Example: Subscribe to local publish/subscribe messages
This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for
Python v2. If you are using version 1.5.3 of the SDK, see Use AWS IoT Device SDK for Python v2 (IPC client V1) for information about connecting to the AWS IoT Greengrass Core IPC service.
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message_string = str(event.binary_message.message, "utf-8")
# Handle message.
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "my/topic"
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
# To stop subscribing, close the operation stream.
operation.close()
- C++
-
Example: Subscribe to local publish/subscribe messages
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
Examples
Use the following examples to learn how to use the publish/subscribe IPC service in your
components.
The following example recipe allows the component to publish to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherJava:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/PubSubPublisher.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubPublisherJava:pubsub:1':
policyDescription: Allows access to publish to all topics.
operations:
- 'aws.greengrass#PublishToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/PubSubPublisher.jar
The following example Java application demonstrates how to use the publish/subscribe
IPC service to publish messages to other components.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
The following example recipe allows the component to subscribe to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberJava:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/PubSubSubscriber.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubSubscriberJava:pubsub:1':
policyDescription: Allows access to subscribe to all topics.
operations:
- 'aws.greengrass#SubscribeToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/PubSubSubscriber.jar
The following example Java application demonstrates how to use the publish/subscribe
IPC service to subscribe to messages to other components.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
The following example recipe allows the component to publish to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherPython:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"run": "python3 -u {artifacts:path}/pubsub_publisher.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"run": "py -3 -u {artifacts:path}/pubsub_publisher.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherPython
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherPython:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
run: python3 -u {artifacts:path}/pubsub_publisher.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
run: py -3 -u {artifacts:path}/pubsub_publisher.py
The following example Python application demonstrates how to use the publish/subscribe
IPC service to publish messages to other components.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
The following example recipe allows the component to subscribe to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberPython:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"run": "python3 -u {artifacts:path}/pubsub_subscriber.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"run": "py -3 -u {artifacts:path}/pubsub_subscriber.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberPython
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberPython:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
run: python3 -u {artifacts:path}/pubsub_subscriber.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
run: py -3 -u {artifacts:path}/pubsub_subscriber.py
The following example Python application demonstrates how to use the publish/subscribe
IPC service to subscribe to messages to other components.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
The following example recipe allows the component to publish to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherCpp:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_publisher"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherCpp:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_pubsub_publisher"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher
Permission:
Execute: OWNER
The following example C++ application demonstrates how to use the publish/subscribe
IPC service to publish messages to other components.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
The following example recipe allows the component to subscribe to all topics.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberCpp:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pub_sub_subscriber"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberCpp:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_pub_sub_subscriber"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber
Permission:
Execute: OWNER
The following example C++ application demonstrates how to use the publish/subscribe
IPC service to subscribe to messages to other components.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}