AWS IoT Greengrass Version 1 est entré dans la phase de durée de vie prolongée le 30 juin 2023. Pour plus d'informations, consultez la politique de AWS IoT Greengrass V1 maintenance. Après cette date, AWS IoT Greengrass V1 ne publiera pas de mises à jour fournissant des fonctionnalités, des améliorations, des corrections de bogues ou des correctifs de sécurité. Les appareils qui fonctionnent AWS IoT Greengrass V1 sous tension ne seront pas perturbés et continueront à fonctionner et à se connecter au cloud. Nous vous recommandons vivement de migrer vers AWS IoT Greengrass Version 2, qui ajoute de nouvelles fonctionnalités importantes et prend en charge des plateformes supplémentaires.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utiliser StreamManagerClient pour travailler avec des flux
Fonctions Lambda définies par l'utilisateur exécutées sur leAWS IoT GreengrassCore peut utiliser leStreamManagerClient
dans leAWS IoT GreengrassKit SDK Corepour créer des flux dansgestionnaire de fluxpuis interagissez avec les flux. Lorsqu'une fonction Lambda crée un flux, elle définit le paramètreAWS Clouddestinations, hiérarchisation et autres stratégies d'exportation et de conservation des données pour le flux. Pour envoyer des données au gestionnaire de flux, les fonctions Lambda ajoutent les données au flux. Si une destination d'exportation est définie pour le flux, le gestionnaire de flux exporte automatiquement le flux.
Généralement, les clients du gestionnaire de flux sont des fonctions Lambda définies par l'utilisateur. Si votre analyse de rentabilisation l'exige, vous pouvez également autoriser les processus non Lambda exécutés sur le noyau Greengrass (par exemple, un conteneur Docker) à interagir avec le gestionnaire de flux. Pour plus d'informations, consultez Authentification client.
Les extraits de cette rubrique vous montrent comment les clients appellent.StreamManagerClient
méthodes pour travailler avec des flux Pour obtenir des détails d'implémentation concernant les méthodes et leurs arguments, utilisez les liens vers la référence SDK répertoriée après chaque extrait de code. Pour obtenir des didacticiels comprenant une fonction Python Lambda complète, reportez-vous à la sectionExportation de flux de données versAWS Cloud(console)ouExportation de flux de données vers leAWS Cloud(CLI).
Votre fonction Lambda doit instancierStreamManagerClient
en dehors du gestionnaire de fonctions Si la fonction est instanciée dans le gestionnaire, elle crée un client
et une connexion au gestionnaire de flux chaque fois qu'elle est appelée.
Si vous effectuez une instanciation StreamManagerClient
dans le gestionnaire, vous devez appeler explicitement la méthode close()
lorsque le client
termine son travail. Sinon, le client
maintient la connexion ouverte et un autre thread actif jusqu'à ce que le script se termine.
StreamManagerClient
prend en charge les opérations suivantes :
Créer un flux de messages
Pour créer un flux, une fonction Lambda définie par l'utilisateur appelle la méthode create et transmet unMessageStreamDefinition
objet. Cet objet spécifie le nom unique du flux et définit la façon dont le gestionnaire de flux doit gérer les nouvelles données lorsque la taille maximale du flux est atteinte. Vous pouvez utiliser MessageStreamDefinition
et ses types de données (tels que ExportDefinition
, StrategyOnFull
et Persistence
) pour définir d'autres propriétés de flux. Il s'agit des licences suivantes :
-
cibleAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWise, et les destinations Amazon S3 pour les exportations automatiques. Pour plus d'informations, consultez Configurations d'exportation pour prises en chargeAWS Clouddestinations.
-
Exportez la priorité. Le gestionnaire de flux exporte les flux de priorité supérieure avant les flux de priorité inférieure.
-
Taille maximale du lot et intervalle de lot pourAWS IoT Analytics, Kinesis Data Streams etAWS IoT SiteWisedestinations. Le gestionnaire de flux exporte les messages lorsque l'une ou l'autre des conditions est remplie.
-
Durée de vie (TTL). Temps nécessaire pour garantir que les données du flux sont disponibles pour le traitement. Vous devez vous assurer que les données peuvent être consommées pendant cette période. Il ne s'agit pas d'une stratégie de suppression. Les données peuvent ne pas être supprimées immédiatement après la période de TTL.
-
Persistance des flux. Choisissez d'enregistrer les flux dans le système de fichiers afin de conserver les données lors des redémarrages du noyau ou d'enregistrer les flux en mémoire.
-
Numéro de séquence de début Spécifiez le numéro de séquence du message à utiliser comme message de départ dans l'exportation.
Pour plus d'informations surMessageStreamDefinition
, consultez la référence SDK de votre langue cible :
StreamManagerClient
fournit également une destination cible que vous pouvez utiliser pour exporter des flux vers un serveur HTTP. Cette cible n'est destinée qu'à des fins de test. Il n'est pas stable ni pris en charge pour une utilisation dans des environnements de production.
Après la création d'un flux, vos fonctions Lambda peuventajouter des messagesvers le flux pour envoyer des données pour exportation etLire des messagesdepuis le flux pour le traitement local. Le nombre de flux que vous créez dépend de vos capacités matérielles et de votre analyse de rentabilisation. Une stratégie consiste à créer un flux pour chaque canal cible dansAWS IoT Analyticsou Kinesis, bien que vous puissiez définir plusieurs cibles pour un flux. Un flux a un cycle de vie durable.
Prérequis
Cette opération possède les critères suivants :
Création de flux avec unAWS IoT SiteWiseou la destination d'exportation Amazon S3 possède les critères suivants :
Exemples
L'extrait de code suivant crée un flux nommé StreamName
. Il définit les propriétés de flux dans leMessageStreamDefinition
et les types de données subordonnés.
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :create_message_stream|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :createMessageStream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :createMessageStream|MessageStreamDefinition
Pour plus d'informations sur la configuration des destinations d'exportation, consultezConfigurations d'exportation pour prises en chargeAWS Clouddestinations.
Ajouter un message
Pour envoyer des données vers le gestionnaire de flux pour exportation, vos fonctions Lambda ajoutent les données au flux cible. La destination d'exportation détermine le type de données à transmettre à cette méthode.
Prérequis
Cette opération est soumise aux exigences suivantes :
Ajout de messages avec unAWS IoT SiteWiseou la destination d'exportation Amazon S3 possède les critères suivants :
Exemples
AWS IoT Analyticsou des destinations d'exportation Kinesis Data Streams
L'extrait de code suivant ajoute un message au flux nommé StreamName
. PourAWS IoT Analyticsou les destinations Kinesis Data Streams, vos fonctions Lambda ajoutent un blob de données.
Cet extrait de code possède les critères suivants :
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :append_message
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :appendMessage
AWS IoT SiteWisedestinations d'exportation
L'extrait de code suivant ajoute un message au flux nommé StreamName
. PourAWS IoT SiteWisedestinations, vos fonctions Lambda ajoutent un objet sérialiséPutAssetPropertyValueEntry
objet. Pour plus d'informations, consultez Exportation vers AWS IoT SiteWise.
Lorsque vous envoyez des données àAWS IoT SiteWise, vos données doivent répondre aux exigences duBatchPutAssetPropertyValue
action. Pour de plus amples informations, veuillez consulter BatchPutasSetPropertyValue dans la référence de l'API AWS IoT SiteWise.
Cet extrait de code possède les critères suivants :
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :append_message|PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :appendMessage|PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :appendMessage|PutAssetPropertyValueEntry
Destinations d'exportation Amazon S3
L'extrait de code suivant ajoute une tâche d'exportation au flux nomméStreamName
. Pour les destinations Amazon S3, vos fonctions Lambda ajoutent un objet sérialiséS3ExportTaskDefinition
qui contient les informations sur le fichier d'entrée source et l'objet Amazon S3 cible. Si l'objet spécifié n'existe pas, Stream Manager le crée pour vous. Pour plus d'informations, consultez Exporter vers Amazon S3.
Cet extrait de code possède les critères suivants :
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :append_message|Définition de la tâche d'exportation S3
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :appendMessage|Définition de la tâche d'exportation S3
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :appendMessage|Définition de la tâche d'exportation S3
Lire des messages
Lire des messages à partir d'un flux.
Prérequis
Cette opération est soumise aux exigences suivantes :
Exemples
L'extrait de code suivant lit les messages du flux nommé StreamName
. La méthode read utilise un objet ReadMessagesOptions
facultatif qui spécifie le numéro de séquence à partir duquel commencer la lecture, les nombres minimum et maximum à lire et un délai d'expiration pour la lecture des messages.
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :read_messages|ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :readMessages|ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :readMessages|ReadMessagesOptions
Afficher la liste des flux
Obtenez la liste des flux dans le gestionnaire de flux.
Prérequis
Cette opération est soumise aux exigences suivantes :
Exemples
L'extrait de code suivant récupère une liste des flux (par nom) dans le gestionnaire de flux.
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :list_streams
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :listStreams
Décrire le flux de messages
Obtenez des métadonnées sur un flux, en particulier la définition, la taille et l'état d'exportation du flux.
Prérequis
Cette opération est soumise aux exigences suivantes :
Exemples
L'extrait de code suivant récupère des métadonnées sur le flux nommé StreamName
, en particulier la définition, la taille et les statuts d'exportation du flux.
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :describe_message_stream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :describeMessageStream
Met à jour le flux
Met à jour les propriétés d'un flux existant. Il est possible que vous souhaitiez mettre à jour un flux si vos besoins changent après la création du flux. Par exemple :
-
Ajouter un nouveauConfiguration d'exportationpour unAWS Clouddestination.
-
Augmentez la taille maximale d'un flux pour modifier la façon dont les données sont exportées ou conservées. Par exemple, la taille du flux associée à votre stratégie sur les paramètres complets peut entraîner la suppression ou le rejet des données avant que le gestionnaire de flux puisse les traiter.
-
Suspendre et reprendre les exportations ; par exemple, si les tâches d'exportation sont longues et que vous souhaitez rationner vos données de chargement.
Vos fonctions Lambda suivent ce processus de haut niveau pour mettre à jour un flux :
-
Obtenez la description du flux.
-
Mettre à jour les propriétés cibles sur les propriétés correspondantesMessageStreamDefinition
et des objets subordonnés.
-
Passez dans la mise à jourMessageStreamDefinition
. Assurez-vous d'inclure les définitions d'objets complètes pour le flux mis à jour. Les propriétés non définies reprennent les valeurs par défaut.
Vous pouvez spécifier le numéro de séquence du message à utiliser comme message de départ dans l'exportation.
Prérequis
Cette opération est soumise aux exigences suivantes :
Exemples
L'extrait de code suivant met à jour le flux nomméStreamName
. Il met à jour plusieurs propriétés d'un flux exporté vers Kinesis Data Streams.
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :Mettre à jour le flux de messages|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :update_message_stream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :Mettre à jour le flux de messages|MessageStreamDefinition
Contraintes de mise à jour des flux
Les contraintes suivantes s'appliquent quand vous mettez à jour les flux. Sauf indication dans la liste suivante, les mises à jour prennent effet immédiatement.
-
Vous ne pouvez pas mettre à jour la persistance d'un flux. Pour modifier ce comportement,supprimer le fluxetcréer un fluxqui définit la nouvelle politique de persistance.
-
Vous pouvez mettre à jour la taille maximale d'un flux uniquement dans les conditions suivantes :
-
La taille maximale doit être supérieure ou égale à la taille actuelle du flux. Pour trouver ces informations,décrire le fluxpuis vérifiez l'état de stockage du fichier renvoyéMessageStreamInfo
objet.
-
La taille maximale doit être supérieure ou égale à la taille du segment du flux.
-
Vous pouvez mettre à jour la taille du segment de flux à une valeur inférieure à la taille maximale du flux. Le paramètre mis à jour s'applique aux nouveaux segments.
-
Les mises à jour de la propriété de durée de vie (TTL) s'appliquent aux nouvelles opérations d'ajout. Si vous diminuez cette valeur, le gestionnaire de flux peut également supprimer des segments existants qui dépassent le TTL.
-
Les mises à jour de la stratégie sur la propriété complète s'appliquent aux nouvelles opérations d'ajout. Si vous définissez la stratégie pour écraser les données les plus anciennes, le gestionnaire de flux peut également écraser les segments existants en fonction du nouveau paramètre.
-
Les mises à jour de la propriété Flush on write s'appliquent aux nouveaux messages.
-
Les mises à jour des configurations d'exportation s'appliquent aux nouvelles exportations. La demande de mise à jour doit inclure toutes les configurations d'exportation que vous souhaitez prendre en charge. Sinon, le gestionnaire de flux les supprime.
-
Lorsque vous mettez à jour une configuration d'exportation, spécifiez l'identifiant de la configuration d'exportation cible.
-
Pour ajouter une configuration d'exportation, spécifiez un identifiant unique pour la nouvelle configuration d'exportation.
-
Pour supprimer une configuration d'exportation, omettez la configuration d'exportation.
-
Pourmise à journuméro de séquence de départ d'une configuration d'exportation dans un flux, vous devez spécifier une valeur inférieure au dernier numéro de séquence. Pour trouver ces informations,décrire le fluxpuis vérifiez l'état de stockage du fichier renvoyéMessageStreamInfo
objet.
Supprimer le flux de messages
Supprime un flux. Lorsque vous supprimez un flux, toutes les données stockées dans le flux sont supprimées du disque.
Prérequis
Cette opération possède les critères suivants :
Exemples
L'extrait de code suivant supprime le flux nommé StreamName
.
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du kit SDK Python :deleteMessageStream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du kit SDK Java :delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du kit SDK Node.js :deleteMessageStream
Consulter aussi