AWS IoT Greengrass Version 1 trat am 30. Juni 2023 in die erweiterte Lebensphase ein. Weitere Informationen finden Sie in der AWS IoT Greengrass V1 Wartungsrichtlinie. Nach diesem Datum AWS IoT Greengrass V1 werden keine Updates mehr veröffentlicht, die Funktionen, Verbesserungen, Bugfixes oder Sicherheitspatches bieten. Geräte, die auf laufen, werden AWS IoT Greengrass V1 nicht gestört und funktionieren weiterhin und stellen eine Verbindung zur Cloud her. Wir empfehlen Ihnen dringend, zu migrieren AWS IoT Greengrass Version 2, da dies wichtige neue Funktionen und Unterstützung für zusätzliche Plattformen bietet.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwenden von StreamManagerClient um mit Streams zu arbeiten
Benutzerdefinierte Lambda-Funktionen, die auf demAWS IoT Greengrasscore kann dieStreamManagerClient
-Objekt imAWS IoT GreengrassCore-SDKum Streams zu erstellenStream-Managerund interagiere dann mit den Streams. Wenn eine Lambda-Funktion einen Stream erstellt, definiert sie dieAWS CloudZiele, Priorisierung sowie andere Export- und Datenaufbewahrungsrichtlinien für den Stream. Um Daten an den Stream-Manager zu senden, hängen Lambda-Funktionen die Daten an den Stream an. Wenn ein Exportziel für den Stream definiert ist, exportiert der Stream-Manager den Stream automatisch.
Typischerweise sind Clients des Stream-Managers benutzerdefinierte Lambda-Funktionen. Wenn Ihr Geschäftsfall dies erfordert, können Sie zulassen, dass nicht auf dem Greengrass Core ausgeführte -Prozesse (z. B. ein Docker-Container) mit dem Stream-Manager interagieren. Weitere Informationen finden Sie unter Client-Authentifizierung.
Die Snippets in diesem Thema zeigen, wie Kunden anrufenStreamManagerClient
Methoden zum Arbeiten mit Streams. Um Implementierungsdetails zu den Methoden und ihren Argumenten zu erhalten, verwenden Sie die Links zur SDK-Referenz, die nach jedem Snippet aufgeführt sind. Tutorials, die eine vollständige Python-Lambda-Funktion enthalten, finden Sie unterExportieren von Daten-Streams inAWS Cloud(Konsole)oderExportieren von Daten-Streams inAWS Cloud(CLI)aus.
Ihre Lambda-Funktion sollte instanziierenStreamManagerClient
außerhalb des Funktionshandlers. Wenn sie in dem Handler instanziiert wird, erstellt die Funktion bei jedem Aufruf eine client
und eine Verbindung zum Stream-Manager.
Wenn Sie StreamManagerClient
in dem Handler instanziieren, müssen Sie die close()
-Methode explizit aufrufen, wenn die client
seine Arbeit abschließt. Andernfalls hält der client
die Verbindung offen, und ein anderer Thread läuft, bis das Skript beendet wird.
StreamManagerClient
unterstützt die folgenden Operationen:
Erstellen eines Nachrichten-Streams
Um einen Stream zu erstellen, ruft eine benutzerdefinierte Lambda-Funktion die create-Methode auf und übergibtMessageStreamDefinition
-Objekt. Dieses Objekt gibt den eindeutigen Namen für den Stream an und definiert, wie der Stream-Manager neue Daten verarbeiten soll, wenn die maximale Stream-Größe erreicht ist. Sie können mit MessageStreamDefinition
seinen Datentypen (z. B. ExportDefinition
, StrategyOnFull
, und Persistence
) andere Stream-Eigenschaften definieren. Dazu zählen:
-
Das ZielAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWiseund Amazon S3-Destinationen für automatische Exporte. Weitere Informationen finden Sie unter Exportieren von Konfigurationen für unterstützteAWS CloudDestinationen.
-
Export-Priorität. Stream-Manager exportiert Streams mit höherer Priorität vor Streams mit niedrigerer Priorität.
-
Maximale Stapelgröße und Stapelintervall fürAWS IoT Analytics, Kinesis Data Streams undAWS IoT SiteWiseZielen. Der Stream-Manager exportiert Nachrichten, wenn eine der Bedingungen erfüllt ist.
-
Time-to-Live (TTL). Die Zeitspanne, um sicherzustellen, dass die Streamdaten für die Verarbeitung verfügbar sind. Sie sollten sicherstellen, dass die Daten innerhalb dieses Zeitraums verbraucht werden können. Dies ist keine Löschrichtlinie. Die Daten werden möglicherweise nicht unmittelbar nach dem TTL-Zeitraum gelöscht.
-
Streampersistenz. Wählen Sie, ob Streams im Dateisystem gespeichert werden sollen, um Daten über Core-Neustarts hinweg zu speichern oder Streams im Speicher zu speichern.
-
Startsequenznummer. Geben Sie die Sequenznummer der Nachricht an, die als Startnachricht im Export verwendet werden soll.
Weitere Informationen zuMessageStreamDefinition
finden Sie in der SDK-Referenz für Ihre Zielsprache:
StreamManagerClient
Bietet auch ein Zielziel, mit dem Sie Streams auf einen HTTP-Server exportieren können. Dieses Ziel dient nur zu Testzwecken. Es ist nicht stabil oder wird nicht für den Einsatz in Produktionsumgebungen unterstützt.
Nachdem ein Stream erstellt wurde, können Ihre Lambda-FunktionenAnhängen von Nachrichtenzum Stream um Daten für den Export zu senden undLesen von Nachrichtenaus dem Stream zur lokalen Verarbeitung. Die Anzahl der Streams, die Sie erstellen, hängt von Ihren Hardwarefunktionen und Ihrem Geschäftsfall ab. Eine Strategie besteht darin, einen Stream für jeden Zielkanal in zu erstellenAWS IoT Analyticsoder Kinesis-Datenstream, obwohl Sie mehrere Ziele für einen Stream definieren können. Ein Stream hat eine dauerhafte Lebensdauer.
Voraussetzungen
Dieser Vorgang hat folgende Anforderungen:
Erstellen von Streams mit einemAWS IoT SiteWiseoder Amazon S3 S3-Exportziel hat die folgenden Anforderungen:
Beispiele
Das folgende Snippet erstellt einen Stream mit dem Namen StreamName
. Es definiert Stream-Eigenschaften imMessageStreamDefinition
und untergeordnete Datentypen.
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:create_message_stream|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:CreateMessageStream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:CreateMessageStream|MessageStreamDefinition
Weitere Informationen zum Konfigurieren von Exportzielen finden Sie unterExportieren von Konfigurationen für unterstützteAWS CloudDestinationenaus.
Anhängen einer Nachricht
Um Daten zum Export an den Stream-Manager zu senden, hängen Ihre Lambda-Funktionen die Daten an den Ziel-Stream an. Das Exportziel bestimmt den Datentyp, der an diese Methode übergeben werden soll.
Voraussetzungen
Dieser Vorgang hat die folgenden Anforderungen:
Nachrichten an ein anhängenAWS IoT SiteWiseoder Amazon S3 S3-Exportziel hat die folgenden Anforderungen:
Beispiele
AWS IoT Analyticsoder Exportieren von Kinesis Data Streams
Das folgende Snippet fügt eine Nachricht an den Stream namens StreamName
an. FürAWS IoT Analyticsoder Kinesis Data Streams Destinationen, Ihre Lambda-Funktionen hängen einen Datenblock an.
Dieser Snippet hat die folgenden Anforderungen:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:append_message
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:AppendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:AppendMessage
AWS IoT SiteWiseExportieren von -
Das folgende Snippet fügt eine Nachricht an den Stream namens StreamName
an. FürAWS IoT SiteWiseDestinationen, Ihre Lambda-Funktionen hängen eine serialisiertePutAssetPropertyValueEntry
-Objekt. Weitere Informationen finden Sie unter Exportieren in AWS IoT SiteWise.
Wenn Sie Daten an sendenAWS IoT SiteWisemüssen Ihre Daten die Anforderungen desBatchPutAssetPropertyValue
Aktion Weitere Informationen finden Sie unter BatchPutAssetPropertyValue in der AWS IoT SiteWise-API-Referenz.
Dieser Snippet hat die folgenden Anforderungen:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:append_message|PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:AppendMessage|PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:AppendMessage|PutAssetPropertyValueEntry
Amazon S3 S3-Exportziele
Das folgende Snippet fügt eine Exportaufgabe an den Stream mit dem Namen anStreamName
aus. Für Amazon S3 S3-Destinationen hängen Ihre Lambda-Funktionen eine serialisierteS3ExportTaskDefinition
-Objekt, das Informationen zur Quelleingabedatei und zum Ziel-Amazon S3-Objekt enthält. Wenn das angegebene Objekt nicht existiert, erstellt Stream Manager es für Sie. Weitere Informationen finden Sie unter Exportieren nach Amazon S3.
Dieser Snippet hat die folgenden Anforderungen:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:append_message|s3ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:AppendMessage|s3ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:AppendMessage|s3ExportTaskDefinition
Lesen von Nachrichten
Lesen Sie Nachrichten aus einem Stream.
Voraussetzungen
Dieser Vorgang hat die folgenden Anforderungen:
Beispiele
Das folgende Snippet liest Nachrichten aus dem Stream namens StreamName
. Die Read-Methode verwendet ein optionales ReadMessagesOptions
-Objekt, das die Sequenznummer angibt, von der aus mit dem Lesen begonnen werden soll, die minimale und maximale Anzahl zu lesen und ein Timeout für das Lesen von Nachrichten.
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:read_messages|ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:readMessages|ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:readMessages|ReadMessagesOptions
Auflisten von Streams
Rufen Sie die Liste der Streams im Stream-Manager ab.
Voraussetzungen
Dieser Vorgang hat die folgenden Anforderungen:
Beispiele
Das folgende Snippet ruft eine Liste der Streams (nach Namen) im Stream-Manager ab.
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:list_streams
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:listStreams
Beschreiben eines Nachrichten-Streams
Rufen Sie Metadaten zu einem Stream ab, einschließlich Definition, Größe und Exportstatus des Streams.
Voraussetzungen
Dieser Vorgang hat die folgenden Anforderungen:
Beispiele
Das folgende Snippet ruft Metadaten über den Stream mit dem Namen StreamName
ab, einschließlich Definition, Größe und Exporterstatus des Streams.
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:describe_message_stream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:describeMessageStream
Aktualisieren eines Nachrichten-Streams
Aktualisieren Sie die Eigenschaften eines vorhandenen Streams. Möglicherweise möchten Sie einen Stream aktualisieren, wenn sich Ihre Anforderungen ändern, nachdem der Stream erstellt wurde. Zum Beispiel:
-
Fügen Sie ein neues hinzuExportieren der Konfigurationfür einAWS CloudZiel.
-
Erhöhen Sie die maximale Größe eines Streams, um zu ändern, wie Daten exportiert oder gespeichert werden. Beispielsweise kann die Stream-Größe in Kombination mit Ihrer Strategie für vollständige Einstellungen dazu führen, dass Daten gelöscht oder abgelehnt werden, bevor der Stream-Manager sie verarbeiten kann.
-
Pausieren und fortsetzen Sie Exporte, z. B. wenn Exportaufgaben lange laufen und Sie Ihre Upload-Daten rationieren möchten.
Ihre Lambda-Funktionen folgen diesem hochrangigen Prozess, um einen Stream zu aktualisieren:
-
Holen Sie sich die Beschreibung des Streams.
-
Aktualisieren Sie die Zieleigenschaften auf dem entsprechendenMessageStreamDefinition
und untergeordnete Objekte.
-
Übergeben Sie das aktualisierteMessageStreamDefinition
aus. Stellen Sie sicher, dass Sie die vollständigen Objektdefinitionen für den aktualisierten Stream angeben. Undefinierte Eigenschaften werden auf die Standardwerte zurückgesetzt.
Sie können die Sequenznummer der Nachricht angeben, die als Startnachricht im Export verwendet werden soll.
Voraussetzungen
Dieser Vorgang hat die folgenden Anforderungen:
Beispiele
Das folgende Snippet aktualisiert den Stream mit dem NamenStreamName
aus. Es aktualisiert mehrere Eigenschaften eines Streams, der in Kinesis Data Streams exportiert wird.
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:updateMessageStream|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:update_message_stream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:updateMessageStream|MessageStreamDefinition
Einschränkungen für das Aktualisieren von Streams
Beim Aktualisieren von Streams gelten die folgenden Einschränkungen. Sofern in der folgenden Liste nicht angegeben, werden Aktualisierungen sofort wirksam.
-
Sie können die Persistenz eines Streams nicht aktualisieren. Um dieses Verhalten zu ändern,Löschen des StreamsundErstellen eines Streamsdas definiert die neue Persistenzrichtlinie.
-
Sie können die maximale Größe eines Streams nur unter den folgenden Bedingungen aktualisieren:
-
Die maximale Größe muss größer oder gleich der aktuellen Stream-Größe sein. Um diese Informationen zu finden,Beschreiben des Streamsund überprüfen Sie dann den Speicherstatus der zurückgegebenenMessageStreamInfo
-Objekt.
-
Die maximale Größe muss größer oder gleich der Segmentgröße des Streams sein.
-
Sie können die Stream-Segmentgröße auf einen Wert aktualisieren, der kleiner als die maximale Größe des Streams ist. Die aktualisierte Einstellung gilt für neue Segmente.
-
Aktualisierungen der Time to Live (TTL) -Eigenschaft gelten für neue Anfügevorgänge. Wenn Sie diesen Wert verringern, löscht der Stream-Manager möglicherweise auch vorhandene Segmente, die die TTL überschreiten.
-
Aktualisierungen der Strategie für vollständige Eigenschaft gelten für neue Anfügevorgänge. Wenn Sie die Strategie zum Überschreiben der ältesten Daten festlegen, überschreibt der Stream-Manager möglicherweise auch vorhandene Segmente basierend auf der neuen Einstellung.
-
Aktualisierungen der Eigenschaft „Flush on Write“ gelten für neue Nachrichten.
-
Aktualisierungen für Exportkonfigurationen gelten für neue Exporte. Die Aktualisierungsanforderung muss alle Exportkonfigurationen enthalten, die Sie unterstützen möchten. Andernfalls löscht der Stream-Manager sie.
-
Wenn Sie eine Exportkonfiguration aktualisieren, geben Sie den Bezeichner der Ziel-Exportkonfiguration an.
-
Um eine Exportkonfiguration hinzuzufügen, geben Sie einen eindeutigen Bezeichner für die neue Exportkonfiguration an.
-
Um eine Exportkonfiguration zu löschen, lassen Sie die Exportkonfiguration weg.
-
BisaktualisierenDie Startsequenznummer einer Exportkonfiguration in einem Stream müssen Sie einen Wert angeben, der kleiner als die letzte Sequenznummer ist. Um diese Informationen zu finden,Beschreiben des Streamsund überprüfen Sie dann den Speicherstatus der zurückgegebenenMessageStreamInfo
-Objekt.
Löschen eines Nachrichten-Streams
Löscht einen Stream. Wenn Sie einen Stream löschen, werden alle gespeicherten Daten für den Stream von der Festplatte gelöscht.
Voraussetzungen
Dieser Vorgang hat folgende Anforderungen:
Beispiele
Das folgende Snippet löscht den Stream mit dem Namen StreamName
.
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
SDK-Referenz:deleteMessageStream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java-SDK-Referenz:delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK-Referenz:deleteMessageStream
Weitere Informationen finden Sie auch unter