Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 entrou na fase de vida útil prolongada em 30 de junho de 2023. Para obter mais informações, consulte política de manutenção do AWS IoT Greengrass V1. Após essa data, AWS IoT Greengrass V1 não lançaremos atualizações que forneçam recursos, aprimoramentos, correções de erros ou patches de segurança. Os dispositivos que funcionam AWS IoT Greengrass V1 não serão interrompidos e continuarão operando e se conectando à nuvem. É altamente recomendável que você migre para AWS IoT Greengrass Version 2, o que adiciona novos recursos significativos e suporte para plataformas adicionais.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Kinesis Firehose

O conector Kinesis Firehose publica dados por meio de um stream de entrega do Amazon Data Firehose para destinos como Amazon S3, Amazon Redshift ou Amazon Service. OpenSearch

Esse conector é um produtor de dados para um fluxo de entrega do Kinesis. Ele recebe dados de entrada em um tópico MQTT e envia os dados para um fluxo de entrega especificado. Em seguida, o fluxo de entrega envia o registro de dados ao destino configurado (por exemplo, um bucket do S3).

Esse conector tem as seguintes versões.

Version (Versão)

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

Para obter informações sobre alterações de versão, consulte o Changelog.

Requisitos

Esse conector tem os seguintes requisitos:

Version 4 - 5
  • AWS IoT Greengrass Software principal v1.9.3 ou posterior.

  • Python, versão 3.7 ou 3.8, instalado no dispositivo de núcleo e adicionado à variável de ambiente PATH.

    nota

    Para usar o Python 3.8, execute o comando a seguir para criar um symblink da pasta de instalação padrão do Python 3.7 para os binários instalados do Python 3.8.

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    Isso configura seu dispositivo para atender ao requisito Python para AWS IoT Greengrass.

  • Um fluxo de entrega configurado do Kinesis. Para obter mais informações, consulte Criação de um stream de entrega do Amazon Data Firehose no Guia do desenvolvedor do Amazon Kinesis Firehose.

  • A função de grupo do Greengrass configurada para permitir as ações firehose:PutRecord e firehose:PutRecordBatch no fluxo de entrega de destino, conforme mostrado no seguinte exemplo de política do IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Esse conector permite que você substitua dinamicamente o fluxo de entrega padrão na carga da mensagem de entrada. Se a implementação usar esse atributo, a política do IAM deverá incluir todos os fluxos de destino como recursos. Você pode conceder acesso granular ou condicional aos recursos (por exemplo, usando um esquema de nomeação * curinga).

    Para o requisito de função de grupo, você deve configurar a função para conceder as permissões necessárias e certificar-se de que a função tenha sido adicionada ao grupo. Para obter mais informações, consulte Gerenciar a função de grupo do Greengrass (console) ou Gerenciar a função de grupo do Greengrass (CLI).

Versions 2 - 3
  • AWS IoT Greengrass Software principal v1.7 ou posterior.

  • Python versão 2.7 instalado no dispositivo de núcleo e adicionado à variável de ambiente PATH.

  • Um fluxo de entrega configurado do Kinesis. Para obter mais informações, consulte Criação de um stream de entrega do Amazon Data Firehose no Guia do desenvolvedor do Amazon Kinesis Firehose.

  • A função de grupo do Greengrass configurada para permitir as ações firehose:PutRecord e firehose:PutRecordBatch no fluxo de entrega de destino, conforme mostrado no seguinte exemplo de política do IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Esse conector permite que você substitua dinamicamente o fluxo de entrega padrão na carga da mensagem de entrada. Se a implementação usar esse atributo, a política do IAM deverá incluir todos os fluxos de destino como recursos. Você pode conceder acesso granular ou condicional aos recursos (por exemplo, usando um esquema de nomeação * curinga).

    Para o requisito de função de grupo, você deve configurar a função para conceder as permissões necessárias e certificar-se de que a função tenha sido adicionada ao grupo. Para obter mais informações, consulte Gerenciar a função de grupo do Greengrass (console) ou Gerenciar a função de grupo do Greengrass (CLI).

Version 1
  • AWS IoT Greengrass Software principal v1.7 ou posterior.

  • Python versão 2.7 instalado no dispositivo de núcleo e adicionado à variável de ambiente PATH.

  • Um fluxo de entrega configurado do Kinesis. Para obter mais informações, consulte Criação de um stream de entrega do Amazon Data Firehose no Guia do desenvolvedor do Amazon Kinesis Firehose.

  • A função de grupo do Greengrass configurada para permitir a ação firehose:PutRecord no fluxo de entrega de destino, conforme mostrado no seguinte exemplo de política do IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Esse conector permite que você substitua dinamicamente o fluxo de entrega padrão na carga da mensagem de entrada. Se a implementação usar esse atributo, a política do IAM deverá incluir todos os fluxos de destino como recursos. Você pode conceder acesso granular ou condicional aos recursos (por exemplo, usando um esquema de nomeação * curinga).

    Para o requisito de função de grupo, você deve configurar a função para conceder as permissões necessárias e certificar-se de que a função tenha sido adicionada ao grupo. Para obter mais informações, consulte Gerenciar a função de grupo do Greengrass (console) ou Gerenciar a função de grupo do Greengrass (CLI).

Parâmetros do conector

Esse conector oferece os seguintes parâmetros:

Versions 5
DefaultDeliveryStreamArn

O ARN do stream de entrega padrão do Firehose para o qual enviar dados. O fluxo de destino pode ser substituído pela propriedade delivery_stream_arn na carga da mensagem de entrada.

nota

A função do grupo deve permitir as ações apropriadas em todos os fluxos de entrega de destino. Para ter mais informações, consulte Requisitos.

Nome de exibição no AWS IoT console: ARN do fluxo de entrega padrão

Obrigatório: true

Digite: string

Padrão válido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

O número máximo de registros e serem retidos na memória antes que os novos registros para o mesmo fluxo de entrega sejam rejeitados. O valor mínimo é 2000.

Nome de exibição no AWS IoT console: número máximo de registros em buffer (por stream)

Obrigatório: true

Digite: string

Padrão válido: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

A quantidade de memória (em KB) a ser alocada para esse conector.

Nome de exibição no AWS IoT console: Tamanho da memória

Obrigatório: true

Digite: string

Padrão válido: ^[0-9]+$

PublishInterval

O intervalo (em segundos) para publicar registros no Firehose. Para desabilitar o agrupamento em lotes, defina esse valor como 0.

Nome de exibição no AWS IoT console: intervalo de publicação

Obrigatório: true

Digite: string

Valores válidos: 0 - 900

Padrão válido: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

O modo de conteinerização para este conector. O padrão éGreengrassContainer, o que significa que o conector é executado em um ambiente de execução isolado dentro do AWS IoT Greengrass contêiner.

nota

A configuração padrão de conteinerização para o grupo não se aplica aos conectores.

Nome de exibição no AWS IoT console: modo de isolamento de contêiner

Obrigatório: false

Digite: string

Valores válidos: GreengrassContainer ou NoContainer

Padrão válido: ^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

O ARN do stream de entrega padrão do Firehose para o qual enviar dados. O fluxo de destino pode ser substituído pela propriedade delivery_stream_arn na carga da mensagem de entrada.

nota

A função do grupo deve permitir as ações apropriadas em todos os fluxos de entrega de destino. Para ter mais informações, consulte Requisitos.

Nome de exibição no AWS IoT console: ARN do fluxo de entrega padrão

Obrigatório: true

Digite: string

Padrão válido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

O número máximo de registros e serem retidos na memória antes que os novos registros para o mesmo fluxo de entrega sejam rejeitados. O valor mínimo é 2000.

Nome de exibição no AWS IoT console: número máximo de registros em buffer (por stream)

Obrigatório: true

Digite: string

Padrão válido: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

A quantidade de memória (em KB) a ser alocada para esse conector.

Nome de exibição no AWS IoT console: Tamanho da memória

Obrigatório: true

Digite: string

Padrão válido: ^[0-9]+$

PublishInterval

O intervalo (em segundos) para publicar registros no Firehose. Para desabilitar o agrupamento em lotes, defina esse valor como 0.

Nome de exibição no AWS IoT console: intervalo de publicação

Obrigatório: true

Digite: string

Valores válidos: 0 - 900

Padrão válido: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

O ARN do stream de entrega padrão do Firehose para o qual enviar dados. O fluxo de destino pode ser substituído pela propriedade delivery_stream_arn na carga da mensagem de entrada.

nota

A função do grupo deve permitir as ações apropriadas em todos os fluxos de entrega de destino. Para ter mais informações, consulte Requisitos.

Nome de exibição no AWS IoT console: ARN do fluxo de entrega padrão

Obrigatório: true

Digite: string

Padrão válido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

Exemplo de criação de conector (AWS CLI)

O seguinte comando da CLI cria um ConnectorDefinition com uma versão inicial que contém o conector .

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

No AWS IoT Greengrass console, você pode adicionar um conector na página Conectores do grupo. Para ter mais informações, consulte Conceitos básicos de conectores do Greengrass (console).

Dados de entrada

Esse conector aceita conteúdo de fluxos em tópicos MQTT e envia o conteúdo para o fluxo de entrega de destino. Ele aceita dois tipos de dados de entrada:

  • Dados JSON no tópico kinesisfirehose/message.

  • Dados binários no tópico kinesisfirehose/message/binary/#.

Versions 2 - 5
Filtro de tópico: kinesisfirehose/message

Use esse tópico para enviar uma mensagem que contenha dados JSON.

Propriedades de mensagens
request

Os dados a serem enviados para o fluxo de entrega e o fluxo de entrega de destino, se diferentes do fluxo padrão.

Obrigatório: true

Tipo: object que inclui as seguintes propriedades:

data

Os dados a serem enviados para o fluxo de entrega.

Obrigatório: true

Digite: string

delivery_stream_arn

O ARN do fluxo de entrega do Kinesis de destino. Inclua essa propriedade para substituir o fluxo de entrega padrão.

Obrigatório: false

Digite: string

Padrão válido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

Um ID arbitrário para a solicitação. Essa propriedade é usada para mapear a solicitação de entrada para uma resposta de saída. Quando especificada, a propriedade id no objeto de resposta é definida para esse valor. Se você não usa esse atributo, é possível omitir essa propriedade ou especificar uma string vazia.

Obrigatório: false

Digite: string

Padrão válido: .*

Exemplo de entrada
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtro de tópico: kinesisfirehose/message/binary/#

Use esse tópico para enviar uma mensagem que contenha dados binários. O conector não analisa dados binários. Os dados são transmitidos da forma que se encontram.

Para mapear a solicitação de entrada para uma resposta de saída, substitua o curinga # na mensagem do tópico por um ID de solicitação arbitrário. Por exemplo, se você publicar uma mensagem no kinesisfirehose/message/binary/request123, a propriedade id no objeto de resposta será definida como request123.

Se você não deseja mapear uma solicitação para uma resposta, é possível publicar suas mensagens em kinesisfirehose/message/binary/. Lembre-se de incluir uma barra no final.

Version 1
Filtro de tópico: kinesisfirehose/message

Use esse tópico para enviar uma mensagem que contenha dados JSON.

Propriedades de mensagens
request

Os dados a serem enviados para o fluxo de entrega e o fluxo de entrega de destino, se diferentes do fluxo padrão.

Obrigatório: true

Tipo: object que inclui as seguintes propriedades:

data

Os dados a serem enviados para o fluxo de entrega.

Obrigatório: true

Digite: string

delivery_stream_arn

O ARN do fluxo de entrega do Kinesis de destino. Inclua essa propriedade para substituir o fluxo de entrega padrão.

Obrigatório: false

Digite: string

Padrão válido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

Um ID arbitrário para a solicitação. Essa propriedade é usada para mapear a solicitação de entrada para uma resposta de saída. Quando especificada, a propriedade id no objeto de resposta é definida para esse valor. Se você não usa esse atributo, é possível omitir essa propriedade ou especificar uma string vazia.

Obrigatório: false

Digite: string

Padrão válido: .*

Exemplo de entrada
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtro de tópico: kinesisfirehose/message/binary/#

Use esse tópico para enviar uma mensagem que contenha dados binários. O conector não analisa dados binários. Os dados são transmitidos da forma que se encontram.

Para mapear a solicitação de entrada para uma resposta de saída, substitua o curinga # na mensagem do tópico por um ID de solicitação arbitrário. Por exemplo, se você publicar uma mensagem no kinesisfirehose/message/binary/request123, a propriedade id no objeto de resposta será definida como request123.

Se você não deseja mapear uma solicitação para uma resposta, é possível publicar suas mensagens em kinesisfirehose/message/binary/. Lembre-se de incluir uma barra no final.

Dados de saída

O conector publica informações de status como dados de saída em um tópico MQTT.

Versions 2 - 5
Filtro de tópico na assinatura

kinesisfirehose/message/status

Exemplo de saída

A resposta contém o status de cada registro de dados enviado no lote.

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
nota

Se o conector detectar um erro que pode ser repetido (por exemplo, erros de conexão), ele tentará publicar novamente no próximo lote. O recuo exponencial é gerenciado pelo SDK. AWS As solicitações que falham com erros repetíveis são adicionadas de volta ao final da fila para publicação.

Version 1
Filtro de tópico na assinatura

kinesisfirehose/message/status

Exemplo de resultado: sucesso
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
Exemplo de resultado: falha
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

Exemplo de uso

Use as seguintes etapas de alto nível para configurar um exemplo de função do Lambda Python 3.7 que pode ser usado para testar o conector.

nota
  1. Certifique-se de cumprir os requisitos para o conector.

    Para o requisito de função de grupo, você deve configurar a função para conceder as permissões necessárias e certificar-se de que a função tenha sido adicionada ao grupo. Para obter mais informações, consulte Gerenciar a função de grupo do Greengrass (console) ou Gerenciar a função de grupo do Greengrass (CLI).

  2. Crie e publique uma função do Lambda que envie dados de entrada para o conector.

    Salve o código de exemplo como arquivo PY. Baixe e descompacte o SDK do AWS IoT Greengrass Core para Python. Crie então um pacote zip que contenha o arquivo PY e a pasta greengrasssdk no nível raiz. Este pacote zip é o pacote de implantação que você transfere por upload para o AWS Lambda.

    Depois de criar a função do Lambda Python 3.7, publique uma versão de função e crie um alias.

  3. Configure o grupo do Greengrass.

    1. Adicione a função do Lambda pelo seu alias (recomendado). Configure o ciclo de vida do Lambda como de longa duração (ou "Pinned": true na CLI).

    2. Adicione o conector e configure seus parâmetros.

    3. Adicione assinaturas que permitam que o conector receba dados de entrada JSON e envie dados de saída em filtros de tópico compatíveis.

      • Defina a função do Lambda como origem, o conector como destino e use um filtro de tópico de entrada compatível.

      • Defina o conector como origem, o AWS IoT Core como destino, e use um filtro de tópico de saída compatível. Você usa essa assinatura para visualizar mensagens de status no AWS IoT console.

  4. Implante o grupo.

  5. No AWS IoT console, na página Teste, inscreva-se no tópico de dados de saída para ver as mensagens de status do conector. A função de exemplo do Lambda é de longa duração e começa a enviar mensagens imediatamente após o grupo ser implantado.

    Ao finalizar o teste, você pode definir o ciclo de vida do Lambda como sob demanda (ou "Pinned": false na CLI) e implantar o grupo. Isso impede o envio de mensagens pela função.

Exemplo

O exemplo a seguir da função do Lambda envia uma mensagem de entrada para o conector. Esta mensagem contém dados JSON.

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

Licenças

O conector do Kinesis Firehose inclui o seguinte licenciamento/software de terceiros:

Esse conector é liberado de acordo com o Contrato de licença de software do Greengrass Core.

Changelog

A tabela a seguir descreve as alterações em cada versão do conector.

Version (Versão)

Alterações

5

Adicionado o parâmetro IsolationMode para configurar o modo de conteinerização para o conector.

4

Atualização do runtime do Lambda para Python 3.7, o que altera o requisito de runtime.

3

Correção para reduzir o excesso de registro em log e outras correções de erros secundárias.

2

Foi adicionado suporte para enviar registros de dados em lote para o Firehose em um intervalo especificado.

  • Também requer a ação firehose:PutRecordBatch na função do grupo.

  • Novos parâmetros MemorySize, DeliveryStreamQueueSize e PublishInterval.

  • A mensagem de saída contém uma matriz de respostas de status para os registros de dados publicados.

1

Versão inicial.

Um grupo do Greengrass só pode conter uma versão do conector por vez. Para obter informações sobre como fazer upgrade de uma versão do conector, consulte Atualizar a versões do conector.

Consulte também