使 AWS IoT Device SDK 用與 Greengrass 核、其他元件和通訊 AWS IoT Core - AWS IoT Greengrass

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使 AWS IoT Device SDK 用與 Greengrass 核、其他元件和通訊 AWS IoT Core

核心裝置上執行的元件可以使用中的核 AWS IoT Greengrass 心處理序間通訊 (IPC) 程式庫,與 AWS IoT Greengrass 核心和其他 Greengrass 元件進行通訊。 AWS IoT Device SDK 若要開發和執行使用的自訂元件IPC,您必須使用連線 AWS IoT Device SDK 至 AWS IoT Greengrass Core IPC 服務並執行IPC作業。

該IPC界面支持兩種類型的操作:

  • 請求/回應

    組件向IPC服務發送請求,並接收包含請求結果的響應。

  • 訂閱

    元件會將訂閱要求傳送至IPC服務,並預期事件訊息串流以作回應。組件提供處理事件消息,錯誤和流關閉的訂閱處理程序。 AWS IoT Device SDK 包括一個處理常式介面,其中包含每個IPC作業的正確回應和事件類型。如需詳細資訊,請參閱訂閱IPC事件串流

IPC用戶端版本

在 Java 和 Python 的更高版本中SDKs, AWS IoT Greengrass 提供了IPC客戶端的改進版本,稱為IPC客戶端 V2。IPC用戶端 V2:

  • 減少使用IPC作業所需撰寫的程式碼量,並協助避免用IPC戶端 V1 可能發生的常見錯誤。

  • 在單獨的執行緒中呼叫訂閱處理常式回呼,因此您現在可以在訂閱處理常式回呼中執行封鎖程式碼,包括其他IPC函數呼叫。IPC客戶端 V1 使用相同的線程與IPC服務器進行通信,並調用訂閱處理程序回調。

  • 可讓您使用 Lambda 運算式 (Java) 或函數 (Python) 呼叫訂閱作業。IPC用戶端 V1 需要您定義訂閱處理常式類別。

  • 提供每個IPC作業的同步和非同步版本。IPC用戶端 V1 僅提供每個作業的非同步版本。

我們建議您使用用IPC戶端 V2 來利用這些改良功能。不過,本文件和某些線上內容中的許多範例只會示範如何使用 IPC Client V1。您可以使用下列範例和教學課程來查看使用 IPC Client V2 的範例元件:

目前,C ++ v2 僅支持IPC客戶端 V1。 AWS IoT Device SDK

支援SDKs處理序間通訊

AWS IoT Greengrass 核心IPC程式庫包含在以下 AWS IoT Device SDK 版本中。

Connect 至 AWS IoT Greengrass 核心IPC服務

若要在自訂元件中使用處理序間通訊,您必須建立與 AWS IoT Greengrass Core 軟體執行之IPC伺服器通訊端的連線。完成下列任務以下載並使 AWS IoT Device SDK 用您選擇的語言。

若要使 AWS IoT Device SDK 用 Java V2 (IPC客戶端 V2)
  1. 下載AWS IoT Device SDK 適用於 Java V2(版本 1.6.0 或更高版本)。

  2. 執行下列其中一項作業,在元件中執行自訂程式碼:

    • 將元件建置為包含的JAR檔案 AWS IoT Device SDK,並在元件方JAR案中執行此檔案。

    • 定義 AWS IoT Device SDK JAR為元件人工因素,並在元件方案中執行應用程式時,將該成品新增至類別路徑。

  3. 使用下面的代碼來創建IPC客戶端。

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
若要使用用 AWS IoT Device SDK 於 Python V2 (用IPC戶端 V2)
  1. 下載AWS IoT Device SDK 適用於 Python(版本 1.9.0 或更高版本)。

  2. 安裝步驟新增至元件方案中的安裝生命週期。SDK

  3. 建立與 AWS IoT Greengrass 核心IPC服務的連線。使用下面的代碼來創建IPC客戶端。

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

要為 C ++ 構建 AWS IoT Device SDK v2,設備必須具有以下工具:

  • C ++ 11 或更高版本

  • CMake3.1 或更新版本

  • 下列其中一個編譯器:

    • GCC4.8 或更新版本

    • 鐺 3.9 或更高版本

    • MSVC2015 年或以後

若要使 AWS IoT Device SDK 用 C++ V2 的
  1. 下載AWS IoT Device SDK 適用於 C ++ 第 2 版(v 1.17.0 或更高版本)。

  2. 遵循中的安裝指示 README,從原始碼建置 C++ v2。 AWS IoT Device SDK

  3. 在您的 C ++ 構建工具中,鏈接您在上一步中構建的 Greengrass IPC 庫。AWS::GreengrassIpc-cpp下列CMakeLists.txt範例會將 Greengrass 程式IPC庫連結至您使用建置的專案。CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在元件程式碼中,建立與 AWS IoT Greengrass Core IPC 服務的連線,以建立IPC用戶端 (Aws::Greengrass::GreengrassCoreIpcClient)。您必須定義處理IPC連線、中斷連IPC線和錯誤事件的連線生命週期處理常式。下列範例會建立IPC用戶端和IPC連線生命週期處理常式,該處理常式會在IPC用戶端連線、中斷連線及發生錯誤時列印。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 若要在元件中執行自訂程式碼,請將程式碼建置為二進位成品,然後在元件方案中執行二進位成品。將成品的Execute權限設定OWNER為允許 AWS IoT Greengrass 核心軟體執行二進位成品。

    您的組件配方的Manifests部分看起來可能類似於以下示例。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

要構建 JavaScript v2 以與 NodeJS 一起使用,設備必須具有以下工具: AWS IoT Device SDK

  • NodeJS JS 10.0 或更高版本

    • 執行node -v以檢查節點版本。

  • CMake3.1 或更新版本

若要使 AWS IoT Device SDK 用 JavaScript v2 (用IPC戶端 V1)
  1. 下載AWS IoT Device SDK 適用於第 JavaScript 2 版 (1.12.10 版或更新版本)。

  2. 遵循中的安裝指示,從來源建置 AWS IoT Device SDK for JavaScript v2。README

  3. 建立與 AWS IoT Greengrass 核心IPC服務的連線。完成下列步驟來建立用IPC戶端並建立連線。

  4. 使用下面的代碼來創建IPC客戶端。

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. 使用下面的代碼來建立從組件到 Greengrass 核的連接。

    await client.connect();

授權元件執行IPC作業

若要允許自訂元件使用某些作IPC業,您必須定義授權原則,以允許元件在特定資源上執行作業。每個授權原則都會定義作業清單以及原則允許的資源清單。例如,發佈/訂閱訊息IPC服務會定義主題資源的發佈和訂閱作業。您可以使用*萬用字元來允許存取所有作業或所有資源。

您可以使用accessControl組態參數來定義授權原則,您可以在元件方案中或部署元件時設定這些參數。accessControl物件會將IPC服務識別碼對應至授權原則清單。您可以為每個IPC服務定義多個授權原則來控制存取。每個授權策略都有一個策略 ID,它在所有元件中必須是唯一的。

提示

若要建立唯一的原則IDs,您可以結合元件名稱、IPC服務名稱和計數器。例如,名為的元件com.example.HelloWorld可能會定義兩個具有下列內容的發佈/訂閱授權原則:IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授權原則使用下列格式。此物件是組accessControl態參數。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

授權原則中的萬用字元

您可以在IPC授權原則的resources元素中使用*萬用字元,以允許存取單一授權原則中的多個資源。

  • Greengrass 核心的所有版本中,您可以指定單一*字元作為資源,以允許存取所有資源。

  • Greengrass 核 v2.6.0 及更高版本中,您可以在資源中指定字符以匹配任意*字符組合。例如,您可以指定factory/1/devices/Thermostat*/status允許存取工廠中所有恆溫器裝置的狀態主題,其中每個裝置的名稱開頭為Thermostat

當您定義 AWS IoT Core MQTTIPC服務的授權原則時,您也可以使用MQTT萬用字元 (+#) 來比對多個資源。如需詳細資訊,請參閱AWS IoT Core MQTTIPC授權原則中的MQTT萬用字元。

授權政策中的配方變數

如果您使用 Greengrass 核心 v2.6.0 或更新版本,並將 true Greengrass 核心的interpolateComponentConfiguration組態選項設定為,您可以在授權原則中使用 recipe 變數。{iot:thingName}當您需要包含核心裝置名稱的授權原則時 (例如針對MQTT主題或裝置陰影),您可以使用此 recipe 變數為一組核心裝置設定單一授權原則。例如,您可以允許元件存取下列資源,以進行陰影IPC作業。

$aws/things/{iot:thingName}/shadow/

授權政策中的特殊字元

若要在授權原則中指定常值*?字元,您必須使用逸出序列。下列逸出序列會指示 AWS IoT Greengrass 核心軟體使用常值,而非字元的特殊意義。例如,*是萬用字元,符合任何字元組合。

常值字元 逸出序列 備註

*

${*}

?

${?}

AWS IoT Greengrass 目前不支援符合任何單一字?元的萬用字元。

$

${$}

使用此逸出序列來比對包含的資源${。例如,若要符合名為的資源${resourceName},您必須指定${$}{resourceName}。否則,若要比對包含的資源$,您可以使用常值$,例如允許存取以開頭的主題$aws

授權政策範例

您可以參考下列授權原則範例,協助您設定元件的授權原則。

範例 含授權原則的範例元件配方

下列範例元件方案包含定義授權原則的accessControl物件。此原則會授權com.example.HelloWorld元件發佈至主test/topic題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
範例 使用授權原則更新元件組態範例

下列部署中的組態更新範例會指定使用定義授權原則的accessControl物件來設定元件。此原則會授權com.example.HelloWorld元件發佈至主test/topic題。

Console
要合併的組態
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

下列指令會建立核心裝置的部署。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

該文hello-world-deployment.json件包含以下JSON文檔。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

下列 Greengrass CLI 命令會在核心裝置上建立本機部署。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

該文hello-world-configuration.json件包含以下JSON文檔。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

訂閱IPC事件串流

您可以使用IPC操作來訂閱 Greengrass 核心設備上的事件流。若要使用訂閱作業,請定義訂閱處理常式並建立對IPC服務的要求。然後,每當核心裝置將事件訊息串流至您的元件時,IPC用戶端就會執行訂閱處理常式的函數。

您可以關閉訂閱以停止處理事件訊息。若要這麼做,請closeStream()在您用來開啟訂閱的訂閱作業物件上呼叫 Close() (Java)、(Python) 或 (C++)。close()

AWS IoT Greengrass 核心IPC服務支援下列訂閱作業:

定義訂閱處理器

若要定義訂閱處理常式,請定義處理事件訊息、錯誤和串流結束的回呼函數。如果您使用用IPC戶端 V1,您必須在類別中定義這些函數。如果您使用用IPC戶端 V2 (在較新版本的 Java 和 Python 中提供)SDKs,您可以在不建立訂閱處理常式類別的情況下定義這些函數。

Java

如果您使用用IPC戶端 V1,則必須實作一般software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>介面。StreamEventType 是訂閱作業的事件訊息類型。定義下列函數以處理事件訊息、錯誤和資料流結束。

如果您使用用IPC戶端 V2,您可以在訂閱處理常式類別之外定義這些函數,或使用 Lambda 運算式

void onStreamEvent(StreamEventType event)

IPC用戶端收到事件訊息 (例如訊MQTT息或元件更新通知) 時所呼叫的回呼。

boolean onStreamError(Throwable error)

發生串流錯誤時,IPC用戶端呼叫的回呼。

傳回 true 以因錯誤而關閉訂閱串流,或傳回 false 以保持串流開啟。

void onStreamClosed()

IPC用戶端在串流關閉時呼叫的回呼。

Python

如果您使用 IPC Client V1,則必須擴充與訂閱作業對應的串流回應處理常式類別。 AWS IoT Device SDK 包含每個訂閱作業的訂閱處理常式類別。StreamEventType 是訂閱作業的事件訊息類型。定義下列函數以處理事件訊息、錯誤和資料流結束。

如果您使用用IPC戶端 V2,您可以在訂閱處理常式類別之外定義這些函數,或使用 Lambda 運算式

def on_stream_event(self, event: StreamEventType) -> None

IPC用戶端收到事件訊息 (例如訊MQTT息或元件更新通知) 時所呼叫的回呼。

def on_stream_error(self, error: Exception) -> bool

發生串流錯誤時,IPC用戶端呼叫的回呼。

傳回 true 以因錯誤而關閉訂閱串流,或傳回 false 以保持串流開啟。

def on_stream_closed(self) -> None

IPC用戶端在串流關閉時呼叫的回呼。

C++

實作衍生自對應於訂閱作業之串流回應處理常式類別的類別。 AWS IoT Device SDK 包含每個訂閱作業的訂閱處理常式基底類別。StreamEventType 是訂閱作業的事件訊息類型。定義下列函數以處理事件訊息、錯誤和資料流結束。

void OnStreamEvent(StreamEventType *event)

IPC用戶端收到事件訊息 (例如訊MQTT息或元件更新通知) 時所呼叫的回呼。

bool OnStreamError(OperationError *error)

發生串流錯誤時,IPC用戶端呼叫的回呼。

傳回 true 以因錯誤而關閉訂閱串流,或傳回 false 以保持串流開啟。

void OnStreamClosed()

IPC用戶端在串流關閉時呼叫的回呼。

JavaScript

實作衍生自對應於訂閱作業之串流回應處理常式類別的類別。 AWS IoT Device SDK 包含每個訂閱作業的訂閱處理常式基底類別。StreamEventType 是訂閱作業的事件訊息類型。定義下列函數以處理事件訊息、錯誤和資料流結束。

on(event: 'ended', listener: StreamingOperationEndedListener)

IPC用戶端在串流關閉時呼叫的回呼。

on(event: 'streamError', listener: StreamingRpcErrorListener)

發生串流錯誤時,IPC用戶端呼叫的回呼。

傳回 true 以因錯誤而關閉訂閱串流,或傳回 false 以保持串流開啟。

on(event: 'message', listener: (message: InboundMessageType) => void)

IPC用戶端收到事件訊息 (例如訊MQTT息或元件更新通知) 時所呼叫的回呼。

訂閱處理常式範

下列範例示範如何使用SubscribeToTopic作業和訂閱處理常式來訂閱本機發佈/訂閱訊息。

Java (IPC client V2)
範例:訂閱本機發佈/訂閱訊息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
範例:訂閱本機發佈/訂閱訊息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
範例:訂閱本機發佈/訂閱訊息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
範例:訂閱本機發佈/訂閱訊息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPC最佳做法

用戶端 V1 和用IPC戶IPC端 V2 之間,IPC在自訂元件中使用的最佳做法會有所不同。請遵循您使用之用IPC戶端版本的最佳作法。

IPC client V2

用IPC戶端 V2 會在個別的執行緒中執行回呼函式,因此與用IPC戶端 V1 相比,在使用IPC和撰寫訂閱處理常式函數時,需要遵循的準則較少。

  • 重用一個用IPC戶端

    建立用IPC戶端之後,請將其保持開啟狀態,並將其重複使用於所有IPC作業。創建多個客戶端會使用額外的資源,並可能導致資源洩漏。

  • 處理異常

    IPC用戶端 V2 會在訂閱處理常式函式中記錄未捕捉到的例外 您應該在處理常式函數中 catch 例外狀況,以處理程式碼中發生的錯誤。

IPC client V1

用IPC戶端 V1 使用與IPC伺服器通訊並呼叫訂閱處理常式的單一執行緒。撰寫訂閱處理常式函數時,必須考慮此同步行為。

  • 重用一個用IPC戶端

    建立用IPC戶端之後,請將其保持開啟狀態,並將其重複使用於所有IPC作業。創建多個客戶端會使用額外的資源,並可能導致資源洩漏。

  • 非同步執行封鎖程式碼

    當執行緒被封鎖時,IPC用戶端 V1 無法傳送新的要求或處理新的事件訊息。您應該在從處理常式函數執行的個別執行緒中執行封鎖程式碼。封鎖程式碼包括sleep呼叫、持續執行的迴圈,以及需要時間才能完成的同步 I/O 要求。

  • 異步發送新IPC請求

    用IPC戶端 V1 無法從訂閱處理常式函式中傳送新的要求,因為如果您等待回應,要求會封鎖處理常式函式。您應該在從處理程序函數運行的單獨線程中發送IPC請求。

  • 處理異常

    用IPC戶端 V1 不會處理訂閱處理常式函數中未捕捉到的例外狀況。如果您的處理常式函數擲回例外狀況,訂閱就會關閉,且例外狀況不會出現在元件記錄中。您應該在處理常式函數中 catch 例外狀況,以保持訂閱開啟狀態,並記錄程式碼中發生的錯誤。