Utilisez le Kit SDK des appareils AWS IoT pour communiquer avec le noyau de Greengrass, les autres composants et AWS IoT Core - AWS IoT Greengrass

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisez le Kit SDK des appareils AWS IoT pour communiquer avec le noyau de Greengrass, les autres composants et AWS IoT Core

Les composants exécutés sur votre appareil principal peuvent utiliser la bibliothèque AWS IoT Greengrass Core interprocess communication (IPC) Kit SDK des appareils AWS IoT pour communiquer avec le AWS IoT Greengrass noyau et les autres composants de Greengrass. Pour développer et exécuter des composants personnalisés qui utilisentIPC, vous devez utiliser le Kit SDK des appareils AWS IoT pour vous connecter au IPC service AWS IoT Greengrass Core et effectuer IPC des opérations.

L'IPCinterface prend en charge deux types d'opérations :

  • Demande/réponse

    Les composants envoient une demande au IPC service et reçoivent une réponse contenant le résultat de la demande.

  • Abonnement

    Les composants envoient une demande d'abonnement au IPC service et attendent un flux de messages d'événement en réponse. Les composants fournissent un gestionnaire d'abonnement qui gère les messages d'événements, les erreurs et les fermetures de flux. Kit SDK des appareils AWS IoT Il inclut une interface de gestion avec les types de réponse et d'événement appropriés pour chaque IPC opération. Pour de plus amples informations, veuillez consulter Abonnez-vous aux diffusions IPC d'événements.

IPCversions du client

Dans les versions ultérieures de Java et PythonSDKs, AWS IoT Greengrass fournit une version améliorée du IPC client, appelée IPC client V2. IPCclient V2 :

  • Réduit la quantité de code que vous devez écrire pour utiliser les IPC opérations et permet d'éviter les erreurs courantes susceptibles de se produire avec IPC le client V1.

  • Appelle les rappels du gestionnaire d'abonnement dans un thread séparé. Vous pouvez donc désormais exécuter du code de blocage, y compris des appels de IPC fonction supplémentaires, dans les rappels du gestionnaire d'abonnement. IPCle client V1 utilise le même thread pour communiquer avec le IPC serveur et appeler les rappels du gestionnaire d'abonnement.

  • Permet d'appeler des opérations d'abonnement à l'aide d'expressions Lambda (Java) ou de fonctions (Python). IPCle client V1 vous oblige à définir des classes de gestionnaire d'abonnement.

  • Fournit des versions synchrones et asynchrones de chaque opération. IPC IPCle client V1 fournit uniquement des versions asynchrones de chaque opération.

Nous vous recommandons d'utiliser IPC le client V2 pour bénéficier de ces améliorations. Cependant, de nombreux exemples présentés dans cette documentation et dans certains contenus en ligne montrent uniquement comment utiliser IPC le client V1. Vous pouvez utiliser les exemples et didacticiels suivants pour découvrir des exemples de composants utilisant IPC le client V2 :

Actuellement, la version Kit SDK des appareils AWS IoT pour C++ v2 ne prend en charge que IPC le client V1.

Pris en charge SDKs pour la communication interprocessus

Les IPC bibliothèques AWS IoT Greengrass Core sont incluses dans les Kit SDK des appareils AWS IoT versions suivantes.

Connectez-vous au IPC service AWS IoT Greengrass principal

Pour utiliser la communication interprocessus dans votre composant personnalisé, vous devez créer une connexion à un socket IPC serveur exécuté par le logiciel AWS IoT Greengrass Core. Effectuez les tâches suivantes pour télécharger et utiliser le Kit SDK des appareils AWS IoT dans la langue de votre choix.

Pour utiliser le Kit SDK des appareils AWS IoT pour Java v2 (IPCclient V2)
  1. Téléchargez le Kit SDK des appareils AWS IoT pour Java v2 (v1.6.0 ou version ultérieure).

  2. Procédez de l'une des manières suivantes pour exécuter votre code personnalisé dans votre composant :

    • Créez votre composant sous la forme d'un JAR fichier qui inclut le Kit SDK des appareils AWS IoT, et exécutez ce JAR fichier dans votre recette de composant.

    • Kit SDK des appareils AWS IoT JARDéfinissez-le en tant qu'artefact de composant et ajoutez-le au chemin de classe lorsque vous exécutez votre application dans votre recette de composant.

  3. Utilisez le code suivant pour créer le IPC client.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Pour utiliser le Kit SDK des appareils AWS IoT for Python v2 (IPCclient V2)
  1. Téléchargez le Kit SDK des appareils AWS IoT pour Python (v1.9.0 ou version ultérieure).

  2. Ajoutez les étapes SDK d'installation au cycle de vie de l'installation dans la recette de votre composant.

  3. Créez une connexion au IPC service AWS IoT Greengrass Core. Utilisez le code suivant pour créer le IPC client.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Pour créer la Kit SDK des appareils AWS IoT version 2 pour C++, un périphérique doit disposer des outils suivants :

  • C++ 11 ou version ultérieure

  • CMake3.1 ou version ultérieure

  • L'un des compilateurs suivants :

    • GCC4.8 ou version ultérieure

    • Clang 3.9 ou version ultérieure

    • MSVC2015 ou plus tard

Pour utiliser le Kit SDK des appareils AWS IoT pour C++ v2
  1. Téléchargez le Kit SDK des appareils AWS IoT pour C++ v2 (v1.17.0 ou version ultérieure).

  2. Suivez les instructions d'installation du README pour créer le Kit SDK des appareils AWS IoT pour C++ v2 à partir des sources.

  3. Dans votre outil de génération C++, liez la IPC bibliothèque Greengrass que vous avez créée à l'étape précédente. AWS::GreengrassIpc-cpp L'CMakeLists.txtexemple suivant lie la IPC bibliothèque Greengrass à un projet que vous utilisez pour créer. CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. Dans le code de votre composant, créez une connexion au IPC service AWS IoT Greengrass Core pour créer un IPC client (Aws::Greengrass::GreengrassCoreIpcClient). Vous devez définir un gestionnaire du cycle de vie des IPC connexions qui gère les événements de IPC connexion, de déconnexion et d'erreur. L'exemple suivant crée un IPC client et un gestionnaire de cycle de vie des IPC connexions qui s'imprime lorsque le IPC client se connecte, se déconnecte et rencontre des erreurs.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Pour exécuter votre code personnalisé dans votre composant, créez votre code sous forme d'artefact binaire et exécutez l'artefact binaire dans la recette de votre composant. Définissez l'Executeautorisation de l'artefact pour OWNER permettre au logiciel AWS IoT Greengrass Core d'exécuter l'artefact binaire.

    La Manifests section de la recette de votre composant peut ressembler à l'exemple suivant.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Pour créer le Kit SDK des appareils AWS IoT for JavaScript v2 à utiliser avec NodeJS, un appareil doit disposer des outils suivants :

  • NodeJS 10.0 ou version ultérieure

    • Exécutez node -v pour vérifier la version du nœud.

  • CMake3.1 ou version ultérieure

Pour utiliser le Kit SDK des appareils AWS IoT for JavaScript v2 (IPCclient V1)
  1. Téléchargez le Kit SDK des appareils AWS IoT pour la version JavaScript 2 (v1.12.10 ou version ultérieure).

  2. Suivez les instructions d'installation du README pour créer le Kit SDK des appareils AWS IoT for JavaScript v2 à partir des sources.

  3. Créez une connexion au IPC service AWS IoT Greengrass Core. Procédez comme suit pour créer le IPC client et établir une connexion.

  4. Utilisez le code suivant pour créer le IPC client.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Utilisez le code suivant pour établir une connexion entre votre composant et le noyau de Greengrass.

    await client.connect();

Autoriser les composants à effectuer des IPC opérations

Pour permettre à vos composants personnalisés d'utiliser certaines IPC opérations, vous devez définir des politiques d'autorisation qui permettent au composant d'effectuer l'opération sur certaines ressources. Chaque politique d'autorisation définit une liste d'opérations et une liste de ressources autorisées par la politique. Par exemple, le IPC service de messagerie de publication/d'abonnement définit les opérations de publication et d'abonnement pour les ressources thématiques. Vous pouvez utiliser le * caractère générique pour autoriser l'accès à toutes les opérations ou à toutes les ressources.

Vous définissez les politiques d'autorisation à l'aide du paramètre de accessControl configuration, que vous pouvez définir dans la recette du composant ou lorsque vous déployez le composant. L'accessControlobjet associe les identifiants IPC de service à des listes de politiques d'autorisation. Vous pouvez définir plusieurs politiques d'autorisation pour chaque IPC service afin de contrôler l'accès. Chaque politique d'autorisation possède un identifiant de politique, qui doit être unique parmi tous les composants.

Astuce

Pour créer une politique uniqueIDs, vous pouvez combiner le nom du composant, le nom du IPC service et un compteur. Par exemple, un composant nommé com.example.HelloWorld peut définir deux politiques d'autorisation de publication/d'abonnement avec les éléments suivants : IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Les politiques d'autorisation utilisent le format suivant. Cet objet est le paramètre accessControl de configuration.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Des caractères génériques dans les politiques d'autorisation

Vous pouvez utiliser le * caractère générique dans l'resourcesélément des politiques d'IPCautorisation pour autoriser l'accès à plusieurs ressources dans le cadre d'une seule politique d'autorisation.

  • Dans toutes les versions du noyau Greengrass, vous pouvez spécifier un seul * personnage comme ressource pour autoriser l'accès à toutes les ressources.

  • Dans Greengrass nucleus v2.6.0 et versions ultérieures, vous pouvez spécifier le * personnage d'une ressource pour qu'il corresponde à n'importe quelle combinaison de caractères. Par exemple, vous pouvez autoriser factory/1/devices/Thermostat*/status l'accès à une rubrique d'état pour tous les thermostats d'une usine, où le nom de chaque appareil commence parThermostat.

Lorsque vous définissez des politiques d'autorisation pour le AWS IoT Core MQTT IPC service, vous pouvez également utiliser des MQTT caractères génériques (+et#) pour associer plusieurs ressources. Pour plus d'informations, voir les MQTTcaractères génériques dans les politiques AWS IoT Core MQTT IPC d'autorisation.

Variables de recette dans les politiques d'autorisation

Si vous utilisez Greengrass nucleus v2.6.0 ou version ultérieure et que vous définissez l'option de interpolateComponentConfigurationconfiguration du noyau Greengrass sur, vous pouvez utiliser la variable de recette dans les true politiques d'autorisation. {iot:thingName} Lorsque vous avez besoin d'une politique d'autorisation incluant le nom du périphérique principal, par exemple pour les MQTT sujets ou les ombres de périphériques, vous pouvez utiliser cette variable de recette pour configurer une politique d'autorisation unique pour un groupe de périphériques principaux. Par exemple, vous pouvez autoriser un composant à accéder à la ressource suivante pour les IPC opérations parallèles.

$aws/things/{iot:thingName}/shadow/

Caractères spéciaux dans les politiques d'autorisation

Pour spécifier un littéral * ou un ? caractère dans une politique d'autorisation, vous devez utiliser une séquence d'échappement. Les séquences d'échappement suivantes indiquent au logiciel AWS IoT Greengrass Core d'utiliser la valeur littérale au lieu de la signification particulière du caractère. Par exemple, le * caractère est un joker qui correspond à n'importe quelle combinaison de caractères.

Caractère littéral Séquence d'échappement Remarques

*

${*}

?

${?}

AWS IoT Greengrass ne prend actuellement pas en charge le ? joker, qui correspond à n'importe quel caractère.

$

${$}

Utilisez cette séquence d'échappement pour faire correspondre une ressource contenant${. Par exemple, pour faire correspondre une ressource nommée${resourceName}, vous devez spécifier${$}{resourceName}. Sinon, pour faire correspondre une ressource contenant$, vous pouvez utiliser un littéral$, par exemple pour autoriser l'accès à une rubrique commençant $aws par.

Exemples de politiques d'autorisation

Vous pouvez vous référer aux exemples de politiques d'autorisation suivants pour vous aider à configurer les politiques d'autorisation pour vos composants.

Exemple de recette de composant avec une politique d'autorisation

L'exemple de recette de composant suivant inclut un accessControl objet qui définit une politique d'autorisation. Cette politique autorise le com.example.HelloWorld composant à publier dans le test/topic sujet.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
Exemple de mise à jour de configuration de composant avec une politique d'autorisation

L'exemple de mise à jour de configuration suivant dans un déploiement indique de configurer un composant avec un accessControl objet qui définit une politique d'autorisation. Cette politique autorise le com.example.HelloWorld composant à publier dans le test/topic sujet.

Console
Configuration à fusionner
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

La commande suivante crée un déploiement sur un périphérique principal.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

Le hello-world-deployment.json dossier contient le JSON document suivant.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

La CLI commande Greengrass suivante crée un déploiement local sur un appareil principal.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

Le hello-world-configuration.json dossier contient le JSON document suivant.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Abonnez-vous aux diffusions IPC d'événements

Vous pouvez utiliser IPC des opérations pour vous abonner à des flux d'événements sur un appareil principal de Greengrass. Pour utiliser une opération d'abonnement, définissez un gestionnaire d'abonnement et créez une demande auprès du IPC service. Ensuite, le IPC client exécute les fonctions du gestionnaire d'abonnement chaque fois que le périphérique principal transmet un message d'événement à votre composant.

Vous pouvez fermer un abonnement pour arrêter le traitement des messages relatifs aux événements. Pour ce faire, appelez closeStream() (Java), close() (Python) ou Close() (C++) sur l'objet d'opération d'abonnement que vous avez utilisé pour ouvrir l'abonnement.

Le IPC service AWS IoT Greengrass Core prend en charge les opérations d'abonnement suivantes :

Définition des gestionnaires d'abonnements

Pour définir un gestionnaire d'abonnement, définissez des fonctions de rappel qui gèrent les messages d'événements, les erreurs et les fermetures de flux. Si vous utilisez IPC le client V1, vous devez définir ces fonctions dans une classe. Si vous utilisez IPC le client V2, disponible dans les versions ultérieures de Java et PythonSDKs, vous pouvez définir ces fonctions sans créer de classe de gestionnaire d'abonnement.

Java

Si vous utilisez IPC le client V1, vous devez implémenter l'software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>interface générique. StreamEventType est le type de message d'événement pour l'opération d'abonnement. Définissez les fonctions suivantes pour gérer les messages d'événements, les erreurs et les fermetures de flux.

Si vous utilisez IPC le client V2, vous pouvez définir ces fonctions en dehors d'une classe de gestionnaire d'abonnement ou utiliser des expressions lambda.

void onStreamEvent(StreamEventType event)

Le rappel que le IPC client appelle lorsqu'il reçoit un message d'événement, tel qu'un MQTT message ou une notification de mise à jour de composant.

boolean onStreamError(Throwable error)

Le rappel que le IPC client appelle lorsqu'une erreur de diffusion se produit.

Renvoie true pour fermer le stream d'abonnement suite à l'erreur, ou renvoie false pour garder le stream ouvert.

void onStreamClosed()

Le rappel que le IPC client appelle lorsque le flux se ferme.

Python

Si vous utilisez IPC le client V1, vous devez étendre la classe du gestionnaire de réponse aux flux correspondant à l'opération d'abonnement. Kit SDK des appareils AWS IoT Il inclut une classe de gestionnaire d'abonnement pour chaque opération d'abonnement. StreamEventType est le type de message d'événement pour l'opération d'abonnement. Définissez les fonctions suivantes pour gérer les messages d'événements, les erreurs et les fermetures de flux.

Si vous utilisez IPC le client V2, vous pouvez définir ces fonctions en dehors d'une classe de gestionnaire d'abonnement ou utiliser des expressions lambda.

def on_stream_event(self, event: StreamEventType) -> None

Le rappel que le IPC client appelle lorsqu'il reçoit un message d'événement, tel qu'un MQTT message ou une notification de mise à jour de composant.

def on_stream_error(self, error: Exception) -> bool

Le rappel que le IPC client appelle lorsqu'une erreur de diffusion se produit.

Renvoie true pour fermer le stream d'abonnement suite à l'erreur, ou renvoie false pour garder le stream ouvert.

def on_stream_closed(self) -> None

Le rappel que le IPC client appelle lorsque le flux se ferme.

C++

Implémentez une classe dérivée de la classe du gestionnaire de réponse au flux correspondant à l'opération d'abonnement. Kit SDK des appareils AWS IoT Il inclut une classe de base de gestionnaire d'abonnement pour chaque opération d'abonnement. StreamEventType est le type de message d'événement pour l'opération d'abonnement. Définissez les fonctions suivantes pour gérer les messages d'événements, les erreurs et les fermetures de flux.

void OnStreamEvent(StreamEventType *event)

Le rappel que le IPC client appelle lorsqu'il reçoit un message d'événement, tel qu'un MQTT message ou une notification de mise à jour de composant.

bool OnStreamError(OperationError *error)

Le rappel que le IPC client appelle lorsqu'une erreur de diffusion se produit.

Renvoie true pour fermer le stream d'abonnement suite à l'erreur, ou renvoie false pour garder le stream ouvert.

void OnStreamClosed()

Le rappel que le IPC client appelle lorsque le flux se ferme.

JavaScript

Implémentez une classe dérivée de la classe du gestionnaire de réponse au flux correspondant à l'opération d'abonnement. Kit SDK des appareils AWS IoT Il inclut une classe de base de gestionnaire d'abonnement pour chaque opération d'abonnement. StreamEventType est le type de message d'événement pour l'opération d'abonnement. Définissez les fonctions suivantes pour gérer les messages d'événements, les erreurs et les fermetures de flux.

on(event: 'ended', listener: StreamingOperationEndedListener)

Le rappel que le IPC client appelle lorsque le flux se ferme.

on(event: 'streamError', listener: StreamingRpcErrorListener)

Le rappel que le IPC client appelle lorsqu'une erreur de diffusion se produit.

Renvoie true pour fermer le stream d'abonnement suite à l'erreur, ou renvoie false pour garder le stream ouvert.

on(event: 'message', listener: (message: InboundMessageType) => void)

Le rappel que le IPC client appelle lorsqu'il reçoit un message d'événement, tel qu'un MQTT message ou une notification de mise à jour de composant.

Exemples de gestionnaires d'abonnements

L'exemple suivant montre comment utiliser l'SubscribeToTopicopération et un gestionnaire d'abonnement pour s'abonner à des messages de publication/d'abonnement locaux.

Java (IPC client V2)
Exemple : s'abonner à des messages locaux de publication/d'abonnement
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
Exemple : s'abonner à des messages locaux de publication/d'abonnement
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
Exemple : s'abonner à des messages locaux de publication/d'abonnement
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Exemple : s'abonner à des messages locaux de publication/d'abonnement
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPCmeilleures pratiques

Les meilleures pratiques d'utilisation IPC dans les composants personnalisés diffèrent entre le IPC client V1 et IPC le client V2. Suivez les meilleures pratiques pour la version du IPC client que vous utilisez.

IPC client V2

Le IPC client V2 exécute les fonctions de rappel dans un thread séparé. Par conséquent, par rapport au IPC client V1, vous devez suivre moins de directives lorsque vous utilisez IPC et écrivez des fonctions de gestion d'abonnement.

  • Réutiliser un IPC client

    Après avoir créé un IPC client, gardez-le ouvert et réutilisez-le pour toutes les IPC opérations. La création de plusieurs clients utilise des ressources supplémentaires et peut entraîner des fuites de ressources.

  • Gérer les exceptions

    Le IPC client V2 enregistre les exceptions non détectées dans les fonctions du gestionnaire d'abonnement. Vous devez intercepter les exceptions dans les fonctions de votre gestionnaire pour gérer les erreurs qui se produisent dans votre code.

IPC client V1

Le IPC client V1 utilise un seul thread qui communique avec le IPC serveur et appelle les gestionnaires d'abonnement. Vous devez tenir compte de ce comportement synchrone lorsque vous écrivez des fonctions de gestion d'abonnement.

  • Réutiliser un IPC client

    Après avoir créé un IPC client, gardez-le ouvert et réutilisez-le pour toutes les IPC opérations. La création de plusieurs clients utilise des ressources supplémentaires et peut entraîner des fuites de ressources.

  • Exécuter le code de blocage de manière asynchrone

    Le IPC client V1 ne peut pas envoyer de nouvelles demandes ou traiter de nouveaux messages d'événements tant que le thread est bloqué. Vous devez exécuter le code de blocage dans un thread distinct que vous exécutez à partir de la fonction de gestion. Le code de blocage inclut sleep les appels, les boucles qui s'exécutent en continu et les demandes d'E/S synchrones dont le traitement prend du temps.

  • Envoyer de nouvelles IPC demandes de manière asynchrone

    Le IPC client V1 ne peut pas envoyer de nouvelle demande depuis les fonctions du gestionnaire d'abonnement, car la demande bloque la fonction du gestionnaire si vous attendez une réponse. Vous devez envoyer les IPC demandes dans un thread distinct que vous exécutez à partir de la fonction de gestion.

  • Gérer les exceptions

    Le IPC client V1 ne gère pas les exceptions non détectées dans les fonctions du gestionnaire d'abonnements. Si votre fonction de gestion génère une exception, l'abonnement prend fin et l'exception n'apparaît pas dans les journaux de vos composants. Vous devez détecter les exceptions dans les fonctions de votre gestionnaire afin de maintenir l'abonnement ouvert et de consigner les erreurs qui se produisent dans votre code.