翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
を使用して AWS IoT Device SDK Greengrass nucleus、その他のコンポーネント、および と通信します。 AWS IoT Core
コアデバイスで実行されているコンポーネントは、 の AWS IoT Greengrass Core プロセス間通信 (IPC) ライブラリを使用して AWS IoT Device SDK 、 AWS IoT Greengrass nucleus やその他の Greengrass コンポーネントと通信できます。を使用するカスタムコンポーネントを開発して実行するにはIPC、 を使用して AWS IoT Greengrass Core IPCサービス AWS IoT Device SDK に接続し、IPCオペレーションを実行する必要があります。
IPC インターフェイスは、次の 2 種類のオペレーションをサポートしています。
-
リクエスト/レスポンス
コンポーネントはサービスにリクエストを送信IPCし、リクエストの結果を含むレスポンスを受け取ります。
-
サブスクリプション
コンポーネントはサブスクリプションリクエストをIPCサービスに送信し、レスポンスとしてイベントメッセージのストリームを期待します。コンポーネントは、イベントメッセージ、エラー、およびストリームクロージャを処理するサブスクリプションハンドラーを提供します。には、IPCオペレーションごとに正しいレスポンスとイベントタイプを持つハンドラーインターフェイス AWS IoT Device SDK が含まれています。詳細については、「IPC イベントストリームをサブスクライブする」を参照してください。
IPC クライアントバージョン
Java および Python の以降のバージョンではSDKs、 はIPCクライアント V2 と呼ばれる改善されたバージョンのIPCクライアント AWS IoT Greengrass を提供します。IPC クライアント V2:
-
IPC オペレーションを使用するために記述する必要があるコードの量を減らし、IPCクライアント V1 で発生する可能性のある一般的なエラーを回避します。
-
サブスクリプションハンドラーのコールバックを別のスレッドで呼び出すため、サブスクリプションハンドラーのコールバックで追加のIPC関数呼び出しを含むブロッキングコードを実行できるようになりました。IPC クライアント V1 は、同じスレッドを使用してIPCサーバーと通信し、サブスクリプションハンドラーのコールバックを呼び出します。
-
Lambda 式 (Java の場合) または関数 (Python の場合) を使用して、サブスクリプションオペレーションを呼び出せます。IPC クライアント V1 では、サブスクリプションハンドラークラスを定義する必要があります。
-
各IPCオペレーションの同期バージョンと非同期バージョンを提供します。IPC クライアント V1 は、各オペレーションの非同期バージョンのみを提供します。
これらの改善を活用するには、IPCクライアント V2 を使用することをお勧めします。ただし、このドキュメントや一部のオンラインコンテンツでは、IPCクライアント V1 の使用方法のみを示す例が多数あります。次の例とチュートリアルを使用して、IPCクライアント V2 を使用するサンプルコンポーネントを表示できます。
現在、 AWS IoT Device SDK for C++ v2 はIPCクライアント V1 のみをサポートしています。
プロセス間通信SDKsでサポート
AWS IoT Greengrass Core IPCライブラリは次の AWS IoT Device SDK バージョンに含まれています。
AWS IoT Greengrass Core IPCサービスに接続する
カスタムコンポーネントでプロセス間通信を使用するには、 AWS IoT Greengrass Core ソフトウェアが実行するIPCサーバーソケットへの接続を作成する必要があります。選択した言語で をダウンロードして使用する AWS IoT Device SDK には、次のタスクを実行します。
AWS IoT Device SDK for Java v2 (IPC クライアント V2) を使用するには
-
AWS IoT Device SDK for Java v2 (v1.6.0 以降) をダウンロードします。
-
以下のいずれかを行って、コンポーネントでカスタムコードを実行します。
-
次のコードを使用してIPCクライアントを作成します。
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
AWS IoT Device SDK for Python v2 (IPC クライアント V2) を使用するには
-
AWS IoT Device SDK for Python (v1.9.0 以降) をダウンロードします。
-
SDKコンポーネントの recipe のインストールライフサイクルに のインストールステップを追加します。
-
AWS IoT Greengrass Core IPCサービスへの接続を作成します。次のコードを使用してIPCクライアントを作成します。
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
AWS IoT Device SDK v2 for C++ を構築するには、デバイスに次のツールが必要です。
-
C++ 11 以降
-
CMake 3.1 以降
-
以下のいずれかのコンパイラ:
-
GCC 4.8 以降
-
Clang 3.9 以降
-
MSVC 2015 以降
AWS IoT Device SDK for C++ v2 を使用するには
-
AWS IoT Device SDK for C++ v2 (v1.17.0 以降) をダウンロードします。
-
「」のインストール手順に従ってREADME、ソースから for C++ v2 を構築します。 AWS IoT Device SDK
-
C++ 構築ツールで、前のステップで構築AWS::GreengrassIpc-cpp
した Greengrass IPCライブラリ をリンクします。次のCMakeLists.txt
例では、Greengrass IPCライブラリを で構築したプロジェクトにリンクしますCMake。
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
コンポーネントコードで、 AWS IoT Greengrass Core IPCサービスへの接続を作成して IPCクライアント () を作成しますAws::Greengrass::GreengrassCoreIpcClient
。IPC 接続、切断、エラーイベントを処理するIPC接続ライフサイクルハンドラーを定義する必要があります。次の例では、IPCクライアントと、IPCクライアントがIPC接続、切断、エラーが発生したときに が出力する接続ライフサイクルハンドラーを作成します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
コンポーネントでカスタムコードを実行するには、コードをバイナリアーティファクトとしてビルドし、コンポーネント recipe でバイナリアーティファクトを実行します。アーティファクトのExecute
アクセス許可を に設定OWNER
して、 AWS IoT Greengrass Core ソフトウェアがバイナリアーティファクトを実行できるようにします。
コンポーネント recipe の Manifests
セクションは、次の例のようになります。
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
NodeJS で使用する AWS IoT Device SDK for JavaScript v2 を構築するには、デバイスに次のツールが必要です。 NodeJS
-
NodeJS 10.0 以降
-
CMake 3.1 以降
for JavaScript v2 ( AWS IoT Device SDK IPC クライアント V1) を使用するには
-
AWS IoT Device SDK for JavaScript v2 (v 以降) をダウンロードします。1.12.10
-
「」のインストール手順に従って、READMEソースから AWS IoT Device SDK for JavaScript v2 を構築します。
-
AWS IoT Greengrass Core IPCサービスへの接続を作成します。IPC 次の手順を実行してクライアントを作成し、接続を確立します。
-
次のコードを使用してIPCクライアントを作成します。
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
次のコードを使用して、コンポーネントから Greengrass nucleus への接続を確立します。
await client.connect();
コンポーネントにIPCオペレーションの実行を許可する
カスタムコンポーネントが一部のIPCオペレーションを使用できるようにするには、コンポーネントが特定のリソースでオペレーションを実行できるようにする承認ポリシーを定義する必要があります。各承認ポリシーは、ポリシーが許可するオペレーションのリストとリソースのリストを定義します。例えば、パブリッシュ/サブスクライブメッセージングIPCサービスは、トピックリソースのパブリッシュおよびサブスクライブオペレーションを定義します。*
ワイルドカードを使用すると、すべてのオペレーションまたはすべてのリソースへのアクセスを許可できます。
承認ポリシーは accessControl
設定パラメータで定義しますが、これはコンポーネント recipe で、またはコンポーネントをデプロイするときに設定できます。accessControl
オブジェクトは、IPCサービス識別子を認可ポリシーのリストにマッピングします。アクセスを制御するために、IPCサービスごとに複数の承認ポリシーを定義できます。各承認ポリシーにはポリシー ID があり、すべてのコンポーネントで一意である必要があります。
一意のポリシー を作成するにはIDs、コンポーネント名、IPCサービス名、カウンターを組み合わせることができます。例えば、 という名前のコンポーネントcom.example.HelloWorld
は、次の を使用して 2 つのパブリッシュ/サブスクライブ承認ポリシーを定義できますIDs。
承認ポリシーは以下の形式を使用します。このオブジェクトは accessControl
設定パラメータです。
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
承認ポリシー内のワイルドカード
IPC 承認ポリシーの resources
要素で*
ワイルドカードを使用して、1 つの承認ポリシーで複数のリソースへのアクセスを許可できます。
-
Greengrass nucleus の全てのバージョンにおいて、リソースとして *
の文字を指定すると、全てのリソースにアクセスできます。
-
Greengrass nucleus の v2.6.0 以降では、リソースで *
の文字を指定すると、どの文字列の組み合わせにも一致します。例えば factory/1/devices/Thermostat*/status
と指定すると、工場内のサーモスタットデバイスのうち、名前が Thermostat
で始まるすべてのデバイスのステータストピックへアクセスできます。
IPC サービスの承認 AWS IoT Core MQTTポリシーを定義するときは、MQTTワイルドカード (+
と #
) を使用して複数のリソースを照合することもできます。詳細については、「 MQTTIPC承認ポリシー」の AWS IoT Core MQTT「ワイルドカード」を参照してください。
承認ポリシーの recipe 変数
Greengrass nucleus v2.6.0 以降を使用していて、Greengrass nucleus interpolateComponentConfigurationの設定オプションを に設定している場合はtrue
、承認ポリシーで {iot:thingName}
recipe 変数を使用できます。MQTT トピックやデバイスシャドウなど、コアデバイスの名前を含む承認ポリシーが必要な場合は、この recipe 変数を使用して、コアデバイスのグループに 1 つの承認ポリシーを設定できます。例えば、シャドウIPCオペレーションのために次のリソースへのコンポーネントアクセスを許可できます。
$aws/things/{iot:thingName}/shadow/
承認ポリシーの特殊文字
承認ポリシーで *
または ?
をリテラル文字として指定するには、エスケープシーケンスを使う必要があります。次のエスケープシーケンスは、 AWS IoT Greengrass Core ソフトウェアに、キャラクターの特別な意味の代わりにリテラル値を使用するように指示します。例えば、 *
の文字は任意の文字の組み合わせに一致するワイルドカードです。
リテラル文字 |
エスケープシーケンス |
メモ |
*
|
${*}
|
|
?
|
${?}
|
AWS IoT Greengrass は現在、任意の 1 文字に一致する? ワイルドカードをサポートしていません。
|
$
|
${$}
|
このエスケープシーケンスを使用すると、${ を含むリソースに一致します。例えば、${resourceName} という名前のリソースに一致させるには、${$}{resourceName} と指定する必要があります。もしくは、$aws で始まるトピックへアクセスできるように、リテラルの $ を使って $ を含むリソースに一致させます。
|
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシー設定の参考にできます。
例 承認ポリシーを使用したコンポーネント recipe の例
次のコンポーネント recipe の例には、承認ポリシーを定義する accessControl
オブジェクトが含まれます。このポリシーは com.example.HelloWorld
コンポーネントを承認して test/topic
トピックに発行します。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/HelloWorld.jar
例 承認ポリシーを使用したコンポーネント設定更新の例
次のデプロイの設定更新の例では、承認ポリシーを定義する accessControl
オブジェクトでコンポーネントを設定することが指定されています。このポリシーは com.example.HelloWorld
コンポーネントを承認して test/topic
トピックに発行します。
- Console
-
- マージする設定
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- AWS CLI
-
次のコマンドは、コアデバイスにデプロイを作成します。
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
hello-world-deployment.json
ファイルには、次のJSONドキュメントが含まれています。
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
次の Greengrass CLI コマンドは、コアデバイスにローカルデプロイを作成します。
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
hello-world-configuration.json
ファイルには、次のJSONドキュメントが含まれています。
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
IPC イベントストリームをサブスクライブする
IPC オペレーションを使用して、Greengrass コアデバイスのイベントのストリームをサブスクライブできます。サブスクライブオペレーションを使用するには、サブスクリプションハンドラーを定義し、IPCサービスへのリクエストを作成します。次に、コアデバイスがイベントメッセージをコンポーネントにストリーミングするたびに、IPCクライアントはサブスクリプションハンドラーの関数を実行します。
サブスクリプションを閉じると、イベントメッセージの処理を停止できます。これを行うには、サブスクリプションを開くときに使用したサブスクリプションオペレーションオブジェクトで、closeStream()
(Java)、close()
(Python)、または Close()
(C++) を呼び出します。
AWS IoT Greengrass Core IPCサービスは、次のサブスクライブオペレーションをサポートしています。
サブスクリプションハンドラーの定義
サブスクリプションハンドラーを定義するには、イベントメッセージ、エラー、およびストリームクロージャを処理するコールバック関数を定義します。IPC クライアント V1 を使用する場合は、これらの関数をクラスで定義する必要があります。Java および Python の以降のバージョンで利用可能なIPCクライアント V2 を使用する場合SDKs、サブスクリプションハンドラークラスを作成せずにこれらの関数を定義できます。
- Java
-
IPC クライアント V1 を使用する場合は、汎用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
インターフェイスを実装する必要があります。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
IPC クライアント V2 を使用する場合は、サブスクリプションハンドラークラスの外部でこれらの関数を定義するか、lambda 式 を使用できます。
void onStreamEvent(StreamEventType
event)
-
メッセージMQTTやコンポーネント更新通知などのイベントメッセージを受信したときにIPCクライアントが呼び出すコールバック。
boolean onStreamError(Throwable error)
-
ストリームエラーが発生したときにIPCクライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
void onStreamClosed()
-
ストリームが閉じられたときにIPCクライアントが呼び出すコールバック。
- Python
-
IPC クライアント V1 を使用する場合は、サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスを拡張する必要があります。には、サブスクリプションオペレーションごとにサブスクリプションハンドラークラス AWS IoT Device SDK が含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
IPC クライアント V2 を使用する場合は、サブスクリプションハンドラークラスの外部でこれらの関数を定義するか、lambda 式 を使用できます。
def on_stream_event(self, event:
StreamEventType
) -> None
-
メッセージMQTTやコンポーネント更新通知などのイベントメッセージを受信したときにIPCクライアントが呼び出すコールバック。
def on_stream_error(self, error: Exception) -> bool
-
ストリームエラーが発生したときにIPCクライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
def on_stream_closed(self) -> None
-
ストリームが閉じられたときにIPCクライアントが呼び出すコールバック。
- C++
-
サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスから派生したクラスを実装します。には、各サブスクリプションオペレーションのサブスクリプションハンドラーベースクラス AWS IoT Device SDK が含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
void OnStreamEvent(StreamEventType
*event)
-
メッセージMQTTやコンポーネント更新通知などのイベントメッセージを受信したときにIPCクライアントが呼び出すコールバック。
bool OnStreamError(OperationError *error)
-
ストリームエラーが発生したときにIPCクライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
void OnStreamClosed()
-
ストリームが閉じられたときにIPCクライアントが呼び出すコールバック。
- JavaScript
-
サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスから派生したクラスを実装します。には、各サブスクリプションオペレーションのサブスクリプションハンドラーベースクラス AWS IoT Device SDK が含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
on(event: 'ended', listener: StreamingOperationEndedListener)
-
ストリームが閉じられたときにIPCクライアントが呼び出すコールバック。
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
ストリームエラーが発生したときにIPCクライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
on(event: 'message', listener: (message: InboundMessageType) => void)
-
メッセージMQTTやコンポーネント更新通知などのイベントメッセージを受信したときにIPCクライアントが呼び出すコールバック。
サブスクリプションハンドラーの例
次の例は、SubscribeToTopic オペレーションと、ローカルのパブリッシュ/サブスクライブメッセージにサブスクライブするためのサブスクリプションハンドラーの使用方法を示します。
- Java (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC ベストプラクティス
カスタムコンポーネントIPCで を使用するためのベストプラクティスは、IPCクライアント V1 とIPCクライアント V2 で異なります。使用するIPCクライアントバージョンのベストプラクティスに従ってください。
- IPC client V2
-
IPC クライアント V2 は別のスレッドでコールバック関数を実行するため、IPCクライアント V1 と比較して、サブスクリプションハンドラー関数を使用IPCおよび書き込む場合に従うべきガイドラインが少なくなります。
- IPC client V1
-
IPC クライアント V1 は、IPCサーバーと通信し、サブスクリプションハンドラーを呼び出す単一のスレッドを使用します。サブスクリプションハンドラー関数を記述するときは、この同期動作を考慮する必要があります。
-
1 つのIPCクライアントを再利用する
IPC クライアントを作成したら、開いたままにして、すべてのIPCオペレーションで再利用します。複数のクライアントを作成すると、リソースが余分に消費され、リソースリークが発生する可能性があります。
-
ブロッキングコードを非同期で実行する
スレッドがブロックされている間、IPCクライアント V1 は新しいリクエストを送信したり、新しいイベントメッセージを処理したりすることはできません。ブロッキングコードは、ハンドラー関数から実行する別個のスレッドで実行するべきです。ブロックコードには sleep
呼び出し、連続して実行されるループ、および完了までに時間がかかる同期 I/O リクエストなどがあります。
-
新しいIPCリクエストを非同期で送信する
IPC クライアント V1 は、サブスクリプションハンドラー関数内から新しいリクエストを送信できません。これは、レスポンスを待機すると、リクエストによってハンドラー関数がブロックされるためです。ハンドラー関数から実行する別のスレッドでIPCリクエストを送信する必要があります。
-
例外を処理する
IPC クライアント V1 は、サブスクリプションハンドラー関数でキャッチされていない例外を処理しません。ハンドラー関数が例外をスローすると、サブスクリプションが閉じて、例外はコンポーネントログに表示されません。ハンドラー関数で例外をキャッチして、サブスクリプションを開いたままにし、コードで発生したエラーをログに記録してください。