Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 est entré dans la phase de durée de vie prolongée le 30 juin 2023. Pour plus d'informations, consultez la politique de AWS IoT Greengrass V1 maintenance. Après cette date, AWS IoT Greengrass V1 ne publiera pas de mises à jour fournissant des fonctionnalités, des améliorations, des corrections de bogues ou des correctifs de sécurité. Les appareils qui fonctionnent AWS IoT Greengrass V1 sous tension ne seront pas perturbés et continueront à fonctionner et à se connecter au cloud. Nous vous recommandons vivement de migrer vers AWS IoT Greengrass Version 2, qui ajoute de nouvelles fonctionnalités importantes et prend en charge des plateformes supplémentaires.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Kinesis Firehose

Le connecteur Kinesis Firehose publie des données via un flux de diffusion Amazon Data Firehose vers des destinations telles qu'Amazon S3, Amazon Redshift ou Amazon Service. OpenSearch

Ce connecteur est un producteur de données pour un flux de diffusion Kinesis. Il reçoit les données d'entrée dans une rubrique MQTT et envoie les données à un flux de diffusion spécifié. Le flux de diffusion envoie ensuite l'enregistrement des données à la destination configurée (par exemple, un compartiment S3).

Ce connecteur est disponible dans les versions suivantes.

Version

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

Pour obtenir des informations sur les changements apportés aux versions, veuillez consulter le Journal des modifications.

Prérequis

Ce connecteur possède les critères suivants :

Version 4 - 5
  • AWS IoT Greengrass Logiciel de base v1.9.3 ou version ultérieure.

  • Python version 3.7 ou 3.8 installé sur le périphérique principal et ajouté à la variable d'environnement PATH.

    Note

    Pour utiliser Python 3.8, exécutez la commande suivante pour créer un lien symbolique entre le dossier d'installation par défaut de Python 3.7 et les fichiers binaires Python 3.8 installés.

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    Ceci configure votre appareil de sorte qu'il réponde aux exigence de Python pour AWS IoT Greengrass.

  • Un flux de diffusion Kinesis configuré. Pour plus d'informations, consultez la section Création d'un flux de diffusion Amazon Data Firehose dans le manuel du développeur Amazon Kinesis Firehose.

  • Le rôle de groupe Greengrass est configuré pour autoriser les firehose:PutRecordBatch actions firehose:PutRecord et sur le flux de diffusion cible, comme illustré dans l'exemple de politique IAM suivant.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Ce connecteur vous permet de remplacer le flux de diffusion par défaut de façon dynamique dans la charge utile des messages d'entrée. Si votre implémentation utilise cette fonctionnalité, la politique IAM doit inclure tous les flux cibles en tant que ressources. Vous pouvez octroyer un accès précis ou conditionnel aux ressources (par exemple, en utilisant un schéma d'attribution de nom avec caractère générique *).

    Pour l'exigence de rôle de groupe, vous devez configurer le rôle de manière à accorder les autorisations requises et à vous assurer que le rôle a été ajouté au groupe. Pour plus d’informations, consultez Gestion du rôle de groupe Greengrass (console) ou Gestion du rôle de groupe Greengrass (interface de ligne de commande).

Versions 2 - 3
  • AWS IoT Greengrass Logiciel de base v1.7 ou version ultérieure.

  • Python version 2.7 installé sur le périphérique principal et ajouté à la variable d'environnement PATH.

  • Un flux de diffusion Kinesis configuré. Pour plus d'informations, consultez la section Création d'un flux de diffusion Amazon Data Firehose dans le manuel du développeur Amazon Kinesis Firehose.

  • Le rôle de groupe Greengrass est configuré pour autoriser les firehose:PutRecordBatch actions firehose:PutRecord et sur le flux de diffusion cible, comme illustré dans l'exemple de politique IAM suivant.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Ce connecteur vous permet de remplacer le flux de diffusion par défaut de façon dynamique dans la charge utile des messages d'entrée. Si votre implémentation utilise cette fonctionnalité, la politique IAM doit inclure tous les flux cibles en tant que ressources. Vous pouvez octroyer un accès précis ou conditionnel aux ressources (par exemple, en utilisant un schéma d'attribution de nom avec caractère générique *).

    Pour l'exigence de rôle de groupe, vous devez configurer le rôle de manière à accorder les autorisations requises et à vous assurer que le rôle a été ajouté au groupe. Pour plus d’informations, consultez Gestion du rôle de groupe Greengrass (console) ou Gestion du rôle de groupe Greengrass (interface de ligne de commande).

Version 1
  • AWS IoT Greengrass Logiciel de base v1.7 ou version ultérieure.

  • Python version 2.7 installé sur le périphérique principal et ajouté à la variable d'environnement PATH.

  • Un flux de diffusion Kinesis configuré. Pour plus d'informations, consultez la section Création d'un flux de diffusion Amazon Data Firehose dans le manuel du développeur Amazon Kinesis Firehose.

  • Le rôle de groupe Greengrass est configuré pour autoriser l'firehose:PutRecordaction sur le flux de diffusion cible, comme illustré dans l'exemple de politique IAM suivant.

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    Ce connecteur vous permet de remplacer le flux de diffusion par défaut de façon dynamique dans la charge utile des messages d'entrée. Si votre implémentation utilise cette fonctionnalité, la politique IAM doit inclure tous les flux cibles en tant que ressources. Vous pouvez octroyer un accès précis ou conditionnel aux ressources (par exemple, en utilisant un schéma d'attribution de nom avec caractère générique *).

    Pour l'exigence de rôle de groupe, vous devez configurer le rôle de manière à accorder les autorisations requises et à vous assurer que le rôle a été ajouté au groupe. Pour plus d’informations, consultez Gestion du rôle de groupe Greengrass (console) ou Gestion du rôle de groupe Greengrass (interface de ligne de commande).

Paramètres du connecteur

Ce connecteur fournit les paramètres suivants :

Versions 5
DefaultDeliveryStreamArn

L'ARN du flux de diffusion Firehose par défaut auquel envoyer les données. Le flux de destination peut être remplacé par la propriété delivery_stream_arn dans la charge utile du message d'entrée.

Note

Le rôle de groupe doit autoriser les actions appropriées sur toutes les flux cibles de la diffusion. Pour de plus amples informations, veuillez consulter Prérequis.

Nom d'affichage dans la AWS IoT console : ARN du flux de diffusion par défaut

Nécessaire : true

Type : string

Schéma valide : arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

Le nombre maximal d'enregistrements à conserver en mémoire avant que de nouveaux enregistrements pour le même flux de diffusion soient rejetés. La valeur minimale est de 2000.

Nom affiché dans la AWS IoT console : nombre maximum d'enregistrements à mettre en mémoire tampon (par flux)

Nécessaire : true

Type : string

Schéma valide : ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

Quantité de mémoire (en Ko) allouée à ce connecteur.

Nom affiché dans la AWS IoT console : Taille de la mémoire

Nécessaire : true

Type : string

Schéma valide : ^[0-9]+$

PublishInterval

Intervalle (en secondes) pendant lequel les enregistrements sont publiés dans Firehose. Pour désactiver la mise en lots, définissez cette valeur sur 0.

Nom d'affichage dans la AWS IoT console : Intervalle de publication

Nécessaire : true

Type : string

Valeurs valides : 0 - 900

Schéma valide : [0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

Mode conteneurisation de ce connecteur. La valeur par défaut estGreengrassContainer, ce qui signifie que le connecteur s'exécute dans un environnement d'exécution isolé à l'intérieur du AWS IoT Greengrass conteneur.

Note

Le paramètre de conteneurisation par défaut pour le groupe ne s'applique pas aux connecteurs.

Nom affiché dans la AWS IoT console : mode d'isolation du conteneur

Nécessaire : false

Type : string

Valeurs valides : GreengrassContainer ou NoContainer

Schéma valide : ^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

L'ARN du flux de diffusion Firehose par défaut auquel envoyer les données. Le flux de destination peut être remplacé par la propriété delivery_stream_arn dans la charge utile du message d'entrée.

Note

Le rôle de groupe doit autoriser les actions appropriées sur toutes les flux cibles de la diffusion. Pour de plus amples informations, veuillez consulter Prérequis.

Nom d'affichage dans la AWS IoT console : ARN du flux de diffusion par défaut

Nécessaire : true

Type : string

Schéma valide : arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

Le nombre maximal d'enregistrements à conserver en mémoire avant que de nouveaux enregistrements pour le même flux de diffusion soient rejetés. La valeur minimale est de 2000.

Nom affiché dans la AWS IoT console : nombre maximum d'enregistrements à mettre en mémoire tampon (par flux)

Nécessaire : true

Type : string

Schéma valide : ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

Quantité de mémoire (en Ko) allouée à ce connecteur.

Nom affiché dans la AWS IoT console : Taille de la mémoire

Nécessaire : true

Type : string

Schéma valide : ^[0-9]+$

PublishInterval

Intervalle (en secondes) pendant lequel les enregistrements sont publiés dans Firehose. Pour désactiver la mise en lots, définissez cette valeur sur 0.

Nom d'affichage dans la AWS IoT console : Intervalle de publication

Nécessaire : true

Type : string

Valeurs valides : 0 - 900

Schéma valide : [0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

L'ARN du flux de diffusion Firehose par défaut auquel envoyer les données. Le flux de destination peut être remplacé par la propriété delivery_stream_arn dans la charge utile du message d'entrée.

Note

Le rôle de groupe doit autoriser les actions appropriées sur toutes les flux cibles de la diffusion. Pour de plus amples informations, veuillez consulter Prérequis.

Nom d'affichage dans la AWS IoT console : ARN du flux de diffusion par défaut

Nécessaire : true

Type : string

Schéma valide : arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

Exemple de création d'un connecteur (AWS CLI)

La commande CLI suivante crée un ConnectorDefinition avec une version initiale contenant le connecteur.

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

Dans la AWS IoT Greengrass console, vous pouvez ajouter un connecteur depuis la page Connecteurs du groupe. Pour de plus amples informations, veuillez consulter Mise en route avec les connecteurs Greengrass (console).

Données d'entrée

Ce connecteur accepte de diffuser le contenu sur les rubriques MQTT, puis renvoie le contenu vers le flux de diffusion cible. Il accepte deux types de données d'entrée :

  • les données JSON dans la rubrique kinesisfirehose/message.

  • les données binaires dans la rubrique kinesisfirehose/message/binary/#.

Versions 2 - 5
Filtre de rubriques : kinesisfirehose/message

Utilisez cette rubrique pour envoyer un message qui contient des données JSON.

Propriétés des messages
request

Données à envoyer au flux de diffusion et au flux de diffusion cible, s'il est différent du flux par défaut.

Nécessaire : true

Type : object qui inclut les propriétés suivantes :

data

Données à envoyer au flux de diffusion.

Nécessaire : true

Type : string

delivery_stream_arn

L'ARN du flux de diffusion Kinesis cible. Incluez cette propriété pour remplacer le flux de diffusion par défaut.

Nécessaire : false

Type : string

Schéma valide : arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

ID arbitraire de la demande. Cette propriété est utilisée pour mapper une demande d'entrée à une réponse de sortie. Lorsque spécifiée, la propriété id dans l'objet de réponse est définie sur cette valeur. Si vous n'utilisez pas cette fonctionnalité, vous pouvez omettre cette propriété ou spécifier une chaîne vide.

Nécessaire : false

Type : string

Schéma valide : .*

Exemple d'entrée
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtre de rubriques : kinesisfirehose/message/binary/#

Utilisez cette rubrique pour envoyer un message qui contient des données binaires. Le connecteur n'analyse pas les données binaires. Les données sont diffusées en l'état.

Pour mapper la demande d'entrée à une réponse de sortie, remplacez le caractère générique # dans la rubrique du message par un ID de demande arbitraire. Par exemple, si vous publiez un message dans kinesisfirehose/message/binary/request123, la propriété id dans l'objet de réponse est définie sur request123.

Si vous ne souhaitez pas mapper une demande à une réponse, vous pouvez publier vos messages dans kinesisfirehose/message/binary/. Veillez à inclure la barre oblique de fin (/).

Version 1
Filtre de rubriques : kinesisfirehose/message

Utilisez cette rubrique pour envoyer un message qui contient des données JSON.

Propriétés des messages
request

Données à envoyer au flux de diffusion et au flux de diffusion cible, s'il est différent du flux par défaut.

Nécessaire : true

Type : object qui inclut les propriétés suivantes :

data

Données à envoyer au flux de diffusion.

Nécessaire : true

Type : string

delivery_stream_arn

L'ARN du flux de diffusion Kinesis cible. Incluez cette propriété pour remplacer le flux de diffusion par défaut.

Nécessaire : false

Type : string

Schéma valide : arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

ID arbitraire de la demande. Cette propriété est utilisée pour mapper une demande d'entrée à une réponse de sortie. Lorsque spécifiée, la propriété id dans l'objet de réponse est définie sur cette valeur. Si vous n'utilisez pas cette fonctionnalité, vous pouvez omettre cette propriété ou spécifier une chaîne vide.

Nécessaire : false

Type : string

Schéma valide : .*

Exemple d'entrée
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

Filtre de rubriques : kinesisfirehose/message/binary/#

Utilisez cette rubrique pour envoyer un message qui contient des données binaires. Le connecteur n'analyse pas les données binaires. Les données sont diffusées en l'état.

Pour mapper la demande d'entrée à une réponse de sortie, remplacez le caractère générique # dans la rubrique du message par un ID de demande arbitraire. Par exemple, si vous publiez un message dans kinesisfirehose/message/binary/request123, la propriété id dans l'objet de réponse est définie sur request123.

Si vous ne souhaitez pas mapper une demande à une réponse, vous pouvez publier vos messages dans kinesisfirehose/message/binary/. Veillez à inclure la barre oblique de fin (/).

Données de sortie

Ce connecteur publie des informations d'état sous forme de données de sortie dans une rubrique MQTT.

Versions 2 - 5
Filtre de rubrique dans l'abonnement

kinesisfirehose/message/status

Exemple de sortie

La réponse contient le statut de chaque enregistrement de données envoyé dans le lot.

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
Note

Si le connecteur détecte une erreur réessayable (par exemple, des erreurs de connexion), il réessaie de publier dans le lot suivant. Le recul exponentiel est géré par le AWS SDK. Les demandes qui échouent avec des erreurs pouvant être retentées sont ajoutées à la fin de la file d'attente pour la publication.

Version 1
Filtre de rubrique dans l'abonnement

kinesisfirehose/message/status

Exemple de sortie : réussite
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
Exemple de sortie : échec
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

Exemple d'utilisation

Suivez les étapes de haut niveau suivantes pour configurer un exemple de fonction Lambda en Python 3.7 que vous pouvez utiliser pour tester le connecteur.

Note
  1. Veillez à répondre aux conditions requises pour le connecteur.

    Pour l'exigence de rôle de groupe, vous devez configurer le rôle de manière à accorder les autorisations requises et à vous assurer que le rôle a été ajouté au groupe. Pour plus d’informations, consultez Gestion du rôle de groupe Greengrass (console) ou Gestion du rôle de groupe Greengrass (interface de ligne de commande).

  2. Créez et publiez une fonction Lambda qui envoie des données d'entrée au connecteur.

    Enregistrez l'exemple de code en tant que fichier PY. Téléchargez et décompressez le SDK AWS IoT Greengrass de base pour Python. Ensuite, créez un package zip contenant le fichier PY et le dossier greengrasssdk au niveau racine. Ce package zip est le package de déploiement vers lequel vous effectuez le téléchargement AWS Lambda.

    Après avoir créé la fonction Lambda de Python 3.7, publiez une version de la fonction et créez un alias.

  3. Configurez votre groupe Greengrass.

    1. Ajoutez la fonction Lambda par son alias (recommandé). Configurez le cycle de vie Lambda comme étant de longue durée (ou dans "Pinned": true la CLI).

    2. Ajoutez le connecteur et configurez ses paramètres.

    3. Ajoutez des abonnements qui permettent au connecteur de recevoir des données d'entrée JSON et d'envoyer des données de sortie sur des filtres de rubrique pris en charge.

      • Définissez la fonction Lambda comme source, le connecteur comme cible et utilisez un filtre de rubrique d'entrée compatible.

      • Définissez le connecteur en tant que source, AWS IoT Core en tant que cible et utilisez un filtre de rubrique de sortie pris en charge. Vous utilisez cet abonnement pour afficher les messages d'état dans la AWS IoT console.

  4. Déployez le groupe.

  5. Dans la AWS IoT console, sur la page Test, abonnez-vous à la rubrique relative aux données de sortie pour consulter les messages d'état provenant du connecteur. L'exemple de fonction Lambda a une longue durée de vie et commence à envoyer des messages immédiatement après le déploiement du groupe.

    Lorsque vous avez terminé les tests, vous pouvez définir le cycle de vie Lambda à la demande (ou "Pinned": false dans la CLI) et déployer le groupe. Cela empêche la fonction d'envoyer des messages.

Exemple

L'exemple suivant de fonction Lambda envoie un message d'entrée au connecteur. Ce message contient des données JSON.

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

Licences

Le connecteur Kinesis Firehose inclut les logiciels/licences tiers suivants :

Ce connecteur est publié dans le cadre du contrat de licence logicielle Greengrass Core.

Journal des modifications

Le tableau suivant décrit les modifications apportées à chaque version du connecteur.

Version

Modifications

5

Ajout du paramètre IsolationMode pour configurer le mode de conteneurisation du connecteur.

4

Mise à niveau de l'environnement d'exécution Lambda vers Python 3.7, ce qui modifie les exigences d'exécution.

3

Correctif permettant de réduire la journalisation excessive et d'autres correctifs de bogues mineurs.

2

Ajout du support pour l'envoi d'enregistrements de données par lots à Firehose à un intervalle spécifié.

  • Requiert également l'action firehose:PutRecordBatch dans le rôle de groupe.

  • Nouveaux paramètres MemorySize, DeliveryStreamQueueSize et PublishInterval.

  • Le message de sortie contient un tableau des réponses d'état pour les enregistrements de données publiés.

1

Première version.

Un groupe Greengrass ne peut contenir qu'une seule version du connecteur à la fois. Pour de plus amples informations sur la mise à niveau d'une version de connecteur, veuillez consulter Mise à niveau des versions du connecteur.

Consultez aussi