Utiliser StreamManagerClient pour travailler avec des flux - AWS IoT Greengrass

AWS IoT Greengrass Version 1 est entré dans la phase de durée de vie prolongée le 30 juin 2023. Pour plus d'informations, consultez la politique de AWS IoT Greengrass V1 maintenance. Après cette date, AWS IoT Greengrass V1 ne publiera pas de mises à jour fournissant des fonctionnalités, des améliorations, des corrections de bogues ou des correctifs de sécurité. Les appareils qui fonctionnent AWS IoT Greengrass V1 sous tension ne seront pas perturbés et continueront à fonctionner et à se connecter au cloud. Nous vous recommandons vivement de migrer vers AWS IoT Greengrass Version 2, qui ajoute de nouvelles fonctionnalités importantes et prend en charge des plateformes supplémentaires.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser StreamManagerClient pour travailler avec des flux

Fonctions Lambda définies par l'utilisateur exécutées sur leAWS IoT GreengrassCore peut utiliser leStreamManagerClientdans leAWS IoT GreengrassKit SDK Corepour créer des flux dansgestionnaire de fluxpuis interagissez avec les flux. Lorsqu'une fonction Lambda crée un flux, elle définit le paramètreAWS Clouddestinations, hiérarchisation et autres stratégies d'exportation et de conservation des données pour le flux. Pour envoyer des données au gestionnaire de flux, les fonctions Lambda ajoutent les données au flux. Si une destination d'exportation est définie pour le flux, le gestionnaire de flux exporte automatiquement le flux.

Note

Généralement, les clients du gestionnaire de flux sont des fonctions Lambda définies par l'utilisateur. Si votre analyse de rentabilisation l'exige, vous pouvez également autoriser les processus non Lambda exécutés sur le noyau Greengrass (par exemple, un conteneur Docker) à interagir avec le gestionnaire de flux. Pour plus d'informations, consultez Authentification client.

Les extraits de cette rubrique vous montrent comment les clients appellent.StreamManagerClientméthodes pour travailler avec des flux Pour obtenir des détails d'implémentation concernant les méthodes et leurs arguments, utilisez les liens vers la référence SDK répertoriée après chaque extrait de code. Pour obtenir des didacticiels comprenant une fonction Python Lambda complète, reportez-vous à la sectionExportation de flux de données versAWS Cloud(console)ouExportation de flux de données vers leAWS Cloud(CLI).

Votre fonction Lambda doit instancierStreamManagerClienten dehors du gestionnaire de fonctions Si la fonction est instanciée dans le gestionnaire, elle crée un client et une connexion au gestionnaire de flux chaque fois qu'elle est appelée.

Note

Si vous effectuez une instanciation StreamManagerClient dans le gestionnaire, vous devez appeler explicitement la méthode close() lorsque le client termine son travail. Sinon, le client maintient la connexion ouverte et un autre thread actif jusqu'à ce que le script se termine.

StreamManagerClient prend en charge les opérations suivantes :

Créer un flux de messages

Pour créer un flux, une fonction Lambda définie par l'utilisateur appelle la méthode create et transmet unMessageStreamDefinitionobjet. Cet objet spécifie le nom unique du flux et définit la façon dont le gestionnaire de flux doit gérer les nouvelles données lorsque la taille maximale du flux est atteinte. Vous pouvez utiliser MessageStreamDefinition et ses types de données (tels que ExportDefinition, StrategyOnFull et Persistence) pour définir d'autres propriétés de flux. Il s'agit des licences suivantes :

  • cibleAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWise, et les destinations Amazon S3 pour les exportations automatiques. Pour plus d'informations, consultez Configurations d'exportation pour prises en chargeAWS Clouddestinations.

  • Exportez la priorité. Le gestionnaire de flux exporte les flux de priorité supérieure avant les flux de priorité inférieure.

  • Taille maximale du lot et intervalle de lot pourAWS IoT Analytics, Kinesis Data Streams etAWS IoT SiteWisedestinations. Le gestionnaire de flux exporte les messages lorsque l'une ou l'autre des conditions est remplie.

  • Durée de vie (TTL). Temps nécessaire pour garantir que les données du flux sont disponibles pour le traitement. Vous devez vous assurer que les données peuvent être consommées pendant cette période. Il ne s'agit pas d'une stratégie de suppression. Les données peuvent ne pas être supprimées immédiatement après la période de TTL.

  • Persistance des flux. Choisissez d'enregistrer les flux dans le système de fichiers afin de conserver les données lors des redémarrages du noyau ou d'enregistrer les flux en mémoire.

  • Numéro de séquence de début Spécifiez le numéro de séquence du message à utiliser comme message de départ dans l'exportation.

Pour plus d'informations surMessageStreamDefinition, consultez la référence SDK de votre langue cible :

Note

StreamManagerClientfournit également une destination cible que vous pouvez utiliser pour exporter des flux vers un serveur HTTP. Cette cible n'est destinée qu'à des fins de test. Il n'est pas stable ni pris en charge pour une utilisation dans des environnements de production.

Après la création d'un flux, vos fonctions Lambda peuventajouter des messagesvers le flux pour envoyer des données pour exportation etLire des messagesdepuis le flux pour le traitement local. Le nombre de flux que vous créez dépend de vos capacités matérielles et de votre analyse de rentabilisation. Une stratégie consiste à créer un flux pour chaque canal cible dansAWS IoT Analyticsou Kinesis, bien que vous puissiez définir plusieurs cibles pour un flux. Un flux a un cycle de vie durable.

Prérequis

Cette opération possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Note

Création de flux avec unAWS IoT SiteWiseou la destination d'exportation Amazon S3 possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.11.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.6.0 | Java : 1.5.0 | Node.js : 1.7.0

Exemples

L'extrait de code suivant crée un flux nommé StreamName. Il définit les propriétés de flux dans leMessageStreamDefinitionet les types de données subordonnés.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :create_message_stream|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :createMessageStream|MessageStreamDefinition

Pour plus d'informations sur la configuration des destinations d'exportation, consultezConfigurations d'exportation pour prises en chargeAWS Clouddestinations.

 

Ajouter un message

Pour envoyer des données vers le gestionnaire de flux pour exportation, vos fonctions Lambda ajoutent les données au flux cible. La destination d'exportation détermine le type de données à transmettre à cette méthode.

Prérequis

Cette opération est soumise aux exigences suivantes :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Note

Ajout de messages avec unAWS IoT SiteWiseou la destination d'exportation Amazon S3 possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.11.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.6.0 | Java : 1.5.0 | Node.js : 1.7.0

Exemples

AWS IoT Analyticsou des destinations d'exportation Kinesis Data Streams

L'extrait de code suivant ajoute un message au flux nommé StreamName. PourAWS IoT Analyticsou les destinations Kinesis Data Streams, vos fonctions Lambda ajoutent un blob de données.

Cet extrait de code possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :appendMessage

AWS IoT SiteWisedestinations d'exportation

L'extrait de code suivant ajoute un message au flux nommé StreamName. PourAWS IoT SiteWisedestinations, vos fonctions Lambda ajoutent un objet sérialiséPutAssetPropertyValueEntryobjet. Pour plus d'informations, consultez Exportation vers AWS IoT SiteWise.

Note

Lorsque vous envoyez des données àAWS IoT SiteWise, vos données doivent répondre aux exigences duBatchPutAssetPropertyValueaction. Pour de plus amples informations, veuillez consulter BatchPutasSetPropertyValue dans la référence de l'API AWS IoT SiteWise.

Cet extrait de code possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.11.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.6.0 | Java : 1.5.0 | Node.js : 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :append_message|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :appendMessage|PutAssetPropertyValueEntry

Destinations d'exportation Amazon S3

L'extrait de code suivant ajoute une tâche d'exportation au flux nomméStreamName. Pour les destinations Amazon S3, vos fonctions Lambda ajoutent un objet sérialiséS3ExportTaskDefinitionqui contient les informations sur le fichier d'entrée source et l'objet Amazon S3 cible. Si l'objet spécifié n'existe pas, Stream Manager le crée pour vous. Pour plus d'informations, consultez Exporter vers Amazon S3.

Cet extrait de code possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.11.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.6.0 | Java : 1.5.0 | Node.js : 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :append_message|Définition de la tâche d'exportation S3

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :appendMessage|Définition de la tâche d'exportation S3

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :appendMessage|Définition de la tâche d'exportation S3

 

Lire des messages

Lire des messages à partir d'un flux.

Prérequis

Cette opération est soumise aux exigences suivantes :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Exemples

L'extrait de code suivant lit les messages du flux nommé StreamName. La méthode read utilise un objet ReadMessagesOptions facultatif qui spécifie le numéro de séquence à partir duquel commencer la lecture, les nombres minimum et maximum à lire et un délai d'expiration pour la lecture des messages.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :read_messages|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :readMessages|ReadMessagesOptions

 

Afficher la liste des flux

Obtenez la liste des flux dans le gestionnaire de flux.

Prérequis

Cette opération est soumise aux exigences suivantes :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Exemples

L'extrait de code suivant récupère une liste des flux (par nom) dans le gestionnaire de flux.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :listStreams

 

Décrire le flux de messages

Obtenez des métadonnées sur un flux, en particulier la définition, la taille et l'état d'exportation du flux.

Prérequis

Cette opération est soumise aux exigences suivantes :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Exemples

L'extrait de code suivant récupère des métadonnées sur le flux nommé StreamName, en particulier la définition, la taille et les statuts d'exportation du flux.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :describeMessageStream

 

Met à jour le flux

Met à jour les propriétés d'un flux existant. Il est possible que vous souhaitiez mettre à jour un flux si vos besoins changent après la création du flux. Par exemple :

  • Ajouter un nouveauConfiguration d'exportationpour unAWS Clouddestination.

  • Augmentez la taille maximale d'un flux pour modifier la façon dont les données sont exportées ou conservées. Par exemple, la taille du flux associée à votre stratégie sur les paramètres complets peut entraîner la suppression ou le rejet des données avant que le gestionnaire de flux puisse les traiter.

  • Suspendre et reprendre les exportations ; par exemple, si les tâches d'exportation sont longues et que vous souhaitez rationner vos données de chargement.

Vos fonctions Lambda suivent ce processus de haut niveau pour mettre à jour un flux :

  1. Obtenez la description du flux.

  2. Mettre à jour les propriétés cibles sur les propriétés correspondantesMessageStreamDefinitionet des objets subordonnés.

  3. Passez dans la mise à jourMessageStreamDefinition. Assurez-vous d'inclure les définitions d'objets complètes pour le flux mis à jour. Les propriétés non définies reprennent les valeurs par défaut.

    Vous pouvez spécifier le numéro de séquence du message à utiliser comme message de départ dans l'exportation.

Prérequis

Cette opération est soumise aux exigences suivantes :

  • MinimumAWS IoT GreengrassVersion Core : 1.11.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.6.0 | Java : 1.5.0 | Node.js : 1.7.0

Exemples

L'extrait de code suivant met à jour le flux nomméStreamName. Il met à jour plusieurs propriétés d'un flux exporté vers Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :Mettre à jour le flux de messages|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :update_message_stream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :Mettre à jour le flux de messages|MessageStreamDefinition

Contraintes de mise à jour des flux

Les contraintes suivantes s'appliquent quand vous mettez à jour les flux. Sauf indication dans la liste suivante, les mises à jour prennent effet immédiatement.

  • Vous ne pouvez pas mettre à jour la persistance d'un flux. Pour modifier ce comportement,supprimer le fluxetcréer un fluxqui définit la nouvelle politique de persistance.

  • Vous pouvez mettre à jour la taille maximale d'un flux uniquement dans les conditions suivantes :

    • La taille maximale doit être supérieure ou égale à la taille actuelle du flux. Pour trouver ces informations,décrire le fluxpuis vérifiez l'état de stockage du fichier renvoyéMessageStreamInfoobjet.

    • La taille maximale doit être supérieure ou égale à la taille du segment du flux.

  • Vous pouvez mettre à jour la taille du segment de flux à une valeur inférieure à la taille maximale du flux. Le paramètre mis à jour s'applique aux nouveaux segments.

  • Les mises à jour de la propriété de durée de vie (TTL) s'appliquent aux nouvelles opérations d'ajout. Si vous diminuez cette valeur, le gestionnaire de flux peut également supprimer des segments existants qui dépassent le TTL.

  • Les mises à jour de la stratégie sur la propriété complète s'appliquent aux nouvelles opérations d'ajout. Si vous définissez la stratégie pour écraser les données les plus anciennes, le gestionnaire de flux peut également écraser les segments existants en fonction du nouveau paramètre.

  • Les mises à jour de la propriété Flush on write s'appliquent aux nouveaux messages.

  • Les mises à jour des configurations d'exportation s'appliquent aux nouvelles exportations. La demande de mise à jour doit inclure toutes les configurations d'exportation que vous souhaitez prendre en charge. Sinon, le gestionnaire de flux les supprime.

    • Lorsque vous mettez à jour une configuration d'exportation, spécifiez l'identifiant de la configuration d'exportation cible.

    • Pour ajouter une configuration d'exportation, spécifiez un identifiant unique pour la nouvelle configuration d'exportation.

    • Pour supprimer une configuration d'exportation, omettez la configuration d'exportation.

  • Pourmise à journuméro de séquence de départ d'une configuration d'exportation dans un flux, vous devez spécifier une valeur inférieure au dernier numéro de séquence. Pour trouver ces informations,décrire le fluxpuis vérifiez l'état de stockage du fichier renvoyéMessageStreamInfoobjet.

 

Supprimer le flux de messages

Supprime un flux. Lorsque vous supprimez un flux, toutes les données stockées dans le flux sont supprimées du disque.

Prérequis

Cette opération possède les critères suivants :

  • MinimumAWS IoT GreengrassVersion Core : 1.10.0

  • MinimumAWS IoT GreengrassVersion du kit SDK Core : Python : 1.5.0 | Java : 1.4.0 | Node.js : 1.6.0

Exemples

L'extrait de code suivant supprime le flux nommé StreamName.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Référence du kit SDK Python :deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Référence du kit SDK Java :delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Référence du kit SDK Node.js :deleteMessageStream

Consulter aussi