AWS IoT Device SDK を使用して Greengrass nucleus、その他のコンポーネント、および AWS IoT Core と通信する - AWS IoT Greengrass

AWS IoT Device SDK を使用して Greengrass nucleus、その他のコンポーネント、および AWS IoT Core と通信する

コアデバイスで実行されているコンポーネントは、AWS IoT Greengrass Core プロセス間通信 (IPC) ライブラリを AWS IoT Device SDK で使用して、AWS IoT Greengrass nucleus やその他の Greengrass コンポーネントと通信できます。IPC を使用するカスタムコンポーネントを開発して実行するには、AWS IoT Device SDK を使用して AWS IoT Greengrass Core IPC サービスに接続し、IPC オペレーションを実行する必要があります。

IPC インターフェイスは、以下の 2 種類のオペレーションをサポートします。

  • リクエスト/レスポンス

    コンポーネントは IPC サービスにリクエストを送信し、リクエストの結果を含むレスポンスを受け取ります。

  • サブスクリプション

    コンポーネントはサブスクリプション要求を IPC サービスに送信し、イベントメッセージのストリームをレスポンスとして期待します。コンポーネントは、イベントメッセージ、エラー、およびストリームクロージャを処理するサブスクリプションハンドラーを提供します。AWS IoT Device SDK には、各 IPC オペレーションの正しいレスポンスとイベントタイプを持つハンドラーインターフェイスが含まれます。詳細については、「IPC イベントストリームへのサブスクライブ」を参照してください。

IPC クライアントのバージョン

それ以降の Java および Python SDK のバージョンでは、IPC クライアント V2 と呼ばれる IPC クライアントの改良版を、AWS IoT Greengrass が提供します。IPC クライアント V2

  • IPC オペレーションの使用に必要なコードの記述を削減し、IPC クライアント V1 で発生する一般的なエラーの回避に役立ちます。

  • サブスクリプションハンドラーコールバックを別のスレッドで呼び出すため、IPC 関数呼び出しの追加呼び出しを含めたブロッキングコードを、サブスクリプションハンドラーコールバック内で実行できるようになりました。IPC クライアント V1 は同じスレッドを使用して、IPC サーバーとの通信とサブスクリプションハンドラーの呼び出しを行います。

  • Lambda 式 (Java の場合) または関数 (Python の場合) を使用して、サブスクリプションオペレーションを呼び出せます。IPC クライアント V1 では、サブスクリプションハンドラクラスの定義が必要です。

  • 同期バージョンと非同期バージョンの IPC オペレーションを提供します。IPC クライアント V1 では、非同期バージョンのオペレーションのみ提供します。

これらの機能改善を利用するには、IPC クライアント V2 の使用をお勧めします。ただし、このドキュメントやオンラインコンテンツに掲載されている例の多くは、IPC クライアント V1 の使用方法のみ紹介しています。次の例とチュートリアルから、IPC クライアント V2 を使用するサンプルのコンポーネントを確認できます。

現在 AWS IoT Device SDK for C++ v2 では、IPC クライアント V1のみをサポートしています。

プロセス間通信でサポートされている SDK

AWS IoT Greengrass Core IPC ライブラリは以下の AWS IoT Device SDK バージョンに含まれています。

AWS IoT Greengrass Core IPC サービスに接続する

カスタムコンポーネントでプロセス間通信を使用するには、AWS IoT Greengrass Core ソフトウェアが稼働する IPC サーバーソケットへの接続を作成する必要があります。AWS IoT Device SDK を任意の言語でダウンロードして使用するには、次のタスクを実行してください。

AWS IoT Device SDK for Java v2 (IPC クライアント V2) を使用するには
  1. AWS IoT Device SDK for Java v2 (v1.6.0 以降) をダウンロードします。

  2. 以下のいずれかを行って、コンポーネントでカスタムコードを実行します。

    • AWS IoT Device SDK JAR ファイルとしてコンポーネントを構築し、この JAR ファイルをコンポーネント recipe で実行します。

    • AWS IoT Device SDK JAR をコンポーネントアーティファクトとして定義し、コンポーネント recipe でアプリケーションを実行するときに、そのアーティファクトをクラスパスに追加します。

  3. 以下のコードを使用して IPC クライアントを作成します。

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
AWS IoT Device SDK for Python v2 (IPC クライアント V2) を使用するには
  1. AWS IoT Device SDK for Python (v1.9.0 以降) をダウンロードします。

  2. SDK のインストール手順を、コンポーネントの recipe のインストールライフサイクルに追加します。

  3. AWS IoT Greengrass Core IPC サービスへの接続を作成します。以下のコードを使用して IPC クライアントを作成します。

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
AWS IoT Device SDK for Java v2 (IPC クライアント V1) を使用するには
  1. AWS IoT Device SDK for Java v2 (v1.2.10 以降) をダウンロードします。

  2. 以下のいずれかを行って、コンポーネントでカスタムコードを実行します。

    • AWS IoT Device SDK JAR ファイルとしてコンポーネントを構築し、この JAR ファイルをコンポーネント recipe で実行します。

    • AWS IoT Device SDK JAR をコンポーネントアーティファクトとして定義し、コンポーネント recipe でアプリケーションを実行するときに、そのアーティファクトをクラスパスに追加します。

  3. AWS IoT Greengrass Core IPC サービスへの接続を作成します。IPC クライアント GreengrassCoreIPCClient には EventStreamRPCConnection が必要です。以下の IPCUtils クラスをダウンロードすると、この接続が提供されます。

    /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.greengrass.util; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig; import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public final class IPCUtils { // Port number is not used in domain sockets. // It is ignored but the field needs to be set when creating socket connection. public static final int DEFAULT_PORT_NUMBER = 8033; private static EventStreamRPCConnection clientConnection = null; private IPCUtils() { } public static EventStreamRPCConnection getEventStreamRpcConnection() throws ExecutionException, InterruptedException { String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"); String authToken = System.getenv("SVCUID"); SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC(); if (clientConnection == null) { clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath); } return clientConnection; } private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions, String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException { try (EventLoopGroup elGroup = new EventLoopGroup(1); ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) { final EventStreamRPCConnectionConfig config = new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null, ipcServerSocketPath, DEFAULT_PORT_NUMBER, GreengrassConnectMessageSupplier.connectMessageSupplier(authToken)); final CompletableFuture<Void> connected = new CompletableFuture<>(); final EventStreamRPCConnection connection = new EventStreamRPCConnection(config); final boolean[] disconnected = {false}; final int[] disconnectedCode = {-1}; connection.connect(new EventStreamRPCConnection.LifecycleHandler() { // Only called on successful connection. @Override public void onConnect() { connected.complete(null); } @Override public void onDisconnect(int errorCode) { disconnected[0] = true; disconnectedCode[0] = errorCode; clientConnection = null; } // This on error is for any error that is connection level, including problems during connect() @Override public boolean onError(Throwable t) { connected.completeExceptionally(t); clientConnection = null; return true; // True instructs handler to disconnect due to this error. } }); connected.get(); return connection; } } private static SocketOptions getSocketOptionsForIPC() { SocketOptions socketOptions = new SocketOptions(); socketOptions.connectTimeoutMs = 3000; socketOptions.domain = SocketOptions.SocketDomain.LOCAL; socketOptions.type = SocketOptions.SocketType.STREAM; return socketOptions; } }
  4. 以下のコードを使用して IPC クライアントを作成します。

    try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
AWS IoT Device SDK for Python v2 (IPC クライアント V1) を使用するには
  1. AWS IoT Device SDK for Python (v1.5.3 以降) をダウンロードします。

  2. SDK のインストール手順を、コンポーネントの recipe のインストールライフサイクルに追加します。

  3. AWS IoT Greengrass Core IPC サービスへの接続を作成します。次の手順を実行して、IPC クライアントを作成し、接続を確立します。

    SDK v1.5.4 or later

    以下のコードを使用して IPC クライアントを作成します。

    import awsiot.greengrasscoreipc try: ipc_client = awsiot.greengrasscoreipc.connect() # Use client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
    SDK v1.5.3
    1. 以下の IPCUtils クラスをダウンロードすると、IPC サーバー接続が提供されます。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os from awscrt.io import ( ClientBootstrap, DefaultHostResolver, EventLoopGroup, SocketDomain, SocketOptions, ) from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment TIMEOUT = 10 class IPCUtils: def connect(self): elg = EventLoopGroup() resolver = DefaultHostResolver(elg) bootstrap = ClientBootstrap(elg, resolver) socket_options = SocketOptions() socket_options.domain = SocketDomain.Local amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID")) hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT") connection = Connection( host_name=hostname, port=8033, bootstrap=bootstrap, socket_options=socket_options, connect_message_amender=amender, ) self.lifecycle_handler = LifecycleHandler() connect_future = connection.connect(self.lifecycle_handler) connect_future.result(TIMEOUT) return connection
    2. 以下のコードを使用して IPC クライアントを作成します。

      import awsiot.greengrasscoreipc.client as client try: ipc_utils = IPCUtils() connection = ipc_utils.connect() ipc_client = client.GreengrassCoreIPCClient(connection) # Use client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

AWS IoT Device SDK v2 for C++ をビルドするには、デバイスに以下のツールが必要です。

  • C++ 11 以降

  • CMake 3.1 以降

  • 以下のいずれかのコンパイラ:

    • GCC 4.8 以降

    • Clang 3.9 以降

    • MSVC 2015 以降

AWS IoT Device SDK for C++ v2 を使用するには
  1. AWS IoT Device SDK for C++ v2 (v1.17.0 以降) をダウンロードします。

  2. README のインストール手順に従って、AWS IoT Device SDK for C++ v2 をソースからビルドします。

  3. C++ ビルドツールで、前の手順でビルドした Greengrass IPC ライブラリ (AWS::GreengrassIpc-cpp) をリンクします。以下の CMakeLists.txt の例は、Greengrass IPC ライブラリを CMake でビルドしたプロジェクトにリンクします。

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. コンポーネントコードで AWS IoT Greengrass Core IPC サービスへの接続を作成して、IPC クライアント (Aws::Greengrass::GreengrassCoreIpcClient) を作成します。IPC の接続、切断、およびエラーイベントを処理する IPC 接続ライフサイクルハンドラーを定義する必要があります。次の例は、IPC クライアントの接続、切断、およびエラーの発生時に出力される IPC クライアントおよび IPC 接続ライフサイクルハンドラーを作成します。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. コンポーネントでカスタムコードを実行するには、コードをバイナリアーティファクトとしてビルドし、コンポーネント recipe でバイナリアーティファクトを実行します。OWNER へのアーティファクトの Execute 権限を設定して、AWS IoT Greengrass Core ソフトウェアがバイナリアーティファクトを実行できるようにします。

    コンポーネント recipe の Manifests セクションは、次の例のようになります。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

コンポーネントに IPC オペレーションの実行を許可する

カスタムコンポーネントが IPC オペレーションの一部を使用できるようにするには、コンポーネントが特定のリソースに対しオペレーションを実行できるようにする承認ポリシーを定義する必要があります。各承認ポリシーは、ポリシーが許可するオペレーションのリストとリソースのリストを定義します。たとえば、パブリッシュ/サブスクライブメッセージング IPC サービスは、トピックリソースの発行およびサブスクライブのオペレーションを定義します。* ワイルドカードを使用すると、すべてのオペレーションまたはすべてのリソースへのアクセスを許可できます。

承認ポリシーは accessControl 設定パラメータで定義しますが、これはコンポーネント recipe で、またはコンポーネントをデプロイするときに設定できます。accessControl オブジェクトは、IPC サービス識別子を承認ポリシーのリストにマッピングします。各 IPC サービスに対しては、複数の承認ポリシーを定義してアクセスを制御できます。各承認ポリシーにはポリシー ID があり、すべてのコンポーネントで一意である必要があります。

ヒント

一意のポリシー ID を作成するには、コンポーネント名、IPC サービス名、およびカウンターを組み合わせることができます。たとえば com.example.HelloWorld という名前のコンポーネントには、以下の ID を持つ 2 つのパブリッシュ/サブスクライブ承認ポリシーを定義できます。

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

承認ポリシーは以下の形式を使用します。このオブジェクトは accessControl 設定パラメータです。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

承認ポリシー内のワイルドカード

IPC 承認ポリシーの resources エレメントで * ワイルドカードを使うと、1 つの承認ポリシーにある複数のリソースにアクセスできます。

  • Greengrass nucleus の全てのバージョンにおいて、リソースとして * の文字を指定すると、全てのリソースにアクセスできます。

  • Greengrass nucleus の v2.6.0 以降では、リソースで * の文字を指定すると、どの文字列の組み合わせにも一致します。例えば factory/1/devices/Thermostat*/status と指定すると、工場内のサーモスタットデバイスのうち、名前が Thermostat で始まるすべてのデバイスのステータストピックへアクセスできます。

AWS IoT Core MQTT IPC サービスの承認ポリシーを定義する場合、MQTT ワイルドカード (+ および #) を使用すると、複数のリソースに一致します。詳細については、「MQTT wildcards in AWS IoT Core MQTT IPC authorization policies」(AWS IoT Core MQTT IPC 承認ポリシーにおける MQTT ワイルドカード) を参照してください。

承認ポリシーの recipe 変数

Greengrass nucleus の v2.6.0 以降を使用していて、Greengrass nucleus の interpolateComponentConfiguration 設定オプションを true に設定している場合、承認ポリシーで {iot:thingName} recipe 変数を使用できます。MQTT トピックやデバイスシャドウなど、コアデバイスの名前を含む承認ポリシーが必要な場合、この recipe 変数を使用すると、複数のコアデバイスからなるグループに対して、1 つの承認ポリシーを設定できます。例えば、シャドウ IPC オペレーションのために、コンポーネントに次のリソースへのアクセスを許可することができます。

$aws/things/{iot:thingName}/shadow/

承認ポリシーの特殊文字

承認ポリシーで * または ? をリテラル文字として指定するには、エスケープシーケンスを使う必要があります。次のエスケープシーケンスは、その文字が持つ特別な意味の代わりに、文字のリテラル値を使うように AWS IoT Greengrass Core ソフトウェアに指示します。例えば、 * の文字は任意の文字の組み合わせに一致するワイルドカードです。

リテラル文字 エスケープシーケンス メモ

*

${*}

?

${?}

現在 AWS IoT Greengrass は、任意の 1 文字に一致する ? ワイルドカードをサポートしていません。

$

${$}

このエスケープシーケンスを使用すると、${ を含むリソースに一致します。例えば、${resourceName} という名前のリソースに一致させるには、${$}{resourceName} と指定する必要があります。もしくは、$aws で始まるトピックへアクセスできるように、リテラルの $ を使って $ を含むリソースに一致させます。

承認ポリシーの例

次の承認ポリシーの例を参照して、コンポーネントの承認ポリシー設定の参考にできます。

例 承認ポリシーを使用したコンポーネント recipe の例

次のコンポーネント recipe の例には、承認ポリシーを定義する accessControl オブジェクトが含まれます。このポリシーは com.example.HelloWorld コンポーネントを承認して test/topic トピックに発行します。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/HelloWorld.jar
例 承認ポリシーを使用したコンポーネント設定更新の例

次のデプロイの設定更新の例では、承認ポリシーを定義する accessControl オブジェクトでコンポーネントを設定することが指定されています。このポリシーは com.example.HelloWorld コンポーネントを承認して test/topic トピックに発行します。

Console
マージする設定
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

次のコマンドは、コアデバイスにデプロイを作成します。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

hello-world-deployment.json ファイルには、次の JSON ドキュメントが含まれています。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

次の Greengrass CLI コマンドは、コアデバイスにローカルデプロイを作成します。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

hello-world-configuration.json ファイルには、次の JSON ドキュメントが含まれています。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

IPC イベントストリームへのサブスクライブ

IPC オペレーションを使用して、Greengrass コアデバイスのイベントのストリームにサブスクライブできます。サブスクライブオペレーションを使用するには、サブスクリプションハンドラーを定義して、IPC サービスへのリクエストを作成します。これで IPC クライアントは、コアデバイスがコンポーネントにイベントメッセージをストリーミングするたびに、サブスクリプションハンドラーの関数が実行するようになります。

サブスクリプションを閉じると、イベントメッセージの処理を停止できます。これを行うには、サブスクリプションを開くときに使用したサブスクリプションオペレーションオブジェクトで、closeStream() (Java)、close() (Python)、または Close() (C++) を呼び出します。

AWS IoT Greengrass Core IPC サービスは、以下のサブスクライブオペレーションをサポートします。

サブスクリプションハンドラーの定義

サブスクリプションハンドラーを定義するには、イベントメッセージ、エラー、およびストリームクロージャを処理するコールバック関数を定義します。IPC クライアント V1 を使用する場合は、クラス内でこれらの関数を定義する必要があります。最近の Java または Python SDK で利用可能な IPC クライアント V2 を使用している場合、サブスクリプションハンドラークラスを作成しなくても、これらの関数を定義できます。

Java

IPC クライアント V1 を使用している場合は、一般的な software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> インターフェイスを実装する必要があります。StreamEventType は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。

IPC クライアント V2 を使用している場合は、サブスクリプションハンドラクラスの外部でこれらの関数を定義するか、Lambda 式を使います。

void onStreamEvent(StreamEventType event)

IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。

boolean onStreamError(Throwable error)

ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。

エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。

void onStreamClosed()

ストリームが閉じたときに IPC クライアントが呼び出すコールバック。

Python

IPC クライアント V1 を使用している場合、サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスを拡張する必要があります。AWS IoT Device SDK には、サブスクリプションオペレーションごとにサブスクリプションハンドラークラスが含まれています。StreamEventType は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。

IPC クライアント V2 を使用している場合は、サブスクリプションハンドラクラスの外部でこれらの関数を定義するか、Lambda 式を使います。

def on_stream_event(self, event: StreamEventType) -> None

IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。

def on_stream_error(self, error: Exception) -> bool

ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。

エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。

def on_stream_closed(self) -> None

ストリームが閉じたときに IPC クライアントが呼び出すコールバック。

C++

サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスから派生したクラスを実装します。AWS IoT Device SDK には、サブスクリプションオペレーションごとにサブスクリプションハンドラーベースクラスが含まれています。StreamEventType は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。

void OnStreamEvent(StreamEventType *event)

IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。

bool OnStreamError(OperationError *error)

ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。

エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。

void OnStreamClosed()

ストリームが閉じたときに IPC クライアントが呼び出すコールバック。

サブスクリプションハンドラーの例

次の例は、SubscribeToTopic オペレーションと、ローカルのパブリッシュ/サブスクライブメッセージにサブスクライブするためのサブスクリプションハンドラーの使用方法を示します。

Java (IPC client V2)
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
Java (IPC client V1)
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
注記

この例は IPCUtils クラスを使用して、AWS IoT Greengrass Core IPC サービスへの接続を作成します。詳細については、「AWS IoT Greengrass Core IPC サービスに接続する」を参照してください。

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToTopic { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new SubscriptionResponseHandler(topic); SubscribeToTopicResponseHandler responseHandler = SubscribeToTopic.subscribeToTopic(ipcClient, topic, streamResponseHandler); CompletableFuture<SubscribeToTopicResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToTopicResponseHandler subscribeToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler) { SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); return greengrassCoreIPCClient.subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { private final String topic; public SubscriptionResponseHandler(String topic) { this.topic = topic; } @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage().getMessage(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", this.topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
Python (IPC client V1)
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
注記

この例では、AWS IoT Device SDK for Python v2 のバージョン 1.5.4 以降を使用していることを前提としています。SDK のバージョン 1.5.3 を使用している場合、AWS IoT Greengrass Core IPC サービスへの接続方法の詳細については「AWS IoT Device SDK for Python v2 (IPC クライアント V1) を使用する」を参照してください。

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }

IPC ベストプラクティス

カスタムコンポーネントで IPC を使用する場合のベストプラクティスは、IPC クライアント V1 と IPC クライアント V2 とで異なります。使用する IPC クライアントのバージョンのベストプラクティスに従ってください。

IPC client V2

IPC クライアント V2 は、別のスレッドでコールバック関数を実行するため、IPC クライアント V1 と比較すると、IPC を使用する場合や、サブスクリプションハンドラ関数を記述する場合に、従うべきガイドラインが少なくなります。

  • 1 つの IPC クライアントを再利用する

    IPC クライアントを作成したら、そのクライアントを開いたままにして、全ての IPC オペレーションに再利用します。複数のクライアントを作成すると、リソースが余分に消費され、リソースリークが発生する可能性があります。

  • 例外を処理する

    IPC クライアント V2 は、サブスクリプションハンドラー関数でキャッチされない例外をログに記録します。コードで発生したエラーを処理するには、ハンドラー関数で例外をキャッチする必要があります。

IPC client V1

IPC クライアント V1 は単一のスレッドを使用し、IPC サーバーとの通信とサブスクリプションハンドラーの呼び出しを行います。サブスクリプションハンドラー関数を記述するときは、この同期動作を考慮する必要があります。

  • 1 つの IPC クライアントを再利用する

    IPC クライアントを作成したら、そのクライアントを開いたままにして、全ての IPC オペレーションに再利用します。複数のクライアントを作成すると、リソースが余分に消費され、リソースリークが発生する可能性があります。

  • ブロッキングコードを非同期で実行する

    IPC クライアント V1 は、スレッドがブロックされている間は、新しいリクエストの送信や新しいイベントメッセージの処理を行うことはできません。ブロッキングコードは、ハンドラー関数から実行する別個のスレッドで実行するべきです。ブロックコードには sleep 呼び出し、連続して実行されるループ、および完了までに時間がかかる同期 I/O リクエストなどがあります。

  • 新しい IPC 要求を非同期で送信する

    IPC クライアント V1 はサブスクリプションハンドラー関数内から新しいリクエストを送信できません。これは、応答を待機するとリクエストによってハンドラー関数がブロックされるためです。IPC リクエストの送信は、ハンドラー関数から実行する別個のスレッドで行うべきです。

  • 例外を処理する

    IPC クライアント V1 は、キャッチされない例外はサブスクリプションハンドラー関数で処理しません。ハンドラー関数が例外をスローすると、サブスクリプションが閉じて、例外はコンポーネントログに表示されません。ハンドラー関数で例外をキャッチして、サブスクリプションを開いたままにし、コードで発生したエラーをログに記録してください。