Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 è entrato nella fase di estensione della vita utile il 30 giugno 2023. Per ulteriori informazioni, consulta la politica AWS IoT Greengrass V1 di manutenzione. Dopo questa data, AWS IoT Greengrass V1 non rilascerà aggiornamenti che forniscano funzionalità, miglioramenti, correzioni di bug o patch di sicurezza. I dispositivi che funzionano AWS IoT Greengrass V1 non subiranno interruzioni e continueranno a funzionare e a connettersi al cloud. Ti consigliamo vivamente di eseguire la migrazione a AWS IoT Greengrass Version 2, che aggiunge nuove importanti funzionalità e supporto per piattaforme aggiuntive.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Kinesis Firehose

Il connettore Kinesis Firehose pubblica i dati tramite un flusso di distribuzione Amazon Data Firehose verso destinazioni come Amazon S3, Amazon Redshift o Amazon Service. OpenSearch

Questo connettore è un produttore di dati per un flusso di distribuzione Kinesis. Riceve i dati di input in un argomento MQTT e invia i dati al flusso di distribuzione specificato. Il flusso di distribuzione invia il record dei dati alla destinazione configurata (per esempio, un bucket S3).

Questo connettore ha le seguenti versioni.

Versione

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

Per informazioni sulle modifiche di ogni versione, consulta Changelog.

Requisiti

Questo connettore presenta i seguenti requisiti:

Version 4 - 5
  • AWS IoT Greengrass Software principale v1.9.3 o successivo.

  • Python versione 3.7 o 3.8 installata sul dispositivo principale e aggiunta alla variabile di ambiente PATH.

    Nota

    Per usare Python 3.8, esegui il seguente comando per creare un collegamento simbolico dalla cartella di installazione predefinita di Python 3.7 ai binari Python 3.8 installati.

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    Questo configura il dispositivo in modo che soddisfi il requisito Python per AWS IoT Greengrass.

  • Un flusso di distribuzione Kinesis configurato. Per ulteriori informazioni, consulta Creazione di un flusso di distribuzione di Amazon Data Firehose nella Amazon Kinesis Firehose Developer Guide.

  • Il ruolo del gruppo Greengrass è configurato per consentire le firehose:PutRecordBatch azioni firehose:PutRecord e sul flusso di consegna di destinazione, come mostrato nel seguente esempio di politica IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Questo connettore consente di sostituire dinamicamente il flusso di distribuzione predefinito nel payload del messaggio di input. Se l'implementazione utilizza questa funzionalità, la policy IAM dovrebbe includere tutti i flussi di destinazione come risorse. Puoi concedere alle risorse un accesso granulare o condizionale (ad esempio, utilizzando uno schema di denominazione con il carattere jolly *).

    Per il requisito del ruolo di gruppo, è necessario configurare il ruolo in modo da concedere le autorizzazioni necessarie e assicurarsi che il ruolo sia stato aggiunto al gruppo. Per ulteriori informazioni, consulta Gestione del ruolo del gruppo Greengrass (console) o Gestione del ruolo del gruppo Greengrass (CLI).

Versions 2 - 3
  • AWS IoT Greengrass Software di base v1.7 o successivo.

  • Python versione 2.7 installato sul dispositivo principale e aggiunto alla variabile di ambiente PATH.

  • Un flusso di distribuzione Kinesis configurato. Per ulteriori informazioni, consulta Creazione di un flusso di distribuzione di Amazon Data Firehose nella Amazon Kinesis Firehose Developer Guide.

  • Il ruolo del gruppo Greengrass è configurato per consentire le firehose:PutRecordBatch azioni firehose:PutRecord e sul flusso di consegna di destinazione, come mostrato nel seguente esempio di politica IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Questo connettore consente di sostituire dinamicamente il flusso di distribuzione predefinito nel payload del messaggio di input. Se l'implementazione utilizza questa funzionalità, la policy IAM dovrebbe includere tutti i flussi di destinazione come risorse. Puoi concedere alle risorse un accesso granulare o condizionale (ad esempio, utilizzando uno schema di denominazione con il carattere jolly *).

    Per il requisito del ruolo di gruppo, è necessario configurare il ruolo in modo da concedere le autorizzazioni necessarie e assicurarsi che il ruolo sia stato aggiunto al gruppo. Per ulteriori informazioni, consulta Gestione del ruolo del gruppo Greengrass (console) o Gestione del ruolo del gruppo Greengrass (CLI).

Version 1
  • AWS IoT Greengrass Software di base v1.7 o successivo.

  • Python versione 2.7 installato sul dispositivo principale e aggiunto alla variabile di ambiente PATH.

  • Un flusso di distribuzione Kinesis configurato. Per ulteriori informazioni, consulta Creazione di un flusso di distribuzione di Amazon Data Firehose nella Amazon Kinesis Firehose Developer Guide.

  • Il ruolo del gruppo Greengrass è configurato per consentire l'firehose:PutRecordazione sul flusso di consegna di destinazione, come mostrato nel seguente esempio di politica IAM.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Questo connettore consente di sostituire dinamicamente il flusso di distribuzione predefinito nel payload del messaggio di input. Se l'implementazione utilizza questa funzionalità, la policy IAM dovrebbe includere tutti i flussi di destinazione come risorse. Puoi concedere alle risorse un accesso granulare o condizionale (ad esempio, utilizzando uno schema di denominazione con il carattere jolly *).

    Per il requisito del ruolo di gruppo, è necessario configurare il ruolo in modo da concedere le autorizzazioni necessarie e assicurarsi che il ruolo sia stato aggiunto al gruppo. Per ulteriori informazioni, consulta Gestione del ruolo del gruppo Greengrass (console) o Gestione del ruolo del gruppo Greengrass (CLI).

Parametri del connettore

Questo connettore fornisce i seguenti parametri:

Versions 5
DefaultDeliveryStreamArn

L'ARN del flusso di distribuzione Firehose predefinito a cui inviare i dati. Il flusso di destinazione può essere ignorato con la proprietà delivery_stream_arn del payload del messaggio di input.

Nota

Il ruolo del gruppo deve consentire le operazioni appropriate su tutti i flussi di distribuzione di destinazione. Per ulteriori informazioni, consulta Requisiti.

Nome visualizzato nella AWS IoT console: flusso di distribuzione predefinito ARN

Obbligatorio: true

Tipo: string

Schema valido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

Il numero massimo di record da conservare in memoria prima che vengano rifiutati nuovi record per lo stesso flusso di distribuzione. Il valore minimo è 2000.

Nome visualizzato nella AWS IoT console: numero massimo di record da memorizzare nel buffer (per stream)

Obbligatorio: true

Tipo: string

Schema valido: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

La quantità di memoria (in KB) da allocare al connettore.

Nome visualizzato nella AWS IoT console: dimensione della memoria

Obbligatorio: true

Tipo: string

Schema valido: ^[0-9]+$

PublishInterval

L'intervallo (in secondi) per la pubblicazione dei record su Firehose. Per disabilitare il batch, impostare questo valore su 0.

Nome visualizzato nella AWS IoT console: Intervallo di pubblicazione

Obbligatorio: true

Tipo: string

Valori validi: 0 - 900

Schema valido: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

Modalità di containerizzazione per questo connettore. L'impostazione predefinita èGreengrassContainer, il che significa che il connettore viene eseguito in un ambiente di runtime isolato all'interno del AWS IoT Greengrass contenitore.

Nota

L'impostazione predefinita della containerizzazione per il gruppo non si applica ai connettori.

Nome visualizzato nella AWS IoT console: modalità di isolamento del contenitore

Obbligatorio: false

Tipo: string

Valori validi: GreengrassContainer o NoContainer

Schema valido: ^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

L'ARN del flusso di distribuzione Firehose predefinito a cui inviare i dati. Il flusso di destinazione può essere ignorato con la proprietà delivery_stream_arn del payload del messaggio di input.

Nota

Il ruolo del gruppo deve consentire le operazioni appropriate su tutti i flussi di distribuzione di destinazione. Per ulteriori informazioni, consulta Requisiti.

Nome visualizzato nella AWS IoT console: flusso di distribuzione predefinito ARN

Obbligatorio: true

Tipo: string

Schema valido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

Il numero massimo di record da conservare in memoria prima che vengano rifiutati nuovi record per lo stesso flusso di distribuzione. Il valore minimo è 2000.

Nome visualizzato nella AWS IoT console: numero massimo di record da memorizzare nel buffer (per stream)

Obbligatorio: true

Tipo: string

Schema valido: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

La quantità di memoria (in KB) da allocare al connettore.

Nome visualizzato nella AWS IoT console: dimensione della memoria

Obbligatorio: true

Tipo: string

Schema valido: ^[0-9]+$

PublishInterval

L'intervallo (in secondi) per la pubblicazione dei record su Firehose. Per disabilitare il batch, impostare questo valore su 0.

Nome visualizzato nella AWS IoT console: Intervallo di pubblicazione

Obbligatorio: true

Tipo: string

Valori validi: 0 - 900

Schema valido: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

L'ARN del flusso di distribuzione Firehose predefinito a cui inviare i dati. Il flusso di destinazione può essere ignorato con la proprietà delivery_stream_arn del payload del messaggio di input.

Nota

Il ruolo del gruppo deve consentire le operazioni appropriate su tutti i flussi di distribuzione di destinazione. Per ulteriori informazioni, consulta Requisiti.

Nome visualizzato nella AWS IoT console: flusso di distribuzione predefinito ARN

Obbligatorio: true

Tipo: string

Schema valido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

Esempio di creazione di connettore (AWS CLI)

Il seguente comando CLI crea un ConnectorDefinition con una versione iniziale che contiene il connettore.

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

Nella AWS IoT Greengrass console, è possibile aggiungere un connettore dalla pagina Connettori del gruppo. Per ulteriori informazioni, consulta Nozioni di base sui connettori Greengrass (console).

Dati di input

Questo connettore accetta i contenuti in streaming negli argomenti MQTT e quindi invia i contenuti nel flusso di distribuzione di destinazione. Accetta due tipi di dati di input:

  • Dati JSON nell'argomento kinesisfirehose/message.

  • Dati binari nell'argomento kinesisfirehose/message/binary/#.

Versions 2 - 5
Filtro di argomenti: kinesisfirehose/message

Utilizzare questo argomento per inviare un messaggio contenente dati JSON.

Proprietà dei messaggi
request

I dati da inviare al flusso di distribuzione e al flusso di distribuzione di destinazione, se diverso da quello predefinito.

Obbligatorio: true

Tipo: object che include le seguenti proprietà:

data

I dati da inviare al flusso di distribuzione.

Obbligatorio: true

Tipo: string

delivery_stream_arn

L'ARN del flusso di distribuzione Kinesis di destinazione. Includi questa proprietà per sovrascrivere il flusso di distribuzione predefinito.

Richiesto: false

Tipo: string

Schema valido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

Un ID arbitrario della richiesta. Questa proprietà viene utilizzata per associare una richiesta di input a una risposta di output. Quando specificato, la proprietà id nell'oggetto della risposta è impostata su questo valore. Se non utilizzi questa funzione, puoi omettere la proprietà oppure specificare una stringa vuota.

Richiesto: false

Tipo: string

Schema valido: .*

Input di esempio
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtro di argomenti: kinesisfirehose/message/binary/#

Utilizzare questo argomento per inviare un messaggio contenente dati binari. Il connettore non analizza i dati binari. I dati sono trasferiti in streaming così come sono.

Per associare la richiesta di input a una risposta di output, sostituisci il carattere jolly # nell'argomento del messaggio con un ID richiesta arbitrario. Ad esempio, se pubblichi un messaggio in kinesisfirehose/message/binary/request123, la proprietà id nell'oggetto di risposta viene impostata su request123.

Se non desideri associare una richiesta a una risposta, puoi pubblicare i messaggi in kinesisfirehose/message/binary/. Assicurarsi di includere la barra finale.

Version 1
Filtro di argomenti: kinesisfirehose/message

Utilizzare questo argomento per inviare un messaggio contenente dati JSON.

Proprietà dei messaggi
request

I dati da inviare al flusso di distribuzione e al flusso di distribuzione di destinazione, se diverso da quello predefinito.

Richiesto: true

Tipo: object che include le seguenti proprietà:

data

I dati da inviare al flusso di distribuzione.

Obbligatorio: true

Tipo: string

delivery_stream_arn

L'ARN del flusso di distribuzione Kinesis di destinazione. Includi questa proprietà per sovrascrivere il flusso di distribuzione predefinito.

Richiesto: false

Tipo: string

Schema valido: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

Un ID arbitrario della richiesta. Questa proprietà viene utilizzata per associare una richiesta di input a una risposta di output. Quando specificato, la proprietà id nell'oggetto della risposta è impostata su questo valore. Se non utilizzi questa funzione, puoi omettere la proprietà oppure specificare una stringa vuota.

Richiesto: false

Tipo: string

Schema valido: .*

Input di esempio
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtro di argomenti: kinesisfirehose/message/binary/#

Utilizzare questo argomento per inviare un messaggio contenente dati binari. Il connettore non analizza i dati binari. I dati sono trasferiti in streaming così come sono.

Per associare la richiesta di input a una risposta di output, sostituisci il carattere jolly # nell'argomento del messaggio con un ID richiesta arbitrario. Ad esempio, se pubblichi un messaggio in kinesisfirehose/message/binary/request123, la proprietà id nell'oggetto di risposta viene impostata su request123.

Se non desideri associare una richiesta a una risposta, puoi pubblicare i messaggi in kinesisfirehose/message/binary/. Assicurarsi di includere la barra finale.

Dati di output

Questo connettore pubblica le informazioni di stato come dati di output su un argomento MQTT.

Versions 2 - 5
Filtro argomento in sottoscrizione

kinesisfirehose/message/status

Output di esempio

La risposta contiene lo stato di ogni record di dati inviati nel batch.

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
Nota

Se il connettore rileva un errore riutilizzabile (ad esempio errori di connessione), riprova la pubblicazione nel batch successivo. Il backoff esponenziale viene gestito dall'SDK. AWS Le richieste respinte con errori riproducibili vengono aggiunti alla fine della coda per ulteriore pubblicazione.

Version 1
Filtro argomento in sottoscrizione

kinesisfirehose/message/status

Output di esempio: Operazione riuscita
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
Esempio di output: Errore
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

Esempio di utilizzo

Usa i seguenti passaggi di alto livello per configurare una funzione Lambda di esempio di Python 3.7 che puoi usare per provare il connettore.

Nota
  1. Assicurarsi di soddisfare i requisiti per il connettore.

    Per il requisito del ruolo di gruppo, è necessario configurare il ruolo in modo da concedere le autorizzazioni necessarie e assicurarsi che il ruolo sia stato aggiunto al gruppo. Per ulteriori informazioni, consulta Gestione del ruolo del gruppo Greengrass (console) o Gestione del ruolo del gruppo Greengrass (CLI).

  2. Crea e pubblica una funzione Lambda che invia dati di input al connettore.

    Salvare il codice di esempio come file PY. Scarica e decomprimi il AWS IoT Greengrass Core SDK per Python. Quindi, crea un pacchetto zip che contiene il file PY e la cartella greengrasssdk a livello root. Questo pacchetto zip è il pacchetto di distribuzione in cui carichi. AWS Lambda

    Dopo aver creato la funzione Python 3.7 Lambda, pubblica una versione della funzione e crea un alias.

  3. Configurare il gruppo Greengrass.

    1. Aggiungi la funzione Lambda tramite il relativo alias (consigliato). Configura il ciclo di vita Lambda come longevo (o nella "Pinned": true CLI).

    2. Aggiungere il connettore e configurarne i relativi parametri.

    3. Aggiungere sottoscrizioni che consentono al connettore di ricevere i dati di input JSON e inviare i dati di output nei filtri degli argomenti supportati.

      • Imposta la funzione Lambda come origine, il connettore come destinazione e utilizza un filtro per argomenti di input supportato.

      • Imposta il connettore come origine, AWS IoT Core come destinazione e utilizza un filtro per l’argomento di output supportato. Utilizzi questo abbonamento per visualizzare i messaggi di stato nella AWS IoT console.

  4. Distribuisci il gruppo.

  5. Nella AWS IoT console, nella pagina Test, sottoscrivi l'argomento relativo ai dati di output per visualizzare i messaggi di stato dal connettore. La funzione Lambda di esempio è di lunga durata e inizia a inviare messaggi subito dopo l'implementazione del gruppo.

    Al termine del test, puoi impostare il ciclo di vita Lambda su richiesta (o nella CLI) e "Pinned": false distribuire il gruppo. Ciò impedisce alla funzione di inviare messaggi.

Esempio

L'esempio seguente della funzione Lambda invia un messaggio di input al connettore. Questo messaggio contiene dati JSON.

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

Licenze

Il connettore Kinesis Firehose include i seguenti software/licenze di terze parti:

Questo connettore è rilasciato ai sensi del contratto di licenza del software Greengrass Core.

Changelog

La tabella seguente descrive le modifiche apportate a ciascuna versione del connettore.

Versione

Modifiche

5

Aggiunto il parametro IsolationMode per configurare la modalità di containerizzazione per il connettore.

4

È stato aggiornato il runtime Lambda a Python 3.7, che modifica i requisiti di runtime.

3

Correzione per ridurre la registrazione eccessiva e altre correzioni di bug minori.

2

È stato aggiunto il supporto per l'invio di record di dati in batch a Firehose a intervalli specificati.

  • Nel ruolo del gruppo è necessaria anche l'operazione firehose:PutRecordBatch.

  • Nuovi parametri MemorySize, DeliveryStreamQueueSize e PublishInterval.

  • Il messaggio di output contiene una serie di risposte di stato per i record di dati pubblicati.

1

Versione iniziale.

Un gruppo Greengrass può contenere una sola versione del connettore alla volta. Per informazioni sull'aggiornamento di una versione del connettore, consulta Aggiornamento delle versioni dei connettori.

Consulta anche