本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
用 StreamManagerClient 於使用串流
在 Greengrass 核心裝置上執行的使用者定義 Greengrass 元件可以使用串流管理員 SDK 中的StreamManagerClient
物件在串流管理員中建立串流,然後與串流互動。元件建立串流時,會定義串流的AWS 雲端目的地、優先順序以及其他匯出和資料保留原則。要將數據發送到流管理器,組件將數據附加到流。如果為串流定義了匯出目的地,串流管理員會自動匯出串流。
通常情況下,流管理器的客戶端是用戶定義的 Greengrass 組件。如果您的業務案例需要它,您還可以允許在 Greengrass 核心(例如,Docker 容器)上運行的非組件進程與流管理器進行交互。如需詳細資訊,請參閱 用戶端身分驗證。
本主題中的程式碼片段說明用戶端如何呼叫StreamManagerClient
方法來處理串流。如需有關方法及其引數的實作詳細資訊,請使用每個程式碼片段後面列出的 SDK 參考連結。
如果您在 Lambda 函數中使用串流管理員,則 Lambda 函數應該在函數處理常式StreamManagerClient
之外實例化。如果在處理常式內執行個體化,則該函數在每次叫用時都會建立 client
以及與串流管理員的連線。
如果您在處理常式內將 StreamManagerClient
執行個體化,則必須在 client
完成其工作時明確呼叫 close()
方法。否則,client
會將連線保持在開啟狀態,並將另一個執行緒保持在執行狀態,直到指令碼結束為止。
StreamManagerClient
支援下列操作:
建立訊息串流
要創建一個流,用戶定義的 Greengrass 組件調用創建方法,並在一個對象傳遞。MessageStreamDefinition
此對象指定流的唯一名稱,並定義當達到最大流大小時,流管理器應該如何處理新的數據。您可以使用 MessageStreamDefinition
及其資料類型 (例如 ExportDefinition
、StrategyOnFull
和 Persistence
) 來定義其他串流屬性。其中包含:
-
用於自動匯出的目標AWS IoT AnalyticsAWS IoT SiteWise、Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊,請參閱 匯出支援AWS 雲端目的地的組態。
-
匯出優先順序。串流管理員會先匯出優先順序較高的串流,然後再匯出較低的串流。
-
Kinesis Data Streams 和AWS IoT SiteWise目標的AWS IoT Analytics最大批次大小和批次間隔。符合任一條件時,串流管理員會匯出訊息。
-
Time-to-live (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後,資料可能不會立即刪除。
-
串流持久性。選擇此選項可將串流儲存至檔案系統,以便在核心重新啟動期間保留資料,或將串流儲存在記憶體中。
-
起始序列號。指定要在匯出時用作開始郵件的郵件序號。
如需詳細資訊MessageStreamDefinition
,請參閱目標語言的 SDK 參考資料:
StreamManagerClient
也提供可用來將串流匯出至 HTTP 伺服器的目標目的地。此目標僅供測試之用。它不穩定或不支持在生產環境中使用。
建立串流之後,您的 Greengrass 元件可以將訊息附加至串流以傳送資料以供匯出,並從串流讀取訊息以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。一種策略是為 Kinesis 資料串流中AWS IoT Analytics的每個目標通道建立串流,不過您可以為串流定義多個目標。串流的生命週期相當耐久。
要求
此操作具有以下要求:
範例
以下程式碼片段會建立名為 StreamName
的串流。它定義了MessageStreamDefinition
和下屬數據類型流屬性。
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 雲端.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 發套件參考資料:建立訊息串流 MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 開發套件參考資料:createMessageStream| MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:createMessageStream| MessageStreamDefinition
如需設定匯出目的地的詳細資訊,請參閱匯出支援AWS 雲端目的地的組態。
附加訊息
要將數據發送到流管理器進行導出,您的 Greengrass 組件將數據附加到目標流。匯出目的地決定要傳遞給此方法的資料類型。
要求
此操作具有以下要求:
範例
AWS IoT Analytics或 Kinesis Data Streams 匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。對於AWS IoT Analytics或 Kinesis Data Streams 目的地,您的 Greengrass 元件會附加一組資料。
此代碼片段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:附加消息
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考資料:appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟件開發套件參考:appendMessage
AWS IoT SiteWise出口目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。對於AWS IoT SiteWise目的地,您的 Greengrass 組件附加一個序列化對象。PutAssetPropertyValueEntry
如需詳細資訊,請參閱 匯出至 AWS IoT SiteWise。
當您將資料傳送至時AWS IoT SiteWise,您的資料必須符合BatchPutAssetPropertyValue
動作的要求。如需詳細資訊,請參閱 AWS IoT SiteWise API 參考中的 BatchPutAssetPropertyValue。
此代碼片段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:附加消息 | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考資料:appendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟件開發套件參考:appendMessage | PutAssetPropertyValueEntry
Amazon S3 出口目的地
下列程式碼片段會將匯出工作附加至名為StreamName
的資料流。對於 Amazon S3 目的地,您的 Greengrass 元件會附加序列化S3ExportTaskDefinition
物件,其中包含有關來源輸入檔案和目標 Amazon S3 物件的資訊。如果指定的物件不存在,「串流管理員」會為您建立該物件。如需詳細資訊,請參閱 匯出到 Amazon S3。
此代碼片段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 發套件參考:附加訊息 ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 開發套件參考資料:appendMessage ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:appendMessage ExportTaskDefinition
讀取訊息
讀取串流中的訊息。
要求
此操作具有以下要求:
範例
下列程式碼片段可從名為 StreamName
的串流讀取訊息。讀取方法需要一個選用的 ReadMessagesOptions
物件以指定序號,從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考資料:讀取訊息 | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 開發套件參考資料:readMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:readMessages | ReadMessagesOptions
列出串流
在流管理器中獲取流列表。
要求
此操作具有以下要求:
範例
下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 發套件參考:清單流
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
開發套件參考資料:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:listStreams
描述訊息串流
取得串流的中繼資料,包括串流定義、大小和匯出狀態。
要求
此操作具有以下要求:
範例
下列程式碼片段可獲取名為 StreamName
的串流相關的中繼資料,包括串流定義、大小和匯出工具狀態。
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考資料:描述訊息串流
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
開發套件參考資料:describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:describeMessageStream
更新訊息串流
更新現有流的屬性。如果串流建立後您的需求有所變更,您可能會想要更新串流。例如:
您的 Greengrass 組件遵循以下高級過程來更新流:
-
取得串流的說明。
-
更新對應物件MessageStreamDefinition
和從屬物件上的目標性質。
-
通過在更新的MessageStreamDefinition
。請務必包含更新串流的完整物件定義。未定義的屬性還原為預設值。
您可以指定要在匯出作業中用作開始郵件的郵件序號。
要求
此操作具有以下要求:
範例
下列程式碼片段會更新名為的串流StreamName
。它會更新匯出至 Kinesis 資料串流的串流的多個屬性。
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
開 Python 套件參考資料:updateMessageStream| MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考資料:更新訊息串流 | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考資料:updateMessageStreamMessageStreamDefinition
更新串流的限制
更新串流時會套用下列限制條件。除非在下列清單中註明,否則更新會立即生效。
-
您無法更新資料流的持續性。若要變更此行為,請刪除串流並建立定義新持續性原則的串流。
-
您只能在下列情況下更新串流的大小上限:
-
您可以將串流區段大小更新為小於串流大小上限的值。更新後的設定會套用至新區段。
-
存留時間 (TTL) 屬性的更新會套用至新的附加作業。如果您減少此值,串流管理員也可能會刪除超過 TTL 的現有區段。
-
完整屬性的策略更新會套用至新的附加作業。如果您將策略設定為覆寫最舊的資料,串流管理員也可能會根據新設定覆寫現有區段。
-
寫入時清除屬性的更新會套用至新郵件。
-
匯出組態的更新會套用至新的匯出。更新要求必須包含您要支援的所有匯出組態。否則,流管理器將刪除它們。
-
更新匯出組態時,請指定目標匯出組態的識別碼。
-
若要新增匯出組態,請為新匯出組態指定唯一識別碼。
-
若要刪除匯出組態,請省略匯出組態。
-
若要更新串流中匯出組態的起始序號,您必須指定小於最新序號的值。若要尋找此資訊,請描述串流,然後檢查傳回MessageStreamInfo
物件的儲存狀態。
刪除訊息串流
刪除串流。刪除串流時,磁碟中該串流的所有儲存資料都會刪除。
要求
此操作具有以下要求:
範例
下列程式碼片段會刪除名為 StreamName
的串流。
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
開 Python 套件參考資料:deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 參考資料:刪除訊息串流
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 開發套件參考資料:deleteMessageStream
另請參閱