Verwenden Sie den AWS IoT Device SDK , um mit dem Greengrass-Kern und anderen Komponenten zu kommunizieren und AWS IoT Core - AWS IoT Greengrass

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden Sie den AWS IoT Device SDK , um mit dem Greengrass-Kern und anderen Komponenten zu kommunizieren und AWS IoT Core

Komponenten, die auf Ihrem Core-Gerät ausgeführt werden, können die AWS IoT Greengrass Core Interprocess Communication (IPC) -Bibliothek in der verwenden, AWS IoT Device SDK um mit dem AWS IoT Greengrass Nucleus und anderen Greengrass-Komponenten zu kommunizieren. Um benutzerdefinierte Komponenten zu entwickeln und auszuführen, die IPC verwenden, müssen Sie den verwenden, AWS IoT Device SDK um eine Verbindung zum AWS IoT Greengrass Core IPC-Dienst herzustellen und IPC-Operationen durchzuführen.

Die IPC-Schnittstelle unterstützt zwei Arten von Operationen:

  • Anfrage/Antwort

    Komponenten senden eine Anfrage an den IPC-Dienst und erhalten eine Antwort, die das Ergebnis der Anfrage enthält.

  • Abonnement

    Komponenten senden eine Abonnementanfrage an den IPC-Dienst und erwarten als Antwort einen Strom von Ereignisnachrichten. Komponenten stellen einen Abonnement-Handler bereit, der Ereignismeldungen, Fehler und das Schließen von Streams verarbeitet. Das AWS IoT Device SDK beinhaltet eine Handler-Schnittstelle mit den richtigen Antwort- und Ereignistypen für jeden IPC-Vorgang. Weitere Informationen finden Sie unter Abonnieren Sie IPC-Event-Streams.

IPC-Client-Versionen

In späteren Versionen der Java- und Python-SDKs wird eine verbesserte Version des IPC-Clients AWS IoT Greengrass bereitgestellt, die als IPC-Client V2 bezeichnet wird. IPC-Client V2:

  • Reduziert die Menge an Code, die Sie für die Verwendung von IPC-Operationen schreiben müssen, und trägt dazu bei, häufige Fehler zu vermeiden, die beim IPC-Client V1 auftreten können.

  • Ruft Abonnement-Handler-Callbacks in einem separaten Thread auf, sodass Sie jetzt Blockierungscode, einschließlich zusätzlicher IPC-Funktionsaufrufen, in Abonnement-Handler-Callbacks ausführen können. Der IPC-Client V1 verwendet denselben Thread, um mit dem IPC-Server zu kommunizieren und Abonnement-Handler-Callbacks aufzurufen.

  • Ermöglicht das Aufrufen von Abonnementvorgängen mithilfe von Lambda-Ausdrücken (Java) oder Funktionen (Python). Für den IPC-Client V1 müssen Sie Abonnement-Handler-Klassen definieren.

  • Stellt synchrone und asynchrone Versionen der einzelnen IPC-Operationen bereit. Der IPC-Client V1 stellt nur asynchrone Versionen der einzelnen Operationen bereit.

Wir empfehlen, dass Sie den IPC-Client V2 verwenden, um diese Verbesserungen nutzen zu können. Viele Beispiele in dieser Dokumentation und in einigen Online-Inhalten zeigen jedoch nur, wie der IPC-Client V1 verwendet wird. Anhand der folgenden Beispiele und Tutorials können Sie sich die Beispielkomponenten ansehen, die den IPC-Client V2 verwenden:

Derzeit unterstützt der AWS IoT Device SDK für C++ v2 nur den IPC-Client V1.

Unterstützte SDKs für die Kommunikation zwischen Prozessen

Die AWS IoT Greengrass IPC-Kernbibliotheken sind in den folgenden Versionen enthalten. AWS IoT Device SDK

Connect zum AWS IoT Greengrass Core IPC-Dienst her

Um die Interprozesskommunikation in Ihrer benutzerdefinierten Komponente zu verwenden, müssen Sie eine Verbindung zu einem IPC-Server-Socket herstellen, auf dem die AWS IoT Greengrass Core-Software ausgeführt wird. Führen Sie die folgenden Aufgaben aus, um das herunterzuladen und AWS IoT Device SDK in der Sprache Ihrer Wahl zu verwenden.

Um den AWS IoT Device SDK für Java v2 (IPC-Client V2) zu verwenden
  1. Laden Sie das AWS IoT Device SDK für Java v2 (v1.6.0 oder höher) herunter.

  2. Führen Sie einen der folgenden Schritte aus, um Ihren benutzerdefinierten Code in Ihrer Komponente auszuführen:

    • Erstellen Sie Ihre Komponente als JAR-Datei, die die enthält AWS IoT Device SDK, und führen Sie diese JAR-Datei in Ihrem Komponentenrezept aus.

    • Definieren Sie das AWS IoT Device SDK JAR als Komponentenartefakt und fügen Sie dieses Artefakt dem Klassenpfad hinzu, wenn Sie Ihre Anwendung in Ihrem Komponentenrezept ausführen.

  3. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Um das AWS IoT Device SDK für Python v2 (IPC-Client V2) zu verwenden
  1. Laden Sie das AWS IoT Device SDK für Python herunter (v1.9.0 oder höher).

  2. Fügen Sie die Installationsschritte des SDK zum Installationslebenszyklus im Rezept Ihrer Komponente hinzu.

  3. Stellen Sie eine Verbindung zum AWS IoT Greengrass Core IPC-Dienst her. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Um die AWS IoT Device SDK Version 2 für C++ zu erstellen, muss ein Gerät über die folgenden Tools verfügen:

  • C++ 11 oder höher

  • CMake 3.1 oder höher

  • Einer der folgenden Compiler:

    • GCC 4.8 oder höher

    • Clang 3.9 oder höher

    • MSVC 2015 oder später

Um das AWS IoT Device SDK für C++ v2 zu verwenden
  1. Laden Sie das AWS IoT Device SDK für C++ v2 (v1.17.0 oder höher) herunter.

  2. Folgen Sie den Installationsanweisungen in der README-Datei, um das AWS IoT Device SDK für C++ v2 aus dem Quellcode zu erstellen.

  3. Verlinken Sie in Ihrem C++-Build-Tool die Greengrass IPC-Bibliothek,AWS::GreengrassIpc-cpp, die Sie im vorherigen Schritt erstellt haben. Das folgende CMakeLists.txt Beispiel verknüpft die Greengrass IPC-Bibliothek mit einem Projekt, das Sie mit CMake erstellen.

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. Stellen Sie in Ihrem Komponentencode eine Verbindung zum AWS IoT Greengrass Core IPC-Dienst her, um einen IPC-Client zu erstellen (). Aws::Greengrass::GreengrassCoreIpcClient Sie müssen einen IPC-Verbindungslebenszyklus-Handler definieren, der IPC-Verbindungs-, Verbindungstrennungs- und Fehlerereignisse behandelt. Im folgenden Beispiel werden ein IPC-Client und ein IPC-Verbindungslebenszyklus-Handler erstellt, der druckt, wenn der IPC-Client eine Verbindung herstellt oder die Verbindung trennt und auf Fehler stößt.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Um Ihren benutzerdefinierten Code in Ihrer Komponente auszuführen, erstellen Sie Ihren Code als binäres Artefakt und führen Sie das binäre Artefakt in Ihrem Komponentenrezept aus. Legen Sie die Execute Berechtigung des Artefakts auf fest, OWNER damit die AWS IoT Greengrass Core-Software das binäre Artefakt ausführen kann.

    Der Manifests Abschnitt Ihres Komponentenrezepts könnte dem folgenden Beispiel ähneln.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Um das AWS IoT Device SDK für JavaScript v2 für die Verwendung mit NodeJS zu erstellen, muss ein Gerät über die folgenden Tools verfügen:

  • NodeJS 10.0 oder höher

    • Führen Sie ausnode -v, um die Node-Version zu überprüfen.

  • CMake 3.1 oder höher

Um das AWS IoT Device SDK für JavaScript v2 zu verwenden (IPC-Client V1)
  1. Laden Sie das AWS IoT Device SDK für JavaScript v2 herunter (v1.12.10 oder höher).

  2. Folgen Sie den Installationsanweisungen in der README-Datei, um das AWS IoT Device SDK für JavaScript Version 2 aus dem Quellcode zu erstellen.

  3. Stellen Sie eine Verbindung zum AWS IoT Greengrass Core IPC-Dienst her. Gehen Sie wie folgt vor, um den IPC-Client zu erstellen und eine Verbindung herzustellen.

  4. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Verwenden Sie den folgenden Code, um eine Verbindung von Ihrer Komponente zum Greengrass-Kern herzustellen.

    await client.connect();

Autorisieren Sie Komponenten zur Ausführung von IPC-Vorgängen

Damit Ihre benutzerdefinierten Komponenten einige IPC-Operationen verwenden können, müssen Sie Autorisierungsrichtlinien definieren, die es der Komponente ermöglichen, den Vorgang auf bestimmten Ressourcen auszuführen. Jede Autorisierungsrichtlinie definiert eine Liste von Vorgängen und eine Liste von Ressourcen, die die Richtlinie zulässt. Der IPC-Dienst zum Veröffentlichen und Abonnieren von Nachrichten definiert beispielsweise Veröffentlichungs- und Abonnementvorgänge für Themenressourcen. Sie können den * Platzhalter verwenden, um den Zugriff auf alle Operationen oder alle Ressourcen zu ermöglichen.

Autorisierungsrichtlinien definieren Sie mit dem accessControl Konfigurationsparameter, den Sie im Komponentenrezept oder bei der Bereitstellung der Komponente festlegen können. Das accessControl Objekt ordnet IPC-Dienstkennungen Listen von Autorisierungsrichtlinien zu. Sie können mehrere Autorisierungsrichtlinien für jeden IPC-Dienst definieren, um den Zugriff zu kontrollieren. Jede Autorisierungsrichtlinie hat eine Richtlinien-ID, die für alle Komponenten eindeutig sein muss.

Tipp

Um eindeutige Richtlinien-IDs zu erstellen, können Sie den Komponentennamen, den IPC-Dienstnamen und einen Zähler kombinieren. Eine Komponente mit dem Namen com.example.HelloWorld könnte beispielsweise zwei Autorisierungsrichtlinien für das Publizieren/Abonnieren mit den folgenden IDs definieren:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Autorisierungsrichtlinien verwenden das folgende Format. Dieses Objekt ist der accessControl Konfigurationsparameter.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Platzhalter in Autorisierungsrichtlinien

Sie können den * Platzhalter im resources Element der IPC-Autorisierungsrichtlinien verwenden, um den Zugriff auf mehrere Ressourcen in einer einzigen Autorisierungsrichtlinie zu ermöglichen.

  • In allen Versionen von Greengrass Nucleus können Sie ein einzelnes * Zeichen als Ressource angeben, um Zugriff auf alle Ressourcen zu gewähren.

  • In Greengrass Nucleus v2.6.0 und höher können Sie den * Charakter in einer Ressource so angeben, dass er einer beliebigen Zeichenkombination entspricht. Sie können beispielsweise angeben, dass der factory/1/devices/Thermostat*/status Zugriff auf ein Statusthema für alle Thermostatgeräte in einer Fabrik gewährt werden soll, wobei der Name jedes Geräts mit beginnt. Thermostat

Wenn Sie Autorisierungsrichtlinien für den AWS IoT Core MQTT-IPC-Dienst definieren, können Sie auch MQTT-Platzhalter (+und#) verwenden, um mehrere Ressourcen zuzuordnen. Weitere Informationen finden Sie unter MQTT-Platzhalter in MQTT-IPC-Autorisierungsrichtlinien. AWS IoT Core

Rezeptvariablen in Autorisierungsrichtlinien

Wenn Sie Greengrass Nucleus v2.6.0 oder höher verwenden und die interpolateComponentConfigurationKonfigurationsoption von Greengrass Nucleus auf einstellentrue, können Sie die {iot:thingName} Rezeptvariable in Autorisierungsrichtlinien verwenden. Wenn Sie eine Autorisierungsrichtlinie benötigen, die den Namen des Kerngeräts enthält, z. B. für MQTT-Themen oder Geräteschatten, können Sie diese Rezeptvariable verwenden, um eine einzelne Autorisierungsrichtlinie für eine Gruppe von Kerngeräten zu konfigurieren. Beispielsweise können Sie einer Komponente den Zugriff auf die folgende Ressource für Shadow-IPC-Operationen gewähren.

$aws/things/{iot:thingName}/shadow/

Sonderzeichen in Autorisierungsrichtlinien

Um ein Literal * oder ein ? Zeichen in einer Autorisierungsrichtlinie anzugeben, müssen Sie eine Escape-Sequenz verwenden. Die folgenden Escape-Sequenzen weisen die AWS IoT Greengrass Core-Software an, den Literalwert anstelle der speziellen Bedeutung des Zeichens zu verwenden. Das * Zeichen ist beispielsweise ein Platzhalter, der einer beliebigen Kombination von Zeichen entspricht.

Wörtliches Zeichen Escape-Sequenz Hinweise

*

${*}

?

${?}

AWS IoT Greengrass unterstützt derzeit nicht den ? Platzhalter, der einem einzelnen Zeichen entspricht.

$

${$}

Verwenden Sie diese Escape-Sequenz, um nach einer Ressource zu suchen, die enthält${. Um beispielsweise einer Ressource mit dem Namen zu entsprechen${resourceName}, müssen Sie Folgendes ${$}{resourceName} angeben: Andernfalls können Sie für die Suche nach einer Ressource$, die Folgendes enthält, ein Literal verwenden$, z. B. um Zugriff auf ein Thema zu gewähren, das mit $aws beginnt.

Beispiele für Autorisierungsrichtlinien

Anhand der folgenden Beispiele für Autorisierungsrichtlinien können Sie Autorisierungsrichtlinien für Ihre Komponenten konfigurieren.

Beispiel für ein Komponentenrezept mit einer Autorisierungsrichtlinie

Das folgende Beispiel für ein Komponentenrezept enthält ein accessControl Objekt, das eine Autorisierungsrichtlinie definiert. Diese Richtlinie autorisiert die com.example.HelloWorld Komponente, unter dem test/topic Thema zu veröffentlichen.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
Beispiel für ein Update der Komponentenkonfiguration mit einer Autorisierungsrichtlinie

Das folgende Beispiel für ein Konfigurationsupdate in einer Bereitstellung gibt an, dass eine Komponente mit einem accessControl Objekt konfiguriert werden soll, das eine Autorisierungsrichtlinie definiert. Diese Richtlinie autorisiert die com.example.HelloWorld Komponente, unter dem test/topic Thema zu veröffentlichen.

Console
Konfiguration zum Zusammenführen
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

Der folgende Befehl erstellt eine Bereitstellung auf einem Core-Gerät.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

Die hello-world-deployment.json Datei enthält das folgende JSON-Dokument.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

Der folgende Greengrass-CLI-Befehl erstellt eine lokale Bereitstellung auf einem Core-Gerät.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

Die hello-world-configuration.json Datei enthält das folgende JSON-Dokument.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Abonnieren Sie IPC-Event-Streams

Sie können IPC-Operationen verwenden, um Streams von Ereignissen auf einem Greengrass-Core-Gerät zu abonnieren. Um einen Abonnement-Vorgang zu verwenden, definieren Sie einen Abonnement-Handler und erstellen Sie eine Anfrage an den IPC-Dienst. Anschließend führt der IPC-Client die Funktionen des Abonnement-Handlers jedes Mal aus, wenn das Kerngerät eine Ereignismeldung an Ihre Komponente streamt.

Sie können ein Abonnement schließen, um die Verarbeitung von Ereignismeldungen zu beenden. Rufen Sie dazu closeStream() (Java), (Python) oder close() Close() (C++) für das Objekt für den Abonnementvorgang auf, mit dem Sie das Abonnement geöffnet haben.

Der AWS IoT Greengrass Core IPC-Dienst unterstützt die folgenden Abonnementvorgänge:

Definieren Sie Abonnement-Handler

Um einen Abonnement-Handler zu definieren, definieren Sie Callback-Funktionen, die Ereignismeldungen, Fehler und das Schließen von Streams behandeln. Wenn Sie den IPC-Client V1 verwenden, müssen Sie diese Funktionen in einer Klasse definieren. Wenn Sie den IPC-Client V2 verwenden, der in späteren Versionen der Java- und Python-SDKs verfügbar ist, können Sie diese Funktionen definieren, ohne eine Abonnement-Handler-Klasse zu erstellen.

Java

Wenn Sie den IPC-Client V1 verwenden, müssen Sie die generische Schnittstelle implementieren. software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> StreamEventTypeist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und das Schließen von Streams zu behandeln.

Wenn Sie den IPC-Client V2 verwenden, können Sie diese Funktionen außerhalb einer Abonnement-Handler-Klasse definieren oder Lambda-Ausdrücke verwenden.

void onStreamEvent(StreamEventType event)

Der Callback, den der IPC-Client aufruft, wenn er eine Ereignisnachricht empfängt, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über ein Komponenten-Update.

boolean onStreamError(Throwable error)

Der Callback, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder geben Sie false zurück, um den Stream geöffnet zu lassen.

void onStreamClosed()

Der Callback, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

Python

Wenn Sie den IPC-Client V1 verwenden, müssen Sie die Stream-Response-Handler-Klasse erweitern, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK beinhaltet eine Abonnement-Handler-Klasse für jeden Abonnementvorgang. StreamEventTypeist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und das Schließen von Streams zu behandeln.

Wenn Sie den IPC-Client V2 verwenden, können Sie diese Funktionen außerhalb einer Abonnement-Handler-Klasse definieren oder Lambda-Ausdrücke verwenden.

def on_stream_event(self, event: StreamEventType) -> None

Der Callback, den der IPC-Client aufruft, wenn er eine Ereignisnachricht empfängt, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über ein Komponenten-Update.

def on_stream_error(self, error: Exception) -> bool

Der Callback, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder geben Sie false zurück, um den Stream geöffnet zu lassen.

def on_stream_closed(self) -> None

Der Callback, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

C++

Implementieren Sie eine Klasse, die von der Stream-Response-Handler-Klasse abgeleitet ist, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK beinhaltet eine Abonnement-Handler-Basisklasse für jeden Abonnementvorgang. StreamEventTypeist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und das Schließen von Streams zu behandeln.

void OnStreamEvent(StreamEventType *event)

Der Callback, den der IPC-Client aufruft, wenn er eine Ereignisnachricht empfängt, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über ein Komponenten-Update.

bool OnStreamError(OperationError *error)

Der Callback, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder geben Sie false zurück, um den Stream geöffnet zu lassen.

void OnStreamClosed()

Der Callback, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

JavaScript

Implementieren Sie eine Klasse, die von der Stream-Response-Handler-Klasse abgeleitet ist, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK beinhaltet eine Abonnement-Handler-Basisklasse für jeden Abonnementvorgang. StreamEventTypeist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und das Schließen von Streams zu behandeln.

on(event: 'ended', listener: StreamingOperationEndedListener)

Der Callback, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

on(event: 'streamError', listener: StreamingRpcErrorListener)

Der Callback, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder geben Sie false zurück, um den Stream geöffnet zu lassen.

on(event: 'message', listener: (message: InboundMessageType) => void)

Der Callback, den der IPC-Client aufruft, wenn er eine Ereignisnachricht empfängt, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über ein Komponenten-Update.

Beispiel für Abonnement-Handler

Das folgende Beispiel zeigt, wie der SubscribeToTopic Vorgang und ein Abonnement-Handler verwendet werden, um lokale Veröffentlichungs-/Abonnementnachrichten zu abonnieren.

Java (IPC client V2)
Beispiel: Abonnieren Sie lokale Publish/Subscribe-Nachrichten
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
Beispiel: Lokale Nachrichten zum Publizieren/Abonnieren abonnieren
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
Beispiel: Lokale Nachrichten zum Publizieren/Abonnieren abonnieren
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Beispiel: Lokale Nachrichten zum Publizieren/Abonnieren abonnieren
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Bewährte IPC-Praktiken

Die bewährten Methoden für die Verwendung von IPC in benutzerdefinierten Komponenten unterscheiden sich zwischen IPC-Client V1 und IPC-Client V2. Folgen Sie den bewährten Methoden für die IPC-Client-Version, die Sie verwenden.

IPC client V2

Der IPC-Client V2 führt Callback-Funktionen in einem separaten Thread aus. Im Vergleich zu IPC-Client V1 gibt es also weniger Richtlinien, die Sie befolgen müssen, wenn Sie IPC verwenden und Abonnement-Handler-Funktionen schreiben.

  • Einen IPC-Client wiederverwenden

    Nachdem Sie einen IPC-Client erstellt haben, lassen Sie ihn geöffnet und verwenden Sie ihn für alle IPC-Operationen erneut. Das Erstellen mehrerer Clients verbraucht zusätzliche Ressourcen und kann zu Ressourcenlecks führen.

  • Behandeln Sie Ausnahmen

    Der IPC-Client V2 protokolliert nicht abgefangene Ausnahmen in Abonnement-Handler-Funktionen. Sie sollten Ausnahmen in Ihren Handlerfunktionen abfangen, um Fehler zu behandeln, die in Ihrem Code auftreten.

IPC client V1

Der IPC-Client V1 verwendet einen einzigen Thread, der mit dem IPC-Server kommuniziert und Abonnement-Handler aufruft. Sie müssen dieses synchrone Verhalten berücksichtigen, wenn Sie Abonnement-Handler-Funktionen schreiben.

  • Einen IPC-Client wiederverwenden

    Nachdem Sie einen IPC-Client erstellt haben, lassen Sie ihn geöffnet und verwenden Sie ihn für alle IPC-Operationen erneut. Das Erstellen mehrerer Clients verbraucht zusätzliche Ressourcen und kann zu Ressourcenlecks führen.

  • Führen Sie den Blockierungscode asynchron aus

    Der IPC-Client V1 kann keine neuen Anfragen senden oder neue Ereignisnachrichten verarbeiten, solange der Thread blockiert ist. Sie sollten den Blockierungscode in einem separaten Thread ausführen, den Sie von der Handler-Funktion aus ausführen. Blockierungscode umfasst sleep Aufrufe, Schleifen, die kontinuierlich ausgeführt werden, und synchrone I/O-Anfragen, deren Abschluss einige Zeit in Anspruch nimmt.

  • Senden Sie neue IPC-Anfragen asynchron

    Der IPC-Client V1 kann innerhalb der Abonnement-Handler-Funktionen keine neue Anfrage senden, da die Anfrage die Handler-Funktion blockiert, wenn Sie auf eine Antwort warten. Sie sollten IPC-Anfragen in einem separaten Thread senden, den Sie von der Handler-Funktion aus ausführen.

  • Behandeln Sie Ausnahmen

    Der IPC-Client V1 behandelt keine nicht abgefangenen Ausnahmen in Abonnement-Handler-Funktionen. Wenn Ihre Handlerfunktion eine Ausnahme auslöst, wird das Abonnement geschlossen und die Ausnahme erscheint nicht in Ihren Komponentenprotokollen. Sie sollten Ausnahmen in Ihren Handler-Funktionen abfangen, um das Abonnement offen zu halten und Fehler zu protokollieren, die in Ihrem Code auftreten.