SDK per dispositivi AWS IoT Utilizzatelo per comunicare con il nucleo Greengrass, gli altri componenti e AWS IoT Core - AWS IoT Greengrass

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

SDK per dispositivi AWS IoT Utilizzatelo per comunicare con il nucleo Greengrass, gli altri componenti e AWS IoT Core

I componenti in esecuzione sul dispositivo principale possono utilizzare la libreria AWS IoT Greengrass Core interprocess communication (IPC) SDK per dispositivi AWS IoT per comunicare con il AWS IoT Greengrass nucleus e altri componenti Greengrass. Per sviluppare ed eseguire componenti personalizzati che utilizzanoIPC, è necessario utilizzare il servizio Core SDK per dispositivi AWS IoT per connettersi al IPC servizio AWS IoT Greengrass Core ed eseguire operazioni. IPC

L'IPCinterfaccia supporta due tipi di operazioni:

  • Richiesta/risposta

    I componenti inviano una richiesta al IPC servizio e ricevono una risposta che contiene il risultato della richiesta.

  • Subscription

    I componenti inviano una richiesta di abbonamento al IPC servizio e prevedono un flusso di messaggi relativi agli eventi in risposta. I componenti forniscono un gestore di sottoscrizioni che gestisce i messaggi di eventi, gli errori e la chiusura degli stream. SDK per dispositivi AWS IoT Include un'interfaccia di gestione con la risposta e i tipi di eventi corretti per ogni IPC operazione. Per ulteriori informazioni, consulta Sottoscrivi agli stream di IPC eventi.

IPCversioni client

Nelle versioni successive di Java e PythonSDKs, AWS IoT Greengrass fornisce una versione migliorata del IPC client, chiamata IPC client V2. IPCcliente V2:

  • Riduce la quantità di codice da scrivere per utilizzare IPC le operazioni e aiuta a evitare gli errori comuni che possono verificarsi con IPC il client V1.

  • Richiama i callback del gestore delle sottoscrizioni in un thread separato, quindi ora puoi eseguire il codice di blocco, incluse le chiamate a IPC funzioni aggiuntive, nei callback del gestore delle sottoscrizioni. IPCil client V1 utilizza lo stesso thread per comunicare con il IPC server e chiamare i callback del gestore delle sottoscrizioni.

  • Consente di chiamare le operazioni di sottoscrizione utilizzando espressioni Lambda (Java) o funzioni (Python). IPCil client V1 richiede la definizione di classi di gestione delle sottoscrizioni.

  • Fornisce versioni sincrone e asincrone di ogni operazione. IPC IPCil client V1 fornisce solo versioni asincrone di ogni operazione.

Si consiglia di utilizzare il IPC client V2 per sfruttare questi miglioramenti. Tuttavia, molti esempi in questa documentazione e in alcuni contenuti online dimostrano solo come utilizzare il IPC client V1. È possibile utilizzare i seguenti esempi e tutorial per visualizzare componenti di esempio che utilizzano IPC il client V2:

Attualmente, SDK per dispositivi AWS IoT for C++ v2 supporta solo il IPC client V1.

Supportato SDKs per la comunicazione tra processi

Le IPC librerie AWS IoT Greengrass Core sono incluse nelle seguenti SDK per dispositivi AWS IoT versioni.

Connect al IPC servizio AWS IoT Greengrass Core

Per utilizzare la comunicazione tra processi nel componente personalizzato, è necessario creare una connessione a un socket IPC del server eseguito dal software AWS IoT Greengrass Core. Completate le seguenti attività per scaricarlo e utilizzarlo SDK per dispositivi AWS IoT nella lingua di vostra scelta.

Per utilizzare il SDK per dispositivi AWS IoT per Java v2 (client V2) IPC
  1. Scaricate il file SDK per dispositivi AWS IoT per Java v2 (v1.6.0 o successivo).

  2. Effettuate una delle seguenti operazioni per eseguire il codice personalizzato nel componente:

    • Crea il componente come JAR file che include ed SDK per dispositivi AWS IoT esegui questo JAR file nella ricetta del componente.

    • Definiscilo SDK per dispositivi AWS IoT JAR come elemento componente e aggiungi quell'artefatto al classpath quando esegui l'applicazione nella ricetta del componente.

  3. Usa il codice seguente per creare il client. IPC

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Per usare SDK per dispositivi AWS IoT for Python v2 (client V2) IPC
  1. Scaricate il file SDK per dispositivi AWS IoT per Python (v1.9.0 o successivo).

  2. Aggiungi i passaggi SDK di installazione al ciclo di vita dell'installazione nella ricetta del componente.

  3. Crea una connessione al servizio AWS IoT Greengrass CoreIPC. Usa il codice seguente per creare il IPC client.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Per creare la SDK per dispositivi AWS IoT v2 per C++, un dispositivo deve disporre dei seguenti strumenti:

  • C++ 11 o versione successiva

  • CMake3.1 o versione successiva

  • Uno dei seguenti compilatori:

    • GCC4.8 o versione successiva

    • Clang 3.9 o versione successiva

    • MSVC2015 o successivo

Per utilizzare la versione SDK per dispositivi AWS IoT per C++ v2
  1. Scarica la versione SDK per dispositivi AWS IoT per C++ v2 (v1.17.0 o successiva).

  2. Segui le istruzioni di installazione contenute in per compilare il file README per C++ v2 dal codice sorgente SDK per dispositivi AWS IoT .

  3. Nel tuo strumento di compilazione C++, collega la libreria IPC GreengrassAWS::GreengrassIpc-cpp, che hai creato nel passaggio precedente. L'CMakeLists.txtesempio seguente collega la IPC libreria Greengrass a un progetto con cui crei. CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. Nel codice del componente, create una connessione al IPC servizio AWS IoT Greengrass Core per creare un IPC client (Aws::Greengrass::GreengrassCoreIpcClient). È necessario definire un gestore del ciclo di vita della IPC connessione che gestisca gli eventi di IPC connessione, disconnessione ed errore. L'esempio seguente crea un IPC client e un gestore del ciclo di vita della IPC connessione che stampa quando il client si connette, si disconnette e riscontra IPC errori.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Per eseguire il codice personalizzato nel componente, crea il codice come elemento binario ed esegui l'artefatto binario nella ricetta del componente. Imposta l'Executeautorizzazione dell'artefatto per consentire al software AWS IoT Greengrass Core di OWNER eseguire l'artefatto binario.

    La Manifests sezione relativa alla ricetta del componente potrebbe essere simile all'esempio seguente.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Per creare SDK per dispositivi AWS IoT for JavaScript v2 da utilizzare con NodeJS, un dispositivo deve disporre dei seguenti strumenti:

  • NodeJS 10.0 o versione successiva

    • Esegui node -v per controllare la versione di Node.

  • CMake3.1 o versione successiva

Per utilizzare SDK per dispositivi AWS IoT for JavaScript v2 (IPCclient V1)
  1. Scarica la versione SDK per dispositivi AWS IoT per JavaScript v2 (v1.12.10 o successiva).

  2. Segui le istruzioni di installazione riportate in README per compilare la versione v2 dal codice sorgente SDK per dispositivi AWS IoT . JavaScript

  3. Crea una connessione al IPC servizio AWS IoT Greengrass Core. Completa i seguenti passaggi per creare il IPC client e stabilire una connessione.

  4. Usa il codice seguente per creare il IPC client.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Usa il codice seguente per stabilire una connessione dal tuo componente al nucleo Greengrass.

    await client.connect();

Autorizza i componenti a eseguire operazioni IPC

Per consentire ai componenti personalizzati di utilizzare alcune IPC operazioni, è necessario definire politiche di autorizzazione che consentano al componente di eseguire l'operazione su determinate risorse. Ogni politica di autorizzazione definisce un elenco di operazioni e un elenco di risorse consentite dalla politica. Ad esempio, il IPC servizio di messaggistica publish/subscribe definisce le operazioni di pubblicazione e sottoscrizione per le risorse tematiche. È possibile utilizzare il carattere * jolly per consentire l'accesso a tutte le operazioni o a tutte le risorse.

Le politiche di autorizzazione vengono definite con il parametro di accessControl configurazione, che è possibile impostare nella ricetta del componente o quando si distribuisce il componente. L'accessControloggetto mappa gli identificatori IPC del servizio su elenchi di politiche di autorizzazione. È possibile definire più politiche di autorizzazione per ogni IPC servizio per controllare l'accesso. Ogni politica di autorizzazione ha un ID di policy, che deve essere univoco tra tutti i componenti.

Suggerimento

Per creare una politica unicaIDs, è possibile combinare il nome del componente, il nome del IPC servizio e un contatore. Ad esempio, un componente denominato com.example.HelloWorld potrebbe definire due politiche di autorizzazione di pubblicazione/sottoscrizione con le seguenti: IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Le politiche di autorizzazione utilizzano il seguente formato. Questo oggetto è il parametro accessControl di configurazione.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Wildcard nelle politiche di autorizzazione

È possibile utilizzare il carattere * jolly nell'resourceselemento delle politiche di IPC autorizzazione per consentire l'accesso a più risorse in un'unica politica di autorizzazione.

  • In tutte le versioni del nucleo Greengrass, è possibile specificare un singolo * carattere come risorsa per consentire l'accesso a tutte le risorse.

  • In Greengrass nucleus v2.6.0 e versioni successive, puoi specificare il carattere in una risorsa in modo che corrisponda a qualsiasi combinazione di * caratteri. Ad esempio, è possibile specificare di consentire l'accesso factory/1/devices/Thermostat*/status a un argomento di stato per tutti i dispositivi termostati di una fabbrica, dove il nome di ogni dispositivo inizia con. Thermostat

Quando si definiscono le politiche di autorizzazione per il AWS IoT Core MQTT IPC servizio, è anche possibile utilizzare caratteri MQTT jolly (+e#) per abbinare più risorse. Per ulteriori informazioni, consulta i caratteri MQTTjolly nelle politiche di AWS IoT Core MQTT IPC autorizzazione.

Variabili di ricetta nelle politiche di autorizzazione

Se si utilizza Greengrass nucleus v2.6.0 o versione successiva e si imposta l'opzione di interpolateComponentConfigurationconfigurazione di Greengrass nucleus sutrue, è possibile utilizzare la variabile recipe nelle politiche di autorizzazione. {iot:thingName} Quando è necessaria una politica di autorizzazione che includa il nome del dispositivo principale, ad esempio per MQTT argomenti o ombre del dispositivo, è possibile utilizzare questa variabile recipe per configurare un'unica politica di autorizzazione per un gruppo di dispositivi principali. Ad esempio, è possibile consentire a un componente l'accesso alla seguente risorsa per IPC le operazioni shadow.

$aws/things/{iot:thingName}/shadow/

Caratteri speciali nelle politiche di autorizzazione

Per specificare un valore letterale * o un ? carattere in una politica di autorizzazione, è necessario utilizzare una sequenza di escape. Le seguenti sequenze di escape indicano al software AWS IoT Greengrass Core di utilizzare il valore letterale anziché il significato speciale del carattere. Ad esempio, il * carattere è un jolly che corrisponde a qualsiasi combinazione di caratteri.

Carattere letterale Sequenza di escape Note

*

${*}

?

${?}

AWS IoT Greengrass attualmente non supporta il carattere ? jolly, che corrisponde a qualsiasi carattere singolo.

$

${$}

Usa questa sequenza di escape per abbinare una risorsa che contiene${. Ad esempio, per abbinare una risorsa denominata${resourceName}, è necessario specificare${$}{resourceName}. Altrimenti, per far corrispondere una risorsa che contiene$, è possibile utilizzare un valore letterale$, ad esempio per consentire l'accesso a un argomento che inizia con$aws.

Esempi di politiche di autorizzazione

Puoi fare riferimento ai seguenti esempi di politiche di autorizzazione per aiutarti a configurare le politiche di autorizzazione per i tuoi componenti.

Esempio di ricetta dei componenti con una politica di autorizzazione

La seguente ricetta di componenti di esempio include un accessControl oggetto che definisce una politica di autorizzazione. Questa politica autorizza il com.example.HelloWorld componente a pubblicare sull'test/topicargomento.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
Esempio di aggiornamento della configurazione del componente con una politica di autorizzazione

L'esempio seguente di aggiornamento della configurazione in una distribuzione specifica di configurare un componente con un accessControl oggetto che definisce una politica di autorizzazione. Questa politica autorizza il com.example.HelloWorld componente a pubblicare sull'test/topicargomento.

Console
Configurazione da unire
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

Il comando seguente crea una distribuzione su un dispositivo principale.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

Il hello-world-deployment.json file contiene il seguente JSON documento.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

Il seguente CLI comando Greengrass crea una distribuzione locale su un dispositivo principale.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

Il hello-world-configuration.json file contiene il seguente JSON documento.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Sottoscrivi agli stream di IPC eventi

Puoi utilizzare IPC le operazioni per iscriverti ai flussi di eventi su un dispositivo core Greengrass. Per utilizzare un'operazione di sottoscrizione, definisci un gestore di sottoscrizioni e crea una richiesta al servizio. IPC Quindi, il IPC client esegue le funzioni del gestore delle sottoscrizioni ogni volta che il dispositivo principale trasmette un messaggio di evento al componente.

È possibile chiudere un abbonamento per interrompere l'elaborazione dei messaggi relativi agli eventi. A tale scopo, chiamate closeStream() (Java), close() (Python) o Close() (C++) sull'oggetto dell'operazione di sottoscrizione che avete usato per aprire l'abbonamento.

Il IPC servizio AWS IoT Greengrass Core supporta le seguenti operazioni di sottoscrizione:

Definire i gestori delle sottoscrizioni

Per definire un gestore di sottoscrizioni, definite le funzioni di callback che gestiscono i messaggi di evento, gli errori e la chiusura dei flussi. Se si utilizza IPC il client V1, è necessario definire queste funzioni in una classe. Se si utilizza IPC il client V2, disponibile nelle versioni successive di Java e SDKs Python, è possibile definire queste funzioni senza creare una classe di gestione delle sottoscrizioni.

Java

Se si utilizza IPC il client V1, è necessario implementare l'interfaccia generica. software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> StreamEventType è il tipo di messaggio di evento per l'operazione di sottoscrizione. Definite le seguenti funzioni per gestire i messaggi di evento, gli errori e la chiusura dei flussi.

Se si utilizza IPC il client V2, è possibile definire queste funzioni al di fuori di una classe di gestione delle sottoscrizioni o utilizzare espressioni lambda.

void onStreamEvent(StreamEventType event)

Il callback che il IPC client chiama quando riceve un messaggio di evento, ad esempio un MQTT messaggio o una notifica di aggiornamento del componente.

boolean onStreamError(Throwable error)

Il callback che il IPC client chiama quando si verifica un errore di stream.

Restituisci true per chiudere il flusso di sottoscrizioni a causa dell'errore o restituisci false per mantenere aperto lo stream.

void onStreamClosed()

Il callback che il IPC client chiama alla chiusura dello stream.

Python

Se si utilizza IPC il client V1, è necessario estendere la classe stream response handler che corrisponde all'operazione di sottoscrizione. SDK per dispositivi AWS IoT Include una classe di gestione delle sottoscrizioni per ogni operazione di sottoscrizione. StreamEventType è il tipo di messaggio di evento per l'operazione di sottoscrizione. Definite le seguenti funzioni per gestire i messaggi di evento, gli errori e la chiusura dei flussi.

Se si utilizza IPC il client V2, è possibile definire queste funzioni al di fuori di una classe di gestione delle sottoscrizioni o utilizzare espressioni lambda.

def on_stream_event(self, event: StreamEventType) -> None

Il callback che il IPC client chiama quando riceve un messaggio di evento, ad esempio un MQTT messaggio o una notifica di aggiornamento del componente.

def on_stream_error(self, error: Exception) -> bool

Il callback che il IPC client chiama quando si verifica un errore di stream.

Restituisci true per chiudere il flusso di sottoscrizioni a causa dell'errore o restituisci false per mantenere aperto lo stream.

def on_stream_closed(self) -> None

Il callback che il IPC client chiama alla chiusura dello stream.

C++

Implementa una classe che deriva dalla classe stream response handler che corrisponde all'operazione di sottoscrizione. SDK per dispositivi AWS IoT Include una classe base di gestione delle sottoscrizioni per ogni operazione di sottoscrizione. StreamEventType è il tipo di messaggio di evento per l'operazione di sottoscrizione. Definite le seguenti funzioni per gestire i messaggi di evento, gli errori e la chiusura dei flussi.

void OnStreamEvent(StreamEventType *event)

Il callback che il IPC client chiama quando riceve un messaggio di evento, ad esempio un MQTT messaggio o una notifica di aggiornamento di un componente.

bool OnStreamError(OperationError *error)

Il callback che il IPC client chiama quando si verifica un errore di stream.

Restituisci true per chiudere il flusso di sottoscrizioni a causa dell'errore o restituisci false per mantenere aperto lo stream.

void OnStreamClosed()

Il callback che il IPC client chiama alla chiusura dello stream.

JavaScript

Implementa una classe che deriva dalla classe stream response handler che corrisponde all'operazione di sottoscrizione. SDK per dispositivi AWS IoT Include una classe base di gestione delle sottoscrizioni per ogni operazione di sottoscrizione. StreamEventType è il tipo di messaggio di evento per l'operazione di sottoscrizione. Definite le seguenti funzioni per gestire i messaggi di evento, gli errori e la chiusura dei flussi.

on(event: 'ended', listener: StreamingOperationEndedListener)

Il callback che il IPC client chiama alla chiusura dello stream.

on(event: 'streamError', listener: StreamingRpcErrorListener)

Il callback che il IPC client chiama quando si verifica un errore di stream.

Restituisci true per chiudere il flusso di sottoscrizioni a causa dell'errore o restituisci false per mantenere aperto lo stream.

on(event: 'message', listener: (message: InboundMessageType) => void)

Il callback che il IPC client chiama quando riceve un messaggio di evento, ad esempio un MQTT messaggio o una notifica di aggiornamento del componente.

Esempi di gestori di sottoscrizioni

L'esempio seguente mostra come utilizzare l'SubscribeToTopicoperazione e un gestore di sottoscrizioni per sottoscrivere i messaggi di pubblicazione/sottoscrizione locali.

Java (IPC client V2)
Esempio: iscriversi ai messaggi locali di pubblicazione/sottoscrizione
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
Esempio: iscriversi ai messaggi locali di pubblicazione/sottoscrizione
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
Esempio: iscriversi ai messaggi locali di pubblicazione/sottoscrizione
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Esempio: iscriversi ai messaggi locali di pubblicazione/sottoscrizione
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPCmigliori pratiche

Le migliori pratiche per l'utilizzo IPC nei componenti personalizzati differiscono tra IPC client V1 e IPC client V2. Segui le migliori pratiche per la versione IPC client che utilizzi.

IPC client V2

Il IPC client V2 esegue le funzioni di callback in un thread separato, quindi rispetto al IPC client V1, ci sono meno linee guida da seguire quando si utilizzano IPC e si scrivono le funzioni di gestione delle sottoscrizioni.

  • IPCRiutilizza un client

    Dopo aver creato un IPC client, tienilo aperto e riutilizzalo per tutte le IPC operazioni. La creazione di più client utilizza risorse aggiuntive e può causare perdite di risorse.

  • Gestisci le eccezioni

    Il IPC client V2 registra le eccezioni non rilevate nelle funzioni di gestione delle sottoscrizioni. È necessario catturare le eccezioni nelle funzioni del gestore per gestire gli errori che si verificano nel codice.

IPC client V1

Il IPC client V1 utilizza un singolo thread che comunica con il IPC server e chiama i gestori di sottoscrizione. È necessario considerare questo comportamento sincrono quando si scrivono funzioni di gestione delle sottoscrizioni.

  • Riutilizza un client IPC

    Dopo aver creato un IPC client, tienilo aperto e riutilizzalo per tutte le IPC operazioni. La creazione di più client utilizza risorse aggiuntive e può causare perdite di risorse.

  • Esegui il codice di blocco in modo asincrono

    Il IPC client V1 non può inviare nuove richieste o elaborare nuovi messaggi di eventi mentre il thread è bloccato. È necessario eseguire il codice di blocco in un thread separato eseguito dalla funzione di gestione. Il codice di blocco include sleep chiamate, loop in esecuzione continua e richieste di I/O sincrone il cui completamento richiede tempo.

  • Invia nuove richieste in modo asincrono IPC

    Il IPC client V1 non può inviare una nuova richiesta dall'interno delle funzioni di gestione delle sottoscrizioni, poiché la richiesta blocca la funzione di gestione se si attende una risposta. È necessario inviare IPC le richieste in un thread separato eseguito dalla funzione di gestione.

  • Gestisci le eccezioni

    Il IPC client V1 non gestisce le eccezioni non rilevate nelle funzioni di gestione delle sottoscrizioni. Se la funzione di gestione genera un'eccezione, l'abbonamento si chiude e l'eccezione non viene visualizzata nei registri dei componenti. È necessario catturare le eccezioni nelle funzioni del gestore per mantenere aperto l'abbonamento e registrare gli errori che si verificano nel codice.