Use o AWS IoT Device SDK para se comunicar com o núcleo do Greengrass, outros componentes e AWS IoT Core - AWS IoT Greengrass

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use o AWS IoT Device SDK para se comunicar com o núcleo do Greengrass, outros componentes e AWS IoT Core

Os componentes executados em seu dispositivo principal podem usar a biblioteca de comunicação entre processos AWS IoT Greengrass principais (IPC) no AWS IoT Device SDK para se comunicar com o AWS IoT Greengrass núcleo e outros componentes do Greengrass. Para desenvolver e executar componentes personalizados que usam IPC, você deve usar o AWS IoT Device SDK para se conectar ao serviço AWS IoT Greengrass Core IPC e realizar operações de IPC.

A interface IPC suporta dois tipos de operações:

  • Solicitação/resposta

    Os componentes enviam uma solicitação ao serviço IPC e recebem uma resposta que contém o resultado da solicitação.

  • Assinatura

    Os componentes enviam uma solicitação de assinatura ao serviço IPC e esperam um fluxo de mensagens de eventos em resposta. Os componentes fornecem um manipulador de assinaturas que lida com mensagens de eventos, erros e encerramento de streams. AWS IoT Device SDK Isso inclui uma interface de manipulador com os tipos corretos de resposta e evento para cada operação de IPC. Para ter mais informações, consulte Inscreva-se nos streams de eventos do IPC.

Versões do cliente IPC

Nas versões posteriores dos SDKs de Java e Python, AWS IoT Greengrass fornece uma versão aprimorada do cliente IPC, chamada cliente IPC V2. Cliente IPC V2:

  • Reduz a quantidade de código que você precisa escrever para usar operações IPC e ajuda a evitar erros comuns que podem ocorrer com o cliente IPC V1.

  • Chama os retornos de chamada do manipulador de assinatura em um thread separado, para que agora você possa executar o código de bloqueio, incluindo chamadas de função IPC adicionais, nos retornos de chamada do manipulador de assinaturas. O cliente IPC V1 usa o mesmo encadeamento para se comunicar com o servidor IPC e chamar os retornos de chamada do manipulador de assinatura.

  • Permite chamar operações de assinatura usando expressões Lambda (Java) ou funções (Python). O cliente IPC V1 exige que você defina classes de manipuladores de assinaturas.

  • Fornece versões síncronas e assíncronas de cada operação IPC. O cliente IPC V1 fornece somente versões assíncronas de cada operação.

Recomendamos que você use o cliente IPC V2 para aproveitar essas melhorias. No entanto, muitos exemplos nesta documentação e em alguns conteúdos on-line demonstram somente como usar o cliente IPC V1. Você pode usar os exemplos e tutoriais a seguir para ver exemplos de componentes que usam o cliente IPC V2:

Atualmente, o AWS IoT Device SDK for C++ v2 suporta somente o cliente IPC V1.

SDKs compatíveis para comunicação entre processos

As bibliotecas AWS IoT Greengrass principais do IPC estão incluídas nas seguintes AWS IoT Device SDK versões.

Conecte-se ao serviço AWS IoT Greengrass Core IPC

Para usar a comunicação entre processos em seu componente personalizado, você deve criar uma conexão com um soquete de servidor IPC executado pelo software AWS IoT Greengrass Core. Conclua as tarefas a seguir para baixar e usar o AWS IoT Device SDK no idioma de sua escolha.

Para usar o AWS IoT Device SDK para Java v2 (cliente IPC V2)
  1. Faça o download do AWS IoT Device SDK para Java v2 (v1.6.0 ou posterior).

  2. Faça o seguinte para executar seu código personalizado em seu componente:

    • Crie seu componente como um arquivo JAR que inclua AWS IoT Device SDK o. e execute esse arquivo JAR na receita do componente.

    • Defina o AWS IoT Device SDK JAR como um artefato de componente e adicione esse artefato ao classpath ao executar seu aplicativo na receita do componente.

  3. Use o código a seguir para criar o cliente IPC.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Para usar o AWS IoT Device SDK para Python v2 (cliente IPC V2)
  1. Baixe o AWS IoT Device SDK para Python (v1.9.0 ou posterior).

  2. Adicione as etapas de instalação do SDK ao ciclo de vida da instalação na receita do seu componente.

  3. Crie uma conexão com o serviço AWS IoT Greengrass Core IPC. Use o código a seguir para criar o cliente IPC.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Para criar a AWS IoT Device SDK v2 para C++, um dispositivo deve ter as seguintes ferramentas:

  • C++ 11 ou posterior

  • CMake 3.1 ou posterior

  • Um dos seguintes compiladores:

    • GCC 4.8 ou posterior

    • Clang 3.9 ou posterior

    • MSVC 2015 ou posterior

Para usar o AWS IoT Device SDK para C++ v2
  1. Faça o download do AWS IoT Device SDK para C++ v2 (v1.17.0 ou posterior).

  2. Siga as instruções de instalação no README para criar o AWS IoT Device SDK para C++ v2 a partir do código-fonte.

  3. Em sua ferramenta de criação de C++, vincule a biblioteca Greengrass IPCAWS::GreengrassIpc-cpp,, que você criou na etapa anterior. O CMakeLists.txt exemplo a seguir vincula a biblioteca Greengrass IPC a um projeto que você cria com o CMake.

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. No código do componente, crie uma conexão com o serviço AWS IoT Greengrass Core IPC para criar um cliente IPC ()Aws::Greengrass::GreengrassCoreIpcClient. Você deve definir um manipulador do ciclo de vida da conexão IPC que gerencie eventos de conexão, desconexão e erro IPC. O exemplo a seguir cria um cliente IPC e um manipulador do ciclo de vida da conexão IPC que imprime quando o cliente IPC se conecta, desconecta e encontra erros.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Para executar seu código personalizado em seu componente, crie seu código como um artefato binário e execute o artefato binário em sua receita de componente. Defina a Execute permissão do artefato OWNER para permitir que o software AWS IoT Greengrass Core execute o artefato binário.

    A Manifests seção da receita do seu componente pode ser semelhante ao exemplo a seguir.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Para criar o AWS IoT Device SDK for JavaScript v2 para uso com o NodeJS, um dispositivo deve ter as seguintes ferramentas:

  • NodeJS 10.0 ou posterior

    • Execute node -v para verificar a versão do Node.

  • CMake 3.1 ou posterior

Para usar o AWS IoT Device SDK for JavaScript v2 (cliente IPC V1)
  1. Baixe o AWS IoT Device SDK para JavaScript v2 (v1.12.10 ou posterior).

  2. Siga as instruções de instalação no README para criar o AWS IoT Device SDK for JavaScript v2 a partir do código-fonte.

  3. Crie uma conexão com o serviço AWS IoT Greengrass Core IPC. Conclua as etapas a seguir para criar o cliente IPC e estabelecer uma conexão.

  4. Use o código a seguir para criar o cliente IPC.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Use o código a seguir para estabelecer uma conexão do seu componente com o núcleo do Greengrass.

    await client.connect();

Autorize componentes a realizar operações de IPC

Para permitir que seus componentes personalizados usem algumas operações de IPC, você deve definir políticas de autorização que permitam que o componente execute a operação em determinados recursos. Cada política de autorização define uma lista de operações e uma lista de recursos que a política permite. Por exemplo, o serviço IPC de mensagens de publicação/assinatura define operações de publicação e assinatura para recursos de tópicos. Você pode usar o * caractere curinga para permitir o acesso a todas as operações ou a todos os recursos.

Você define políticas de autorização com o parâmetro de accessControl configuração, que pode ser definido na receita do componente ou ao implantar o componente. O accessControl objeto mapeia identificadores de serviço IPC para listas de políticas de autorização. Você pode definir várias políticas de autorização para cada serviço IPC para controlar o acesso. Cada política de autorização tem um ID de política, que deve ser exclusivo entre todos os componentes.

dica

Para criar IDs de política exclusivos, você pode combinar o nome do componente, o nome do serviço IPC e um contador. Por exemplo, um componente chamado com.example.HelloWorld pode definir duas políticas de autorização de publicação/assinatura com as seguintes IDs:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

As políticas de autorização usam o seguinte formato. Esse objeto é o parâmetro accessControl de configuração.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Caracteres curingas nas políticas de autorização

Você pode usar o * caractere curinga no resources elemento das políticas de autorização do IPC para permitir o acesso a vários recursos em uma única política de autorização.

  • Em todas as versões do núcleo do Greengrass, você pode especificar um único * caractere como recurso para permitir o acesso a todos os recursos.

  • No Greengrass nucleus v2.6.0 e versões posteriores, você pode especificar o * caractere em um recurso para corresponder a qualquer combinação de caracteres. Por exemplo, você pode especificar factory/1/devices/Thermostat*/status para permitir o acesso a um tópico de status para todos os dispositivos de termostato em uma fábrica, onde o nome de cada dispositivo começa com. Thermostat

Ao definir políticas de autorização para o serviço AWS IoT Core MQTT IPC, você também pode usar curingas do MQTT (+e#) para combinar vários recursos. Para obter mais informações, consulte Caracteres curinga do MQTT nas políticas de autorização do IPC do AWS IoT Core MQTT.

Variáveis de receita nas políticas de autorização

Se você usar o Greengrass nucleus v2.6.0 ou posterior e definir a opção de interpolateComponentConfigurationconfiguração do Greengrass nucleus comotrue, poderá usar a variável de receita nas políticas de autorização. {iot:thingName} Quando você precisar de uma política de autorização que inclua o nome do dispositivo principal, como para tópicos do MQTT ou sombras do dispositivo, você pode usar essa variável de receita para configurar uma única política de autorização para um grupo de dispositivos principais. Por exemplo, você pode permitir que um componente acesse o seguinte recurso para operações de IPC paralelas.

$aws/things/{iot:thingName}/shadow/

Caracteres especiais nas políticas de autorização

Para especificar um literal * ou ? caractere em uma política de autorização, você deve usar uma sequência de escape. As sequências de escape a seguir instruem o software AWS IoT Greengrass Core a usar o valor literal em vez do significado especial do caractere. Por exemplo, o * caractere é um curinga que corresponde a qualquer combinação de caracteres.

Caráter literal Sequência de escape Observações

*

${*}

?

${?}

AWS IoT Greengrass atualmente não suporta o ? curinga, que corresponde a um único caractere.

$

${$}

Use essa sequência de escape para corresponder a um recurso que contém${. Por exemplo, para corresponder a um recurso chamado${resourceName}, você deve especificar${$}{resourceName}. Caso contrário, para corresponder a um recurso que contém$, você pode usar um literal$, como para permitir acesso a um tópico que comece com$aws.

Exemplos de políticas de autorização

Você pode consultar os exemplos de políticas de autorização a seguir para ajudá-lo a configurar políticas de autorização para seus componentes.

exemplo Exemplo de receita de componente com uma política de autorização

O exemplo de receita de componente a seguir inclui um accessControl objeto que define uma política de autorização. Essa política autoriza o com.example.HelloWorld componente a publicar no test/topic tópico.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
exemplo Exemplo de atualização da configuração do componente com uma política de autorização

O exemplo de atualização de configuração a seguir em uma implantação especifica a configuração de um componente com um accessControl objeto que define uma política de autorização. Essa política autoriza o com.example.HelloWorld componente a publicar no test/topic tópico.

Console
Configuração para mesclar
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

O comando a seguir cria uma implantação em um dispositivo principal.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

O hello-world-deployment.json arquivo contém o seguinte documento JSON.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

O comando CLI do Greengrass a seguir cria uma implantação local em um dispositivo principal.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

O hello-world-configuration.json arquivo contém o seguinte documento JSON.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Inscreva-se nos streams de eventos do IPC

Você pode usar as operações de IPC para assinar fluxos de eventos em um dispositivo principal do Greengrass. Para usar uma operação de assinatura, defina um manipulador de assinatura e crie uma solicitação para o serviço IPC. Em seguida, o cliente IPC executa as funções do manipulador de assinaturas sempre que o dispositivo principal transmite uma mensagem de evento para seu componente.

Você pode fechar uma assinatura para interromper o processamento de mensagens de eventos. Para fazer isso, chame closeStream() (Java), close() (Python) ou Close() (C++) no objeto de operação de assinatura que você usou para abrir a assinatura.

O serviço AWS IoT Greengrass Core IPC suporta as seguintes operações de assinatura:

Defina gerenciadores de assinaturas

Para definir um manipulador de assinatura, defina funções de retorno de chamada que lidem com mensagens de eventos, erros e encerramento de fluxo. Se você usar o cliente IPC V1, deverá definir essas funções em uma classe. Se você usa o cliente IPC V2, que está disponível em versões posteriores dos SDKs para Java e Python, você pode definir essas funções sem criar uma classe de manipulador de assinaturas.

Java

Se você usar o cliente IPC V1, deverá implementar a interface genéricasoftware.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

Se você usa o cliente IPC V2, pode definir essas funções fora de uma classe de manipulador de assinatura ou usar expressões lambda.

void onStreamEvent(StreamEventType event)

O retorno de chamada que o cliente IPC chama quando recebe uma mensagem de evento, como uma mensagem MQTT ou uma notificação de atualização de componente.

boolean onStreamError(Throwable error)

O retorno de chamada que o cliente IPC chama quando ocorre um erro de stream.

Retorne true para fechar o stream de assinatura como resultado do erro ou retorne false para manter o stream aberto.

void onStreamClosed()

O retorno de chamada que o cliente IPC chama quando o fluxo é fechado.

Python

Se você usar o cliente IPC V1, deverá estender a classe do manipulador de resposta de fluxo que corresponde à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe de gerenciador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

Se você usa o cliente IPC V2, pode definir essas funções fora de uma classe de manipulador de assinatura ou usar expressões lambda.

def on_stream_event(self, event: StreamEventType) -> None

O retorno de chamada que o cliente IPC chama quando recebe uma mensagem de evento, como uma mensagem MQTT ou uma notificação de atualização de componente.

def on_stream_error(self, error: Exception) -> bool

O retorno de chamada que o cliente IPC chama quando ocorre um erro de stream.

Retorne true para fechar o stream de assinatura como resultado do erro ou retorne false para manter o stream aberto.

def on_stream_closed(self) -> None

O retorno de chamada que o cliente IPC chama quando o fluxo é fechado.

C++

Implemente uma classe derivada da classe do manipulador de resposta de fluxo que corresponde à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe base de manipulador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

void OnStreamEvent(StreamEventType *event)

O retorno de chamada que o cliente IPC chama quando recebe uma mensagem de evento, como uma mensagem MQTT ou uma notificação de atualização de componente.

bool OnStreamError(OperationError *error)

O retorno de chamada que o cliente IPC chama quando ocorre um erro de stream.

Retorne true para fechar o stream de assinatura como resultado do erro ou retorne false para manter o stream aberto.

void OnStreamClosed()

O retorno de chamada que o cliente IPC chama quando o fluxo é fechado.

JavaScript

Implemente uma classe derivada da classe do manipulador de resposta de fluxo que corresponde à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe base de manipulador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

on(event: 'ended', listener: StreamingOperationEndedListener)

O retorno de chamada que o cliente IPC chama quando o fluxo é fechado.

on(event: 'streamError', listener: StreamingRpcErrorListener)

O retorno de chamada que o cliente IPC chama quando ocorre um erro de stream.

Retorne true para fechar o stream de assinatura como resultado do erro ou retorne false para manter o stream aberto.

on(event: 'message', listener: (message: InboundMessageType) => void)

O retorno de chamada que o cliente IPC chama quando recebe uma mensagem de evento, como uma mensagem MQTT ou uma notificação de atualização de componente.

Exemplos de gerenciadores de assinaturas

O exemplo a seguir demonstra como usar a SubscribeToTopic operação e um manipulador de assinaturas para assinar mensagens locais de publicação/assinatura.

Java (IPC client V2)
exemplo Exemplo: Inscrever-se em mensagens locais de publicação/assinatura
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
exemplo Exemplo: Inscrever-se em mensagens locais de publicação/assinatura
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
exemplo Exemplo: Inscrever-se em mensagens locais de publicação/assinatura
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
exemplo Exemplo: Inscrever-se em mensagens locais de publicação/assinatura
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Melhores práticas do IPC

As melhores práticas para usar o IPC em componentes personalizados diferem entre o cliente IPC V1 e o cliente IPC V2. Siga as melhores práticas para a versão do cliente IPC que você usa.

IPC client V2

O cliente IPC V2 executa funções de retorno de chamada em um thread separado, portanto, em comparação com o cliente IPC V1, há menos diretrizes a serem seguidas ao usar IPC e escrever funções de manipulador de assinatura.

  • Reutilize um cliente IPC

    Depois de criar um cliente IPC, mantenha-o aberto e reutilize-o para todas as operações de IPC. A criação de vários clientes usa recursos extras e pode resultar em vazamentos de recursos.

  • Lidar com exceções

    O cliente IPC V2 registra exceções não detectadas nas funções do manipulador de assinaturas. Você deve capturar exceções nas funções do manipulador para lidar com erros que ocorrem no seu código.

IPC client V1

O cliente IPC V1 usa um único thread que se comunica com o servidor IPC e chama os manipuladores de assinatura. Você deve considerar esse comportamento síncrono ao escrever funções de manipulador de assinaturas.

  • Reutilize um cliente IPC

    Depois de criar um cliente IPC, mantenha-o aberto e reutilize-o para todas as operações de IPC. A criação de vários clientes usa recursos extras e pode resultar em vazamentos de recursos.

  • Execute o código de bloqueio de forma assíncrona

    O cliente IPC V1 não pode enviar novas solicitações ou processar novas mensagens de eventos enquanto o thread está bloqueado. Você deve executar o código de bloqueio em um encadeamento separado, executado a partir da função de manipulador. O código de bloqueio inclui sleep chamadas, loops que são executados continuamente e solicitações de E/S síncronas que demoram para serem concluídas.

  • Enviar novas solicitações de IPC de forma assíncrona

    O cliente IPC V1 não pode enviar uma nova solicitação de dentro das funções do manipulador de assinatura, porque a solicitação bloqueia a função do manipulador se você esperar por uma resposta. Você deve enviar solicitações de IPC em um thread separado que você executa a partir da função de manipulador.

  • Lidar com exceções

    O cliente IPC V1 não lida com exceções não detectadas nas funções do manipulador de assinaturas. Se sua função de manipulador gerar uma exceção, a assinatura será encerrada e a exceção não aparecerá nos registros do componente. Você deve capturar exceções nas funções do manipulador para manter a assinatura aberta e registrar os erros que ocorrem no seu código.