Verwenden Sie die AWS IoT Device SDK, um mit dem Greengrass-Kern, anderen Komponenten und zu kommunizieren AWS IoT Core - AWS IoT Greengrass

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden Sie die AWS IoT Device SDK, um mit dem Greengrass-Kern, anderen Komponenten und zu kommunizieren AWS IoT Core

Komponenten, die auf Ihrem Core-Gerät ausgeführt werden, können die AWS IoT Greengrass Core Interprocess Communication (IPC)-Bibliothek in der verwendenAWS IoT Device SDK, um mit dem AWS IoT Greengrass Kern und anderen Greengrass-Komponenten zu kommunizieren. Um benutzerdefinierte Komponenten zu entwickeln und auszuführen, die IPC verwenden, müssen Sie die verwendenAWS IoT Device SDK, um eine Verbindung zum AWS IoT Greengrass Core- IPC-Service herzustellen und IPC-Operationen durchzuführen.

Die IPC-Schnittstelle unterstützt zwei Arten von Operationen:

  • Anfrage/Antwort

    Komponenten senden eine Anfrage an den IPC-Service und erhalten eine Antwort, die das Ergebnis der Anfrage enthält.

  • Abonnement

    Komponenten senden eine Abonnementanfrage an den IPC-Service und erwarten als Reaktion darauf einen Stream von Ereignisnachrichten. Komponenten bieten einen Abonnement-Handler, der Ereignismeldungen, Fehler und Stream-Schließung verarbeitet. Die AWS IoT Device SDK enthält eine Handler-Schnittstelle mit den richtigen Antwort- und Ereignistypen für jede IPC-Operation. Weitere Informationen finden Sie unter Abonnieren von IPC-Ereignisstreams.

IPC-Clientversionen

In späteren Versionen der Java- und Python-SDKs AWS IoT Greengrass bietet eine verbesserte Version des IPC-Clients, den sogenannten IPC-Client V2. IPC-Client V2:

  • Reduziert die Menge an Code, die Sie für die Verwendung von IPC-Operationen schreiben müssen, und trägt dazu bei, häufige Fehler zu vermeiden, die bei IPC-Client V1 auftreten können.

  • Ruft Abonnement-Handler-Callbacks in einem separaten Thread auf, sodass Sie jetzt blockierenden Code, einschließlich zusätzlicher IPC-Funktionsaufrufe, in Abonnement-Handler-Callbacks ausführen können. Der IPC-Client V1 verwendet denselben Thread für die Kommunikation mit dem IPC-Server und Call-Subscriber-Handler-Callbacks.

  • Ermöglicht das Aufrufen von Abonnementoperationen mithilfe von Lambda-Ausdrücken (Java) oder -Funktionen (Python). Für den IPC-Client V1 müssen Sie Abonnement-Handler-Klassen definieren.

  • Stellt synchrone und asynchrone Versionen jeder IPC-Operation bereit. Der IPC-Client V1 stellt nur asynchrone Versionen jeder Operation bereit.

Wir empfehlen Ihnen, den IPC-Client V2 zu verwenden, um diese Verbesserungen zu nutzen. Viele Beispiele in dieser Dokumentation und in einigen Online-Inhalten veranschaulichen jedoch nur, wie IPC-Client V1 verwendet wird. Sie können die folgenden Beispiele und Tutorials verwenden, um Beispielkomponenten anzuzeigen, die den IPC-Client V2 verwenden:

Derzeit unterstützt für AWS IoT Device SDK C++ v2 nur den IPC-Client V1.

Unterstützte SDKs für die Kommunikation zwischen Prozessen

Die AWS IoT Greengrass Core IPC-Bibliotheken sind in den folgenden AWS IoT Device SDK Versionen enthalten.

Herstellen einer Verbindung mit dem AWS IoT Greengrass Core IPC-Service

Um die prozessübergreifende Kommunikation in Ihrer benutzerdefinierten Komponente zu verwenden, müssen Sie eine Verbindung zu einem IPC-Server-Socket herstellen, den die AWS IoT Greengrass Core-Software ausführt. Führen Sie die folgenden Aufgaben aus, um die AWS IoT Device SDK in der Sprache Ihrer Wahl herunterzuladen und zu verwenden.

So verwenden Sie die AWS IoT Device SDK für Java v2 (IPC-Client V2)
  1. Laden Sie die AWS IoT Device SDK für Java v2 (v1.6.0 oder höher) herunter.

  2. Führen Sie einen der folgenden Schritte aus, um Ihren benutzerdefinierten Code in Ihrer Komponente auszuführen:

    • Erstellen Sie Ihre Komponente als JAR-Datei, die die enthältAWS IoT Device SDK, und führen Sie diese JAR-Datei in Ihrem Komponentenrezept aus.

    • Definieren Sie das AWS IoT Device SDK JAR als Komponentenartefakt und fügen Sie dieses Artefakt dem Klassenpfad hinzu, wenn Sie Ihre Anwendung in Ihrem Komponentenrezept ausführen.

  3. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
So verwenden Sie die AWS IoT Device SDK für Python v2 (IPC-Client V2)
  1. Laden Sie die AWS IoT Device SDK für Python (v1.9.0 oder höher) herunter.

  2. Fügen Sie die Installationsschritte des SDK zum Installationslebenszyklus im Rezept Ihrer Komponente hinzu.

  3. Erstellen Sie eine Verbindung zum AWS IoT Greengrass Core IPC-Service. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Um AWS IoT Device SDK v2 für C++ zu erstellen, muss ein Gerät über die folgenden Tools verfügen:

  • C++ 11 oder höher

  • CMake 3.1 oder höher

  • Einer der folgenden Compiler:

    • GCC 4.8 oder höher

    • Clang 3.9 oder höher

    • MSVC 2015 oder höher

So verwenden Sie die AWS IoT Device SDK für C++ v2
  1. Laden Sie die AWS IoT Device SDK für C++ v2 (v1.17.0 oder höher) herunter.

  2. Folgen Sie den Installationsanweisungen in README, um die AWS IoT Device SDK für C++ v2 aus der Quelle zu erstellen.

  3. Verknüpfen Sie in Ihrem C++-Build-Tool die Greengrass IPC-Bibliothek, AWS::GreengrassIpc-cpp, die Sie im vorherigen Schritt erstellt haben. Im folgenden CMakeLists.txt Beispiel wird die Greengrass IPC-Bibliothek mit einem Projekt verknüpft, das Sie mit CMake erstellen.

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. Erstellen Sie in Ihrem Komponentencode eine Verbindung zum AWS IoT Greengrass Core IPC-Service, um einen IPC-Client (Aws::Greengrass::GreengrassCoreIpcClient) zu erstellen. Sie müssen einen IPC-Verbindungslebenszyklus-Handler definieren, der IPC-Verbindungs-, Verbindungs- und Fehlerereignisse behandelt. Im folgenden Beispiel werden ein IPC-Client und ein IPC-Verbindungslebenszyklus-Handler erstellt, der ausgegeben wird, wenn der IPC-Client eine Verbindung herstellt, die Verbindung trennt und auf Fehler stößt.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Um Ihren benutzerdefinierten Code in Ihrer Komponente auszuführen, erstellen Sie Ihren Code als binäres Artefakt und führen Sie das binäre Artefakt in Ihrem Komponentenrezept aus. Legen Sie die Execute Berechtigung des Artefakts auf festOWNER, damit die AWS IoT Greengrass -Core-Software das binäre Artefakt ausführen kann.

    Der Manifests Abschnitt Ihres Komponentenrezepts könnte dem folgenden Beispiel ähneln.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Um die AWS IoT Device SDK für JavaScript v2 zur Verwendung mit NodeJS zu erstellen, muss ein Gerät über die folgenden Tools verfügen:

  • NodeJS 10.0 oder höher

    • Führen Sie ausnode -v, um die Knotenversion zu überprüfen.

  • CMake 3.1 oder höher

So verwenden Sie die AWS IoT Device SDK für JavaScript v2 (IPC-Client V1)
  1. Laden Sie die AWS IoT Device SDK für JavaScript v2 (v1.12.10 oder höher) herunter.

  2. Folgen Sie den Installationsanweisungen in README, um die AWS IoT Device SDK für JavaScript v2 aus der Quelle zu erstellen.

  3. Erstellen Sie eine Verbindung zum AWS IoT Greengrass Core IPC-Service. Führen Sie die folgenden Schritte aus, um den IPC-Client zu erstellen und eine Verbindung herzustellen.

  4. Verwenden Sie den folgenden Code, um den IPC-Client zu erstellen.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Verwenden Sie den folgenden Code, um eine Verbindung von Ihrer Komponente zum Greengrass-Kernus herzustellen.

    await client.connect();

Autorisieren von Komponenten zur Durchführung von IPC-Operationen

Damit Ihre benutzerdefinierten Komponenten einige IPC-Operationen verwenden können, müssen Sie Autorisierungsrichtlinien definieren, die es der Komponente ermöglichen, die Operation für bestimmte Ressourcen auszuführen. Jede Autorisierungsrichtlinie definiert eine Liste von Operationen und eine Liste von Ressourcen, die die Richtlinie zulässt. Der IPC-Service zum Veröffentlichen/Abonnieren von Nachrichten definiert beispielsweise Veröffentlichungs- und Abonnementvorgänge für Themenressourcen. Sie können den * Platzhalter verwenden, um den Zugriff auf alle Operationen oder alle Ressourcen zu erlauben.

Sie definieren Autorisierungsrichtlinien mit dem accessControl Konfigurationsparameter, den Sie im Komponentenrezept oder bei der Bereitstellung der Komponente festlegen können. Das -accessControlObjekt ordnet IPC-Service-IDs Listen von Autorisierungsrichtlinien zu. Sie können mehrere Autorisierungsrichtlinien für jeden IPC-Service definieren, um den Zugriff zu steuern. Jede Autorisierungsrichtlinie hat eine Richtlinien-ID, die unter allen Komponenten eindeutig sein muss.

Tipp

Um eindeutige Richtlinien-IDs zu erstellen, können Sie den Komponentennamen, den IPC-Servicenamen und einen Zähler kombinieren. Beispielsweise com.example.HelloWorld kann eine Komponente mit dem Namen zwei Richtlinien für die Autorisierung zum Veröffentlichen/Abonnieren mit den folgenden IDs definieren:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Autorisierungsrichtlinien verwenden das folgende Format. Dieses Objekt ist der accessControl Konfigurationsparameter.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Platzhalter in Autorisierungsrichtlinien

Sie können den * Platzhalter im -resourcesElement von IPC-Autorisierungsrichtlinien verwenden, um den Zugriff auf mehrere Ressourcen in einer einzigen Autorisierungsrichtlinie zu ermöglichen.

  • In allen Versionen des Greengrass-Kerns können Sie ein einzelnes * Zeichen als Ressource angeben, um den Zugriff auf alle Ressourcen zu ermöglichen.

  • In Greengrass-Kern v2.6.0 und höher können Sie das * Zeichen in einer Ressource so angeben, dass es einer beliebigen Kombination von Zeichen entspricht. Sie können beispielsweise angeben, factory/1/devices/Thermostat*/status um den Zugriff auf ein Statusthema für alle Heater-Geräte in einer Fabrik zu erlauben, bei denen der Name jedes Geräts mit beginntThermostat.

Wenn Sie Autorisierungsrichtlinien für den AWS IoT Core MQTT IPC-Service definieren, können Sie auch MQTT-Platzhalter (+ und #) verwenden, um mehrere Ressourcen abzugleichen. Weitere Informationen finden Sie unter MQTT-Platzhalter in AWS IoT Core den MQTT IPC-Autorisierungsrichtlinien.

Rezeptvariablen in Autorisierungsrichtlinien

Wenn Sie den Greengrass-Kern v2.6.0 oder höher verwenden und die interpolateComponentConfiguration Konfigurationsoption des Greengrass-Kerns auf festlegentrue, können Sie die -{iot:thingName}Rezeptvariable in Autorisierungsrichtlinien verwenden. Wenn Sie eine Autorisierungsrichtlinie benötigen, die den Namen des Core-Geräts enthält, z. B. für MQTT-Themen oder Geräteschatten, können Sie diese Rezeptvariable verwenden, um eine einzelne Autorisierungsrichtlinie für eine Gruppe von Core-Geräten zu konfigurieren. Sie können beispielsweise einer Komponente Zugriff auf die folgende Ressource für Schatten- IPC-Operationen gewähren.

$aws/things/{iot:thingName}/shadow/

Sonderzeichen in Autorisierungsrichtlinien

Um ein Literal * oder ? Zeichen in einer Autorisierungsrichtlinie anzugeben, müssen Sie eine Escape-Sequenz verwenden. Die folgenden Escape-Sequenzen weisen die AWS IoT Greengrass Core-Software an, den Literalwert anstelle der besonderen Bedeutung des Zeichens zu verwenden. Das * Zeichen ist beispielsweise ein Platzhalter, der einer beliebigen Kombination von Zeichen entspricht.

Literales Zeichen Escape-Sequenz Hinweise

*

${*}

?

${?}

AWS IoT Greengrass unterstützt derzeit keinen ? Platzhalter, der einem einzelnen Zeichen entspricht.

$

${$}

Verwenden Sie diese Escape-Sequenz, um eine Ressource abzugleichen, die enthält${. Um beispielsweise mit einer Ressource namens übereinzustimmen${resourceName}, müssen Sie angeben${$}{resourceName}. Andernfalls können Sie als Übereinstimmung mit einer Ressource, die enthält$, ein Literal verwenden$, z. B. um den Zugriff auf ein Thema zu erlauben, das mit beginnt$aws.

Beispiele für Autorisierungsrichtlinien

Sie können auf die folgenden Beispiele für Autorisierungsrichtlinien verweisen, um Ihnen bei der Konfiguration von Autorisierungsrichtlinien für Ihre Komponenten zu helfen.

Beispiel für ein Komponentenrezept mit einer Autorisierungsrichtlinie

Das folgende Beispiel-Komponentenrezept enthält ein -accessControlObjekt, das eine Autorisierungsrichtlinie definiert. Diese Richtlinie autorisiert die com.example.HelloWorld Komponente zum Veröffentlichen im test/topic Thema.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
Beispiel für eine Aktualisierung der Komponentenkonfiguration mit einer Autorisierungsrichtlinie

Das folgende Beispiel für eine Konfigurationsaktualisierung in einer Bereitstellung legt fest, eine Komponente mit einem accessControl Objekt zu konfigurieren, das eine Autorisierungsrichtlinie definiert. Diese Richtlinie autorisiert die com.example.HelloWorld Komponente zum Veröffentlichen im test/topic Thema.

Console
Konfiguration zum Zusammenführen
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

Der folgende Befehl erstellt eine Bereitstellung auf einem Core-Gerät.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

Die hello-world-deployment.json Datei enthält das folgende JSON-Dokument.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

Der folgende Greengrass-CLI-Befehl erstellt eine lokale Bereitstellung auf einem Core-Gerät.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

Die hello-world-configuration.json Datei enthält das folgende JSON-Dokument.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Abonnieren von IPC-Ereignisstreams

Sie können IPC-Operationen verwenden, um Streams von Ereignissen auf einem Greengrass-Kerngerät zu abonnieren. Um einen Abonnementvorgang zu verwenden, definieren Sie einen Abonnement-Handler und erstellen Sie eine Anfrage an den IPC-Service. Anschließend führt der IPC-Client jedes Mal die Funktionen des Abonnement-Handlers aus, wenn das Core-Gerät eine Ereignisnachricht an Ihre Komponente streamt.

Sie können ein Abonnement schließen, um die Verarbeitung von Ereignismeldungen zu beenden. Rufen Sie dazu closeStream() (Java), close() (Python) oder Close() (C++) für das Abonnementoperationsobjekt auf, mit dem Sie das Abonnement geöffnet haben.

Der AWS IoT Greengrass Core IPC-Service unterstützt die folgenden Abonnementvorgänge:

Definieren von Abonnement-Handlern

Um einen Abonnement-Handler zu definieren, definieren Sie Callback-Funktionen, die Ereignismeldungen, Fehler und Stream-Schließung verarbeiten. Wenn Sie den IPC-Client V1 verwenden, müssen Sie diese Funktionen in einer Klasse definieren. Wenn Sie den IPC-Client V2 verwenden, der in späteren Versionen der Java- und Python-SDKs verfügbar ist, können Sie diese Funktionen definieren, ohne eine Abonnement-Handler-Klasse zu erstellen.

Java

Wenn Sie den IPC-Client V1 verwenden, müssen Sie die generische software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> Schnittstelle implementieren. StreamEventType ist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und Stream-Schließung zu behandeln.

Wenn Sie IPC-Client V2 verwenden, können Sie diese Funktionen außerhalb einer Abonnement-Handler-Klasse definieren oder Lambda-Ausdrücke verwenden.

void onStreamEvent(StreamEventType event)

Der Rückruf, den der IPC-Client aufruft, wenn er eine Ereignismeldung erhält, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über Komponentenaktualisierungen.

boolean onStreamError(Throwable error)

Der Rückruf, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder false, um den Stream geöffnet zu lassen.

void onStreamClosed()

Der Rückruf, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

Python

Wenn Sie den IPC-Client V1 verwenden, müssen Sie die Stream-Antwort-Handler-Klasse erweitern, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK enthält eine Abonnement-Handler-Klasse für jeden Abonnementvorgang. StreamEventType ist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und Stream-Schließung zu behandeln.

Wenn Sie IPC-Client V2 verwenden, können Sie diese Funktionen außerhalb einer Abonnement-Handler-Klasse definieren oder Lambda-Ausdrücke verwenden.

def on_stream_event(self, event: StreamEventType) -> None

Der Rückruf, den der IPC-Client aufruft, wenn er eine Ereignismeldung erhält, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über Komponentenaktualisierungen.

def on_stream_error(self, error: Exception) -> bool

Der Rückruf, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder false, um den Stream geöffnet zu lassen.

def on_stream_closed(self) -> None

Der Rückruf, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

C++

Implementieren Sie eine Klasse, die aus der Stream-Antwort-Handler-Klasse abgeleitet wird, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK enthält eine Basisklasse für den Abonnement-Handler für jeden Abonnementvorgang. StreamEventType ist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und Stream-Schließung zu behandeln.

void OnStreamEvent(StreamEventType *event)

Der Rückruf, den der IPC-Client aufruft, wenn er eine Ereignismeldung erhält, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über Komponentenaktualisierungen.

bool OnStreamError(OperationError *error)

Der Rückruf, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder false, um den Stream geöffnet zu lassen.

void OnStreamClosed()

Der Rückruf, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

JavaScript

Implementieren Sie eine Klasse, die aus der Stream-Antwort-Handler-Klasse abgeleitet wird, die dem Abonnementvorgang entspricht. Die AWS IoT Device SDK enthält eine Basisklasse für den Abonnement-Handler für jeden Abonnementvorgang. StreamEventType ist der Typ der Ereignisnachricht für den Abonnementvorgang. Definieren Sie die folgenden Funktionen, um Ereignismeldungen, Fehler und Stream-Schließung zu behandeln.

on(event: 'ended', listener: StreamingOperationEndedListener)

Der Rückruf, den der IPC-Client aufruft, wenn der Stream geschlossen wird.

on(event: 'streamError', listener: StreamingRpcErrorListener)

Der Rückruf, den der IPC-Client aufruft, wenn ein Stream-Fehler auftritt.

Geben Sie true zurück, um den Abonnement-Stream aufgrund des Fehlers zu schließen, oder false, um den Stream geöffnet zu lassen.

on(event: 'message', listener: (message: InboundMessageType) => void)

Der Rückruf, den der IPC-Client aufruft, wenn er eine Ereignismeldung erhält, z. B. eine MQTT-Nachricht oder eine Benachrichtigung über Komponentenaktualisierungen.

Beispiel für Abonnement-Handler

Das folgende Beispiel zeigt, wie Sie die -SubscribeToTopicOperation und einen Abonnement-Handler verwenden, um lokale Veröffentlichungs-/Abonnementnachrichten zu abonnieren.

Java (IPC client V2)
Beispiel: Abonnieren von lokalen Veröffentlichungs-/Abonnementnachrichten
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
Beispiel: Abonnieren von lokalen Veröffentlichungs-/Abonnementnachrichten
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
Beispiel: Abonnieren von lokalen Veröffentlichungs-/Abonnementnachrichten
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Beispiel: Abonnieren von lokalen Veröffentlichungs-/Abonnementnachrichten
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Bewährte Methoden für IPC

Die bewährten Methoden für die Verwendung von IPC in benutzerdefinierten Komponenten unterscheiden sich zwischen IPC-Client V1 und IPC-Client V2. Befolgen Sie die bewährten Methoden für die von Ihnen verwendete IPC-Clientversion.

IPC client V2

Der IPC-Client V2 führt Rückruffunktionen in einem separaten Thread aus. Im Vergleich zu IPC-Client V1 gibt es daher weniger Richtlinien, die Sie befolgen müssen, wenn Sie IPC verwenden und Abonnement-Handler-Funktionen schreiben.

  • Einen IPC-Client wiederverwenden

    Nachdem Sie einen IPC-Client erstellt haben, lassen Sie ihn geöffnet und verwenden Sie ihn für alle IPC-Operationen. Das Erstellen mehrerer Clients verbraucht zusätzliche Ressourcen und kann zu Ressourcenlecks führen.

  • Ausnahmen behandeln

    Der IPC-Client V2 protokolliert Ausnahmen in Abonnement-Handler-Funktionen. Sie sollten Ausnahmen in Ihren Handler-Funktionen abfangen, um Fehler zu behandeln, die in Ihrem Code auftreten.

IPC client V1

Der IPC-Client V1 verwendet einen einzelnen Thread, der mit dem IPC-Server kommuniziert und Abonnement-Handler aufruft. Sie müssen dieses synchrone Verhalten berücksichtigen, wenn Sie Abonnement-Handler-Funktionen schreiben.

  • Einen IPC-Client wiederverwenden

    Nachdem Sie einen IPC-Client erstellt haben, lassen Sie ihn geöffnet und verwenden Sie ihn für alle IPC-Operationen. Das Erstellen mehrerer Clients verbraucht zusätzliche Ressourcen und kann zu Ressourcenlecks führen.

  • Blockieren von Code asynchron ausführen

    Der IPC-Client V1 kann keine neuen Anfragen senden oder neue Ereignismeldungen verarbeiten, während der Thread blockiert ist. Sie sollten Blockiercode in einem separaten Thread ausführen, den Sie von der Handler-Funktion aus ausführen. Das Blockieren von Code umfasst sleep Aufrufe, Schleifen, die kontinuierlich ausgeführt werden, und synchrone E/A-Anforderungen, deren Abschluss einige Zeit in Anspruch nimmt.

  • Neue IPC-Anforderungen asynchron senden

    Der IPC-Client V1 kann innerhalb von Abonnement-Handler-Funktionen keine neue Anfrage senden, da die Anfrage die Handler-Funktion blockiert, wenn Sie auf eine Antwort warten. Sie sollten IPC-Anforderungen in einem separaten Thread senden, den Sie von der Handler-Funktion aus ausführen.

  • Ausnahmen behandeln

    Der IPC-Client V1 behandelt keine unerkannten Ausnahmen in Abonnement-Handler-Funktionen. Wenn Ihre Handler-Funktion eine Ausnahme auslöst, wird das Abonnement geschlossen und die Ausnahme wird nicht in Ihren Komponentenprotokollen angezeigt. Sie sollten Ausnahmen in Ihren Handler-Funktionen abfangen, um das Abonnement geöffnet zu lassen und Fehler zu protokollieren, die in Ihrem Code auftreten.