Exporte fluxos de dados para a Nuvem AWS (CLI) - AWS IoT Greengrass

AWS IoT Greengrass Version 1 entrou na fase de vida útil prolongada em 30 de junho de 2023. Para obter mais informações, consulte política de manutenção do AWS IoT Greengrass V1. Após essa data, AWS IoT Greengrass V1 não lançaremos atualizações que forneçam recursos, aprimoramentos, correções de erros ou patches de segurança. Os dispositivos que funcionam AWS IoT Greengrass V1 não serão interrompidos e continuarão operando e se conectando à nuvem. É altamente recomendável que você migre para AWS IoT Greengrass Version 2, o que adiciona novos recursos significativos e suporte para plataformas adicionais.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exporte fluxos de dados para a Nuvem AWS (CLI)

Este tutorial mostra como usar a AWS CLI para criar e implantar um grupo AWS IoT Greengrass com gerenciador de fluxo habilitado. O grupo contém uma função do Lambda definida pelo usuário que grava em um fluxo no gerenciador de fluxo e é exportado automaticamente para a Nuvem AWS.

O gerenciador de fluxo torna mais eficientes e confiáveis a ingestão, o processamento e a exportação de fluxos de dados de alto volume. Neste tutorial, você criará uma função do Lambda TransferStream que consome dados de IoT. A função do Lambda usa o SDK do AWS IoT Greengrass Core para criar um fluxo no gerenciador de fluxo e ler e gravar dados nele. Em seguida, o gerenciador de fluxo exporta o fluxo para o Kinesis Data Streams. O diagrama a seguir mostra esse fluxo de trabalho.

Diagrama do fluxo de trabalho do gerenciamento de fluxo.

O foco deste tutorial é mostrar como as funções do Lambda definidas pelo usuário usam o objeto StreamManagerClient no SKD AWS IoT Greengrass do Core para interagir com o gerenciador de fluxo. Para simplificar, a função do Lambda em Python que você cria para este tutorial gera dados simulados do dispositivo.

Quando você usa a API do AWS IoT Greengrass, que inclui os comandos do Greengrass no AWS CLI, para criar um grupo, o gerenciador de fluxo é desabilitado por padrão. Para habilitar o gerenciador de fluxo em seu núcleo, crie uma versão de definição de função que inclua a função do Lambda do sistema GGStreamManager e uma versão de grupo que faça referência à nova versão de definição de função. Depois, implante o grupo.

Pré-requisitos

Para concluir este tutorial, você precisa de:

  • Um grupo do Greengrass e um núcleo do Greengrass (versão 1.10 ou posterior). Para obter informações sobre como criar um grupo e um núcleo do Greengrass, consulte Começando com AWS IoT Greengrass. O tutorial Conceitos básicos também inclui etapas para instalar o software do núcleo do AWS IoT Greengrass.

    nota

    O gerenciador de fluxo não é compatível com distribuições OpenWrT.

  • O Java 8 Runtime (JDK 8) instalado no dispositivo de núcleo.

    • Para distribuições com base em Debian (incluindo Raspbian) ou distribuições com base em Ubuntu, execute o comando a seguir:

      sudo apt install openjdk-8-jdk
    • Para distribuições com base em Red Hat (incluindo o Amazon Linux), execute o comando a seguir:

      sudo yum install java-1.8.0-openjdk

      Para obter mais informações, consulte Como fazer download e instalar pacotes OpenJDK pré-compilados na documentação do OpenJDK.

  • SDK do AWS IoT Greengrass Core para Python v1.5.0 ou posterior. Para usar StreamManagerClient no SDK do AWS IoT Greengrass Core para Python, você deve:

    • Instalar o Python 3.7 ou posterior no dispositivo de núcleo.

    • Incluir o SDK e suas dependências em seu pacote de implantação da função do Lambda. As instruções são fornecidas neste tutorial.

    dica

    Você pode usar o StreamManagerClient com Java ou NodeJS. Por exemplo, consulte o AWS IoT Greengrass SDK do Core para Java e SDK do AWS IoT Greengrass Core para Node.js no GitHub.

  • Um fluxo de destino chamado MyKinesisStream criado no Amazon Kinesis Data Streams na mesma Região da AWS que seu grupo do Greengrass. Para obter mais informações, consulte Criar um fluxo no Guia do desenvolvedor do Amazon Kinesis.

    nota

    Neste tutorial, o gerenciador de fluxo exporta dados para o Kinesis Data Streams, o que resulta em cobranças em sua Conta da AWS. Para obter informações sobre a definição de preços, consulte Definição de preço do Kinesis Data Streams.

    Para evitar incorrer em cobranças, você pode executar este tutorial sem criar um fluxo de dados do Kinesis. Nesse caso, verifique os logs para confirmar se o gerenciador de fluxo tentou exportar o fluxo para o Kinesis Data Streams.

  • Uma política do IAM adicionada à Função do grupo do Greengrass. que permite a ação kinesis:PutRecords no fluxo de dados de destino, conforme mostrado no exemplo a seguir:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

 

O tutorial contém as seguintes etapas de nível elevado:

O tutorial levará aproximadamente 30 minutos para ser concluído.

Etapa 1: crie um pacote de implantação para a função do Lambda

Nesta etapa, você cria um pacote de implantação da função do Lambda que contém o código e as dependências da função do Python. Faça upload desse pacote posteriormente, ao criar a função do Lambda no AWS Lambda. A função do Lambda usa o SDK do AWS IoT Greengrass Core para criar e interagir com fluxos locais.

nota

As funções do Lambda definidas pelo usuário devem usar o SDK AWS IoT Greengrass do Core para interagir com o gerenciador de fluxo. Para obter mais informações sobre os requisitos para o gerenciador de fluxo do Greengrass, consulte os requisitos do gerenciador de fluxo do Greengrass.

  1. Baixe o SDK do AWS IoT Greengrass Core para Python v1.5.0 ou posterior.

  2. Descompacte o pacote obtido por download para obter o SDK. O SDK é a pasta do greengrasssdk.

  3. Instale as dependências do pacote para serem incluídas com o SDK no pacote de implantação da função do Lambda.

    1. Navegue até o diretório SDK que contém o arquivo requirements.txt. Esse arquivo lista as dependências.

    2. Instale as dependências do SDK. Por exemplo, execute o seguinte comando pip para instalar as dependências no diretório atual:

      pip install --target . -r requirements.txt
  4. Salve a seguinte função do código Python em um arquivo local denominado transfer_stream.py.

    dica

    Para um exemplo de código que usa Java e NodeJS, consulte o SDK do AWS IoT Greengrass Core para Java e o SDK do AWS IoT Greengrass Core para Node.js no GitHub.

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. Comprima os seguintes itens em um arquivo denominado transfer_stream_python.zip. Esse é o pacote de implantação de sua função do Lambda.

    • transfer_stream.py. Lógica do aplicativo.

    • greengrasssdk. Biblioteca necessária para funções Python do Lambda do Greengrass que publicam mensagens MQTT.

      As operações do gerenciador de fluxo estão disponíveis na versão 1.5.0 ou posterior do SDK do AWS IoT Greengrass Core para Python.

    • As dependências instaladas para o SDK do AWS IoT Greengrass Core para Python (por exemplo, os diretórios cbor2).

    Ao criar o arquivo zip, inclua apenas esses itens, não a pasta que contém os arquivos.

Etapa 2: criar uma função do Lambda

  1. Crie um perfil do IAM para você transmitir no ARN da função ao criar a função.

    JSON Expanded
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
    nota

    O AWS IoT Greengrass não usa essa função, pois as permissões para as funções do Lambda do Greengrass são especificadas na função de grupo do Greengrass. Neste tutorial, você cria uma função vazia.

  2. Copie a Arn da saída.

  3. Use a API do AWS Lambda para criar a função TransferStream. O comando a seguir pressupõe que o arquivo zip esteja no diretório atual.

    • Substitua role-arn pelo Arn que você copiou.

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. Publique uma versão da função.

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. Crie um alias para a versão publicada.

    Os grupos do Greengrass podem fazer referência a uma função do Lambda por alias (recomendado) ou por versão. Usar um alias facilita o gerenciamento de atualizações de código porque você não precisa alterar a tabela de assinaturas nem a definição do grupo ao atualizar a função do código. Em vez disso, você pode simplesmente apontar o alias para a nova versão da função.

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    nota

    O AWS IoT Greengrass não oferece suporte a alias do Lambda para as versões $LATEST.

  6. Copie a AliasArn da saída. Você usa esse valor ao configurar a função para o AWS IoT Greengrass.

Agora você está pronto para configurar a função para o AWS IoT Greengrass.

Etapa 3: Criar uma definição e uma versão de função

Essa etapa cria uma versão de definição de função que faz referência à função do Lambda do sistema GGStreamManager e à sua função do Lambda TransferStream definida pelo usuário. Para ativar o gerenciador de fluxo ao usar a API AWS IoT Greengrass, sua versão de definição de função deve incluir a função GGStreamManager.

  1. Crie uma definição de função com uma versão inicial que contenha as funções do Lambda definidas pelo sistema e pelo usuário.

    A versão de definição a seguir ativa o gerenciador de fluxo com configurações de parâmetros padrão. Para definir configurações personalizadas, você deve definir variáveis de ambiente para os parâmetros correspondentes do gerenciador de fluxo. Para um exemplo, consulte Habilitar, desabilitar ou definir as configurações do gerenciador de fluxo (CLI). O AWS IoT Greengrass usa definições padrão para os parâmetros omitidos. O MemorySize deve ser de, no mínimo, 128000. Pinned deve ser definido como true.

    nota

    Uma função de longa duração (ou pinned) do Lambda é automaticamente iniciada após a inicialização do AWS IoT Greengrass e continua sendo executada no seu próprio contêiner. Isso contrasta com uma função do Lambda sob demanda, que é iniciada quando invocada e interrompida quando não há tarefas a serem executadas. Para obter mais informações, consulte Configuração do ciclo de vida das funções do Lambda do Greengrass.

    • Substitua arbitrary-function-id por um nome da função, como stream-manager.

    • Substitua alias-arn pelo AliasArn que você copiou quando criou o alias para a função do Lambda TransferStream.

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    nota

    O Timeout é exigido pela versão de definição de função, mas o GGStreamManager não o utiliza. Para obter mais informações sobre Timeout e outras configurações em nível de grupo, consulte Controlar a execução de funções do Lambda do Greengrass usando a configuração específica do grupo.

  2. Copie a LatestVersionArn da saída. Você usa esse valor para adicionar a versão de definição da função à versão de grupo que você implanta no núcleo.

Etapa 4: Criar uma definição e versão do logger

Defina as configurações de registro do grupo. Neste tutorial, você configura os componentes do sistema do AWS IoT Greengrass, funções do Lambda definidas pelo usuário e conectores para gravar logs no sistema de arquivos do dispositivo do núcleo. Você pode usar logs para solucionar quaisquer problemas que possa encontrar. Para obter mais informações, consulte Monitoramento com logs do AWS IoT Greengrass.

  1. Crie uma definição de logger que inclua uma versão inicial.

    JSON Expanded
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. Copie a LatestVersionArn da definição de logger da saída. Use esse valor para adicionar a versão de definição de logger à versão de grupo implantada no núcleo.

Etapa 5: Obter o ARN da sua versão de definição de núcleo

Obtenha o ARN da versão de definição de núcleo para adicionar à sua nova versão de grupo. Para implantar uma versão de grupo, ela deve fazer referência a uma versão de definição de núcleo que contenha exatamente um núcleo.

  1. Obtenha os IDs do grupo do Greengrass de destino e a versão do grupo. Esse procedimento pressupõe que esse seja o grupo e a versão mais recente do grupo. A consulta a seguir retorna o grupo criado mais recentemente.

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    Ou é possível consultar por nome. Os nomes de grupo não precisam ser exclusivos, portanto, vários grupos podem ser retornados.

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    nota

    Também é possível encontrar esses valores no console do AWS IoT. O ID do grupo é exibido na página Settings (Configurações) do grupo. Os IDs de versão do grupo são exibidos na guia Implantações do grupo.

  2. Copie da saída o Id do grupo de destino. Você usa isso para obter a versão de definição de núcleo e ao implantar o grupo.

  3. Copie a LatestVersion da saída, que é o ID da última versão adicionada ao grupo. Você usa isso para obter a versão de definição do núcleo.

  4. Obtenha o ARN da versão de definição de núcleo:

    1. Obtenha a versão do grupo.

      • Substitua group-id pelo Id que você copiou para o grupo.

      • Substitua group-version-id pelo LatestVersion que você copiou para o grupo.

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. Copie a CoreDefinitionVersionArn da saída. Use esse valor para adicionar a versão de definição à versão de grupo implantada no núcleo.

Etapa 6: Criar uma versão de grupo

Agora, você está pronto para criar uma versão de grupo que contém todas as entidades que você deseja implantar. Faça isso criando uma versão de grupo que faz referência à versão de destino de cada componente. Neste tutorial, você inclui uma versão de definição de núcleo, uma versão de definição de função e uma versão de definição de logger.

  1. Criar uma versão de grupo.

    • Substitua group-id pelo Id que você copiou para o grupo.

    • Substitua core-definition-version-arn pelo CoreDefinitionVersionArn que você copiou para a versão de definição do núcleo.

    • Substitua function-definition-version-arn pelo LatestVersionArn copiado para a sua nova versão da definição de função.

    • Substitua logger-definition-version-arn pelo LatestVersionArn copiado para a sua nova versão da definição de logger.

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. Copie a Version da saída. Este é o ID da nova versão do grupo.

Etapa 7: Criar uma implantação

Implante o grupo no dispositivo do núcleo.

  1. Verifique se o núcleo AWS IoT Greengrass está em execução. Execute os seguintes comandos no seu terminal do Raspberry Pi, conforme necessário.

    1. Para verificar se o daemon está em execução:

      ps aux | grep -E 'greengrass.*daemon'

      Se a saída contém uma entrada root para /greengrass/ggc/packages/ggc-version/bin/daemon, o daemon está em execução.

      nota

      A versão no caminho depende da versão do software do AWS IoT Greengrass Core que foi instalada no seu dispositivo de núcleo.

    2. Para iniciar o daemon:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. Crie um implantação do .

    • Substitua group-id pelo Id que você copiou para o grupo.

    • Substitua group-version-id pelo Version copiado para a nova versão do grupo.

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. Copie a DeploymentId da saída.

  4. Obtenha o status de implantação.

    • Substitua group-id pelo Id que você copiou para o grupo.

    • Substitua deployment-id pelo DeploymentId que você copiou para a implantação.

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    Se o status for Success, a implantação foi bem-sucedida. Para obter ajuda sobre a solução de problemas, consulte Solução de problemas de AWS IoT Greengrass.

Etapa 8: Testar o aplicativo

A função do Lambda do TransferStream gera dados simulados do dispositivo. Ela grava dados em um fluxo que o gerenciador de fluxo exporta para o fluxo de dados do Kinesis de destino.

  1. No console do Amazon Kinesis, em Fluxo de dados do Kinesis, selecione MyKinesisStream.

    nota

    Se você executou o tutorial sem um fluxo de dados do Kinesis de destino, verifique o arquivo de log do gerenciador de fluxo (GGStreamManager). Se ele contiver export stream MyKinesisStream doesn't exist em uma mensagem de erro, o teste será bem-sucedido. Esse erro significa que o serviço tentou exportar para o fluxo, mas o fluxo não existe.

  2. Na página MyKinesisStream, selecione Monitoring (Monitoramento). Se o teste for bem-sucedido, você verá os dados nos gráficos Put Records (Inserir registros) . Dependendo da sua conexão, pode demorar um minuto até que os dados sejam exibidos.

    Importante

    Ao terminar o teste, exclua o fluxo de dados do Kinesis para evitar mais cobranças.

    Ou execute o comando a seguir para interromper o daemon do Greengrass. Isso impedirá que o núcleo envie mensagens até que você esteja pronto para dar continuidade aos testes.

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. Remova a função do Lambda TransferStream do núcleo.

    1. Siga Etapa 6: Criar uma versão de grupo para criar uma nova versão de grupo, mas remova a opção --function-definition-version-arn no comando create-group-version. Ou crie uma versão de definição de função que não inclua a função TransferStream do Lambda.

      nota

      Ao omitir a função do Lambda do sistema GGStreamManager da versão do grupo implantado, desabilite o gerenciamento de fluxo no núcleo.

    2. Siga Etapa 7: Criar uma implantação para implantar a nova versão do grupo.

Para exibir informações de registro ou solucionar problemas com fluxos, verifique os logs das funções TransferStream e GGStreamManager. Você deve ter permissões root para ler logs do AWS IoT Greengrass no sistema de arquivos.

  • TransferStream grava entradas de log em greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log.

  • GGStreamManager grava entradas de log em greengrass-root/ggc/var/log/system/GGStreamManager.log.

Se precisar de mais informações sobre a resolução de problemas, você pode definir o nível de registro do Lambda como DEBUG e criar e implantar uma nova versão de grupo.

Consulte também