Use the AWS IoT Device SDK to communicate with the Greengrass nucleus, other components, and AWS IoT Core
Components running on your core device can use the AWS IoT Greengrass Core interprocess communication (IPC) library in the AWS IoT Device SDK to communicate with the AWS IoT Greengrass nucleus and other Greengrass components. To develop and run custom components that use IPC, you must use the AWS IoT Device SDK to connect to the AWS IoT Greengrass Core IPC service and perform IPC operations.
The IPC interface supports two types of operations:
-
Request/response
Components send a request to the IPC service and receive a response that contains the result of the request.
-
Subscription
Components send a subscription request to the IPC service and expect a stream of event messages in response. Components provide a subscription handler that handles event messages, errors, and stream closure. The AWS IoT Device SDK includes a handler interface with the correct response and event types for each IPC operation. For more information, see Subscribe to IPC event streams.
Topics
- Supported SDKs for interprocess communication
- Connect to the AWS IoT Greengrass Core IPC service
- Authorize components to perform IPC operations
- Subscribe to IPC event streams
- Publish/subscribe local messages
- Publish/subscribe AWS IoT Core MQTT messages
- Interact with component lifecycle
- Interact with component configuration
- Retrieve secret values
- Interact with local shadows
- Manage local deployments and components
- Authenticate and authorize client devices
Supported SDKs for interprocess communication
The AWS IoT Greengrass Core IPC libraries are included in the following AWS IoT Device SDK versions.
-
AWS IoT Device SDK for Java v2 (v1.2.10 or later). For more information about using the AWS IoT Device SDK for Java v2 to connect to the AWS IoT Greengrass Core IPC service, see Use AWS IoT Device SDK for Java v2.
-
AWS IoT Device SDK for Python v2 (v1.5.3 or later). For more information about using the AWS IoT Device SDK for Python v2 to connect to the AWS IoT Greengrass Core IPC service, see Use AWS IoT Device SDK for Python v2.
-
AWS IoT Device SDK for C++ v2 (Linux: v1.13.0 or later; Windows: v1.14.6 or later). For more information about using the AWS IoT Device SDK for C++ v2 to connect to the AWS IoT Greengrass Core IPC service, see Use AWS IoT Device SDK for C++ v2.
Connect to the AWS IoT Greengrass Core IPC service
To use interprocess communication in your custom component, you must create a connection to an IPC server socket that the AWS IoT Greengrass Core software runs. Complete the following tasks to download and use the AWS IoT Device SDK in the language of your choice.
To use the AWS IoT Device SDK for Java v2
-
Download the AWS IoT Device SDK for Java v2 (v1.2.10 or later).
-
Do one of the following to run your custom code in your component:
-
Build your component as a JAR file that includes the AWS IoT Device SDK, and run this JAR file in your component recipe.
-
Define the AWS IoT Device SDK JAR as a component artifact, and add that artifact to the classpath when you run your application in your component recipe.
-
Create a connection to the AWS IoT Greengrass Core IPC service. The IPC client, GreengrassCoreIPCClient, requires an EventStreamRPCConnection. Download the following IPCUtils class that provides this connection for you.

```java
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0 */

package com.example.greengrass.util;

import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig;
import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class IPCUtils {
    // Port number is not used in domain sockets.
    // It is ignored but the field needs to be set when creating socket connection.
    public static final int DEFAULT_PORT_NUMBER = 8033;
    private static EventStreamRPCConnection clientConnection = null;

    private IPCUtils() {
    }

    public static EventStreamRPCConnection getEventStreamRpcConnection()
            throws ExecutionException, InterruptedException {
        String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT");
        String authToken = System.getenv("SVCUID");
        SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC();

        if (clientConnection == null) {
            clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath);
        }
        return clientConnection;
    }

    private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions,
            String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException {

        try (EventLoopGroup elGroup = new EventLoopGroup(1);
             ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) {

            final EventStreamRPCConnectionConfig config =
                    new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null,
                            ipcServerSocketPath, DEFAULT_PORT_NUMBER,
                            GreengrassConnectMessageSupplier.connectMessageSupplier(authToken));
            final CompletableFuture<Void> connected = new CompletableFuture<>();
            final EventStreamRPCConnection connection = new EventStreamRPCConnection(config);
            final boolean[] disconnected = {false};
            final int[] disconnectedCode = {-1};

            connection.connect(new EventStreamRPCConnection.LifecycleHandler() {
                // Only called on successful connection.
                @Override
                public void onConnect() {
                    connected.complete(null);
                }

                @Override
                public void onDisconnect(int errorCode) {
                    disconnected[0] = true;
                    disconnectedCode[0] = errorCode;
                    clientConnection = null;
                }

                // This on error is for any error that is connection level, including problems during connect()
                @Override
                public boolean onError(Throwable t) {
                    connected.completeExceptionally(t);
                    clientConnection = null;
                    return true; // True instructs handler to disconnect due to this error.
                }
            });
            connected.get();
            return connection;
        }
    }

    private static SocketOptions getSocketOptionsForIPC() {
        SocketOptions socketOptions = new SocketOptions();
        socketOptions.connectTimeoutMs = 3000;
        socketOptions.domain = SocketOptions.SocketDomain.LOCAL;
        socketOptions.type = SocketOptions.SocketType.STREAM;
        return socketOptions;
    }
}
```
-
Use the following code to create the IPC client.
```java
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
    GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
} catch (Exception e) {
    LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
    System.exit(1);
}
```
To use the AWS IoT Device SDK for Python v2
-
Download the AWS IoT Device SDK for Python (v1.5.3 or later).
-
Add the SDK's installation steps to the install lifecycle in your component's recipe.
-
Create a connection to the AWS IoT Greengrass Core IPC service. Complete the following steps to create the IPC client and establish a connection.
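The connection step might look like the following minimal sketch. It assumes that the component runs under the Greengrass nucleus, which sets the AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT and SVCUID environment variables that the Java IPCUtils class reads, and that the SDK's awsiot.greengrasscoreipc.connect() helper is installed. The missing_ipc_environment and create_ipc_client functions are hypothetical helpers for illustration, not part of the SDK.

```python
import os

# Environment variables that the Greengrass nucleus sets for each component
# process (the same names that the Java IPCUtils class reads).
REQUIRED_ENV_VARS = (
    "AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT",
    "SVCUID",
)


def missing_ipc_environment(env=None):
    """Return the required IPC environment variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


def create_ipc_client():
    """Connect to the IPC service and return the client (hypothetical helper)."""
    missing = missing_ipc_environment()
    if missing:
        raise RuntimeError(
            "Not running as a Greengrass component; missing: " + ", ".join(missing)
        )
    # Imported lazily so that this module loads even where the SDK isn't installed.
    import awsiot.greengrasscoreipc

    return awsiot.greengrasscoreipc.connect()
```

Checking the environment first gives a clearer error when you run the code outside of a component, for example during local testing.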
AWS IoT Greengrass doesn't currently support this feature on Windows core devices.
To build the AWS IoT Device SDK v2 for C++, a device must have the following tools:
-
C++ 11 or later
-
CMake 3.1 or later
-
One of the following compilers:
-
GCC 4.8 or later
-
Clang 3.9 or later
-
MSVC 2015 or later
To use the AWS IoT Device SDK for C++ v2
-
Download the AWS IoT Device SDK for C++ v2 (Linux: v1.13.0 or later; Windows: v1.14.6 or later).
-
Follow the installation instructions in the README to build the AWS IoT Device SDK for C++ v2 from source.
-
In your C++ build tool, link the Greengrass IPC library, AWS::GreengrassIpc-cpp, that you built in the previous step. The following CMakeLists.txt example links the Greengrass IPC library to a project that you build with CMake.

```cmake
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)

file(GLOB MAIN_SRC
        "*.h"
        "*.cpp"
        )
add_executable(${PROJECT_NAME} ${MAIN_SRC})

set_target_properties(${PROJECT_NAME} PROPERTIES
        LINKER_LANGUAGE CXX
        CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
```
-
In your component code, create a connection to the AWS IoT Greengrass Core IPC service to create an IPC client (Aws::Greengrass::GreengrassCoreIpcClient). You must define an IPC connection lifecycle handler that handles IPC connection, disconnection, and error events. The following example creates an IPC client and an IPC connection lifecycle handler that prints when the IPC client connects, disconnects, and encounters errors.

```cpp
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        std::cout << "OnConnectCallback" << std::endl;
    }

    void OnDisconnectCallback(RpcError error) override {
        std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
        exit(-1);
    }

    bool OnErrorCallback(RpcError error) override {
        std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
        return true;
    }
};

int main() {
    // Create the IPC client.
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    // Use the IPC client to create an operation request.
    // (Not shown: this step defines the operation, request, topic, and timeout
    // variables that the rest of this example uses.)

    // Activate the operation request.
    auto activate = operation.Activate(request, nullptr);
    activate.wait();

    // Wait for Greengrass Core to respond to the request.
    auto responseFuture = operation.GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    // Check the result of the request.
    auto response = responseFuture.get();
    if (response) {
        std::cout << "Successfully published to topic: " << topic << std::endl;
    } else {
        // An error occurred.
        std::cout << "Failed to publish to topic: " << topic << std::endl;
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
        } else {
            std::cout << "RPC error: " << response.GetRpcError() << std::endl;
        }
        exit(-1);
    }

    return 0;
}
```
-
To run your custom code in your component, build your code as a binary artifact, and run the binary artifact in your component recipe. Set the artifact's Execute permission to OWNER to enable the AWS IoT Greengrass Core software to run the binary artifact.
Your component recipe's Manifests section might look similar to the following example.
Authorize components to perform IPC operations
To allow your custom components to use some IPC operations, you must define authorization policies that allow the component to perform the operation on certain resources. Each authorization policy defines a list of operations and a list of resources that the policy allows. For example, the publish/subscribe messaging IPC service defines publish and subscribe operations for topic resources. You can use the * wildcard to allow access to all operations or all resources.
You define authorization policies with the accessControl configuration parameter, which you can set in the component recipe or when you deploy the component. The accessControl object maps IPC service identifiers to lists of authorization policies. You can define multiple authorization policies for each IPC service to control access. Each authorization policy has a policy ID, which must be unique among all components.
To create unique policy IDs, you can combine the component name, IPC service name, and a counter. For example, a component named com.example.HelloWorld might define two publish/subscribe authorization policies with the following IDs:
-
com.example.HelloWorld:pubsub:1
-
com.example.HelloWorld:pubsub:2
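The naming scheme described here can be sketched in a few lines. This is an illustration only: make_policy_ids is a hypothetical helper, and the colon separator simply follows the two example IDs.

```python
def make_policy_ids(component_name, service_name, count):
    """Build policy IDs of the form <component name>:<service name>:<counter>.

    Hypothetical helper: counters start at 1, as in the example IDs.
    """
    return [
        f"{component_name}:{service_name}:{counter}"
        for counter in range(1, count + 1)
    ]


# Two publish/subscribe policy IDs for the com.example.HelloWorld component.
policy_ids = make_policy_ids("com.example.HelloWorld", "pubsub", 2)
```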
Authorization policies use the following format. This object is the accessControl configuration parameter.
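As a rough illustration of that structure, the following sketch builds an accessControl object as a Python dictionary and prints it as JSON. It follows the description above: an IPC service identifier maps to policies keyed by policy ID, and each policy lists operations and resources. The policyDescription key, the aws.greengrass.ipc.pubsub service identifier, and the aws.greengrass#PublishToTopic operation name are assumptions for this example and aren't taken from this page. build_access_control is a hypothetical helper.

```python
import json


def build_access_control(service_id, policy_id, description, operations, resources):
    """Return an accessControl object that holds one authorization policy."""
    return {
        "accessControl": {
            service_id: {
                policy_id: {
                    # Key name assumed for this sketch.
                    "policyDescription": description,
                    "operations": list(operations),
                    "resources": list(resources),
                }
            }
        }
    }


access_control = build_access_control(
    service_id="aws.greengrass.ipc.pubsub",          # assumed identifier
    policy_id="com.example.HelloWorld:pubsub:1",
    description="Allows access to publish to test/topic.",
    operations=["aws.greengrass#PublishToTopic"],    # assumed operation name
    resources=["test/topic"],
)
print(json.dumps(access_control, indent=2))
```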
Wildcards in authorization policies
You can use the * wildcard in the resources element of IPC authorization policies to allow access to multiple resources in a single authorization policy.
-
In all versions of the Greengrass nucleus, you can specify a single * character as a resource to allow access to all resources.
-
In Greengrass nucleus v2.6.0 and later, you can specify the * character in a resource to match any combination of characters. For example, you can specify factory/1/devices/Thermostat*/status to allow access to a status topic for all thermostat devices in a factory, where each device's name begins with Thermostat.
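To make the matching rule concrete, the following sketch checks a resource name against a policy resource that contains the * wildcard. It isn't the nucleus's implementation. It assumes that * can also span / characters, and it ignores escape sequences and MQTT wildcards. resource_matches is a hypothetical helper.

```python
import re


def resource_matches(pattern, resource):
    """Return True if resource matches pattern, where * matches any characters."""
    # Escape every literal part, then join the parts with "match anything".
    regex = ".*".join(re.escape(part) for part in pattern.split("*"))
    return re.fullmatch(regex, resource, flags=re.DOTALL) is not None


pattern = "factory/1/devices/Thermostat*/status"
allowed = resource_matches(pattern, "factory/1/devices/Thermostat42/status")
denied = resource_matches(pattern, "factory/1/devices/Humidifier1/status")
```

A check like this can help you test a policy's resource patterns against sample topic names before you deploy the component.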
When you define authorization policies for the AWS IoT Core MQTT IPC service, you can also use MQTT wildcards (+ and #) to match multiple resources. For more information, see MQTT wildcards in AWS IoT Core MQTT IPC authorization policies.
Recipe variables in authorization policies
If you use Greengrass nucleus v2.6.0 or later, you can use the {iot:thingName} recipe variable in authorization policies. When you need an authorization policy that includes the core device's name, such as for MQTT topics or device shadows, you can use this recipe variable to configure a single authorization policy for a group of core devices. For example, you can allow a component access to the following resource for shadow IPC operations.
$aws/things/{iot:thingName}/shadow/
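The nucleus performs this substitution for you. The following sketch only illustrates what the resulting resource looks like on a given core device. resolve_thing_name is a hypothetical helper, and MyCoreDevice is a made-up thing name.

```python
def resolve_thing_name(resource, thing_name):
    """Replace the {iot:thingName} recipe variable with a core device's name."""
    return resource.replace("{iot:thingName}", thing_name)


# One policy resource resolves to a different value on each core device.
resolved = resolve_thing_name("$aws/things/{iot:thingName}/shadow/", "MyCoreDevice")
```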
Special characters in authorization policies
To specify a literal * or ? character in an authorization policy, you must use an escape sequence. The following escape sequences instruct the AWS IoT Greengrass Core software to use the literal value instead of the character's special meaning. For example, the * character is a wildcard that matches any combination of characters.
Literal character | Escape sequence | Notes |
---|---|---|
* | | |
? | | AWS IoT Greengrass doesn't currently support the |
 | | Use this escape sequence to match a resource that contains |
Authorization policy examples
You can reference the following authorization policy examples to help you configure authorization policies for your components.
Example component recipe with an authorization policy
The following example component recipe includes an accessControl object that defines an authorization policy. This policy authorizes the com.example.HelloWorld component to publish to the test/topic topic.
Example component configuration update with an authorization policy
The following example configuration update in a deployment configures a component with an accessControl object that defines an authorization policy. This policy authorizes the com.example.HelloWorld component to publish to the test/topic topic.
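A configuration update of this kind might be assembled as in the following sketch. It assumes that the deployment's configuration update carries a merge field whose value is a JSON-serialized string, and it reuses the assumed service identifier, operation name, and policyDescription key for the publish/subscribe service. build_configuration_update is a hypothetical helper.

```python
import json


def build_configuration_update(access_control):
    """Wrap an accessControl object in a configuration update (assumed shape)."""
    # The merge value is a JSON string, not a nested object.
    return {"merge": json.dumps({"accessControl": access_control})}


update = build_configuration_update(
    {
        "aws.greengrass.ipc.pubsub": {                 # assumed identifier
            "com.example.HelloWorld:pubsub:1": {
                "policyDescription": "Allows access to publish to test/topic.",
                "operations": ["aws.greengrass#PublishToTopic"],  # assumed name
                "resources": ["test/topic"],
            }
        }
    }
)
```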
Subscribe to IPC event streams
You can use IPC operations to subscribe to streams of events on a Greengrass core device. To use a subscribe operation, define a subscription handler and create a request to the IPC service. Then, the IPC client runs the subscription handler's functions each time that the core device streams an event message to your component.
You can close a subscription to stop processing event messages. To do so, call closeStream() (Java), close() (Python), or Close() (C++) on the subscription operation object that you used to open the subscription.
The AWS IoT Greengrass Core IPC service supports the following subscribe operations:
Define subscription handlers
To define a subscription handler, create a class with callback functions that handle event messages, errors, and stream closure.
Best practices for subscription handlers
The IPC client uses a single thread that communicates with the IPC server and calls your subscription handler. Because of this synchronous behavior, follow these guidelines when you write subscription handler functions.
-
Run blocking code asynchronously
The IPC client can't send new requests or process new event messages while the thread is blocked. You can run blocking code in a separate thread that you run from the handler function. Blocking code includes sleep calls, loops that continuously run, and synchronous I/O requests that take time to complete.
-
Send new IPC requests asynchronously
The IPC client can't send a new request from within subscription handler functions, because the request blocks the handler function if you wait for a response. You can send IPC requests in a separate thread that you run from the handler function.
-
Handle exceptions
The IPC client doesn't handle uncaught exceptions in subscription handler functions. If your handler function throws an exception, the subscription closes, and the exception doesn't appear in your component logs. You can catch exceptions in your handler functions to keep the subscription open and log errors that occur in your code.
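The guidelines above can be sketched as a handler that passes each event to a worker thread and catches exceptions so that one failure doesn't close the subscription. This is a minimal, standalone sketch: the callback names mirror the stream handler methods in the AWS IoT Device SDK for Python v2, but the class doesn't inherit from an SDK class, and OffloadingStreamHandler and process_event are hypothetical names.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)
_STOP = object()  # Sentinel that tells the worker thread to exit.


class OffloadingStreamHandler:
    """Queues events for a worker thread so that callbacks return quickly."""

    def __init__(self, process_event):
        self._process_event = process_event  # May block or send IPC requests.
        self._events = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def on_stream_event(self, event):
        # Runs on the IPC client's thread: only enqueue, never block here.
        self._events.put(event)

    def on_stream_error(self, error):
        logger.error("Stream error: %s", error)
        return False  # Keep the stream open; return True to close it.

    def on_stream_closed(self):
        self._events.put(_STOP)

    def _run(self):
        while True:
            event = self._events.get()
            if event is _STOP:
                break
            try:
                self._process_event(event)
            except Exception:
                # Log the failure instead of letting it end the worker.
                logger.exception("Failed to process event")

    def join(self, timeout=None):
        self._worker.join(timeout)
```

Because the worker thread owns all slow work, it is also a reasonable place to send follow-up IPC requests without blocking the thread that delivers events.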
Example subscription handlers
The following example demonstrates how to use the SubscribeToTopic operation and a subscription handler to subscribe to local publish/subscribe messages.