翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
StreamManagerClient を使用してストリームを操作する
Greengrass コアデバイスで実行されるユーザー定義の Greengrass コンポーネントは、ストリームマネージャー SDK の StreamManagerClient
オブジェクトを使用し、ストリームマネージャーでストリームを作成してストリームとやり取りします。コンポーネントがストリームを作成するとき、ストリームに対して AWS クラウド 送信先、優先順位、他のエクスポートとデータ保持ポリシーを定義します。ストリームマネージャーにデータを送信するため、コンポーネントはデータをストリームに追加します。ストリームにエクスポート先が定義されている場合、ストリームマネージャーは自動的にストリームをエクスポートします。
通常、ストリームマネージャーのクライアントはユーザー定義の Greengrass コンポーネントです。ビジネスケースで必要な場合、Greengrass コア (例えば、Docker コンテナなど) で実行されている非コンポーネントプロセスがストリームマネージャーとやり取りを可能にすることもできます。詳細については、「クライアント承認」を参照してください。
このトピックのスニペットは、クライアントが StreamManagerClient
手法を使用してストリームを操作する方法を示しています。手法とその引数の実装に関する詳細については、各スニペットの下に記載されている SDK リファレンスへのリンクを使用してください。
Lambda 関数でストリームマネージャーを使用する場合、Lambda 関数は StreamManagerClient
を関数ハンドらの外側でインスタンス化する必要があります。ハンドラでインスタンス化されると、関数は呼び出されるたびに client
およびストリームマネージャへの接続を作成します。
ハンドラで StreamManagerClient
のインスタンス化を行う場合は、client
が作業を完了したときに、close()
メソッドを明示的に呼び出す必要があります。それ以外の場合、client
は接続を開いたままにし、スクリプトが終了するまで別のスレッドを実行します。
StreamManagerClient
では次の操作がサポートされています。
メッセージストリームの作成
ストリームを作成するには、ユーザー定義の Greengasss コンポーネントが作成手法を呼び出して MessageStreamDefinition
オブジェクトに渡します。このオブジェクトは、ストリームの一意の名前を指定し、最大ストリームサイズに達したときにストリームマネージャーが新しいデータを処理する方法を定義します。MessageStreamDefinition
とそのデータ型 (ExportDefinition
、StrategyOnFull
、Persistence
など) を使用して、他のストリームプロパティを定義できます。具体的には次のとおりです。
-
ターゲット AWS IoT Analytics、Kinesis データストリーム、AWS IoT SiteWise、自動エクスポート用の Amazon S3 送信先。詳細については、「AWS クラウド でサポートされている送信先のエクスポート設定」を参照してください。
-
エクスポートの優先度。ストリームマネージャーは、プライオリティの低いストリームよりも先にプライオリティの高いストリームをエクスポートします。
-
AWS IoT Analytics の最大バッチサイズとバッチ間隔、Kinesis データストリーム、AWS IoT SiteWise 送信先。ストリームマネージャーは、いずれかの条件が満たされたときにメッセージをエクスポートします。
-
T ime-to-live (TTL)。ストリームデータが処理可能であることを保証する時間。この期間内にデータを消費できることを確認する必要があります。これは削除ポリシーではありません。TTL 期間の直後にデータが削除されない場合があります。
-
ストリームの永続性。ストリームをファイルシステムに保存して、コアを再起動してもデータを保持するか、ストリームをメモリに保存するかを選択します。
-
開始するシーケンス番号。エクスポートの開始メッセージとして使用するメッセージのシーケンス番号を指定します。
MessageStreamDefinition
の詳細については、対象言語の SDK リファレンスを参照してください。
StreamManagerClient
は、ストリームを HTTP サーバーにエクスポートするため、ターゲットの送信先も提供します。このターゲットは、テストのみを目的としています。本場環境での使用は安定しておらず、サポートされていません。
ストリームが作成されると、Greengrass コンポーネントはストリームにメッセージを追加し、エクスポート用データを送信して、ローカル処理用にストリームからメッセージを読み取ります。作成するストリームの数は、ハードウェアの機能とビジネスケースによって異なります。1 つの戦略は、AWS IoT Analytics または Kinesis データストリームのターゲットチャネルごとにストリームを作成することです。ただし、1 つのストリームに複数のターゲットを定義できます。ストリームは寿命に耐久性があります。
要件
この操作には次の要件があります:
例
次のスニペットでは、StreamName
という名前のストリームが作成されます。MessageStreamDefinition
のストリームプロパティとと下位のデータタイプを定義します。
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS クラウド.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: create_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: createMessageStream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: createMessageStream | MessageStreamDefinition
エクスポート先設定の詳細については、「AWS クラウド でサポートされている送信先のエクスポート設定」を参照してください。
メッセージの追加
エクスポート用にストリームマネージャーにデータを送信するため、Greengrass コンポーネントはデータをターゲットストリームに追加します。エクスポート先によって、この手法に渡すデータタイプが決まります。
要件
この操作には次の要件があります:
例
AWS IoT Analytics または Kinesis データストリームのエクスポート先
次のスニペットは、StreamName
という名前のストリームにメッセージを追加します。AWS IoT Analytics または Kinesis データストリストリームの送信先の場合、Greengrass コンポーネントがデータの BLOB を追加します。
このスニペットには次の要件があります:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: append_message
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: appendMessage
AWS IoT SiteWise エクスポート先
次のスニペットは、StreamName
という名前のストリームにメッセージを追加します。AWS IoT SiteWise 送信先の場合、Greengrass コンポーネントはシリアル化された PutAssetPropertyValueEntry
オブジェクトを追加します。詳細については、「AWS IoT SiteWise にエクスポートする」を参照してください。
AWS IoT SiteWise にデータを送信するとき、データは BatchPutAssetPropertyValue
アクションの要件を満す必要があります。詳細については、AWS IoT SiteWise API リファレンスの「BatchPutAssetPropertyValue」を参照してください。
このスニペットには次の要件があります:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: append_message | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: appendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: appendMessage | PutAssetPropertyValueEntry
Amazon S3 のエクスポート先
次のスニペットは、StreamName
という名前のストリームにエクスポートタスクを追加します。Amazon S3 の送信先の場合、Greengrass コンポーネントはシリアル化された S3ExportTaskDefinition
オブジェクトを追加します。これには、ソース入力ファイルとターゲット Amazon S3 オブジェクトに関する情報が含まれています。指定されたオブジェクトが存在しない場合、ストリームマネージャーがユーザーに代わってそのオブジェクトを作成します。詳細については、「Amazon S3 へのエクスポート」を参照してください。
このスニペットには次の要件があります:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: append_message | S3ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: appendMessage | S3ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: appendMessage | S3ExportTaskDefinition
メッセージの読み取り
ストリームのメッセージを読み取ります。
要件
この操作には次の要件があります:
例
次のスニペットは、StreamName
という名前のストリームからメッセージを読み取ります。読み取りメソッドは、読み取りを開始するシーケンス番号、読み取る最小値と最大値、メッセージを読み取るためのタイムアウトを指定するオプションの ReadMessagesOptions
オブジェクトを受け取ります。
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: read_messages | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: readMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: readMessages | ReadMessagesOptions
ストリームの一覧表示
ストリームマネージャーでストリームのリストを取得します。
要件
この操作には次の要件があります:
例
次のスニペットは、ストリームマネージャーのストリームのリストを (名前で) 取得します。
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: list_streams
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: listStreams
メッセージストリームの説明
ストリーム定義、サイズ、エクスポート状態を含め、ストリームに関するメタデータを取得します。
要件
この操作には次の要件があります:
例
次のスニペットは、ストリームの定義、サイズ、エクスポーターのステータスなど、StreamName
という名前のストリームに関するメタデータを取得します。
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: describe_message_stream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: describeMessageStream
メッセージストリームの更新
既存ストリームのプロパティを更新します。ストリームを作成した後に要件が変更された場合、ストリームの更新が必要となる場合があります。例:
-
AWS クラウド 送信先の新しいエクスポート設定を追加します。
-
ストリームの最大サイズを増やして、データのエクスポートまたは保持方法を変更します。例えば、ストリームサイズとフル設定の戦略を組み合わせると、ストリームマネージャーが処理する前に、データが削除または拒否されることがあります。
-
エクスポートの一時停止と再開。例えば、エクスポートタスクが長時間実行されているときにアップロードするデータ量を抑える場合。
Greengrass コンポーネントはこの高レベルプロセスに準拠してストリームを更新します:
-
ストリームの概要を取得します。
-
対応する MessageStreamDefinition
と下位オブジェクトのターゲットプロパティを更新します。
-
更新済み MessageStreamDefinition
として渡します。更新済みストリームの完全なオブジェクト定義を必ず含めてください。未定義のプロパティはデフォルト値に戻ります。
エクスポートで開始メッセージとして使用するメッセージのシーケンス番号を指定できます。
要件
この操作には次の要件があります:
例
次のスニペットは、StreamName
という名前のストリームを更新します。Kinesis データストリームにエクスポートするストリームの複数プロパティを更新します。
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: updateMessageStream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS クラウド.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: update_message_stream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS クラウド.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: updateMessageStream | MessageStreamDefinition
ストリーム更新の制約
ストリームを更新する際に次の制約が適用されます。次のリストに記載がない限り、更新は直ちに有効になります。
-
ストリームの永続性を更新することはできません。この動作を変更するには、ストリームを削除して、新しい永続性ポリシーを定義するストリームを作成します。
-
ストリームの最大サイズは、次の条件を満たしている場合に限って更新できます:
-
ストリーム セグメント サイズを、ストリームの最大サイズよりも小さい値に更新できます。更新した設定は、新しいセグメントに適用されます。
-
有効期限 (TTL) プロパティの更新は、新しい追加操作に適用されます。この値を減らす場合、ストリームマネージャーは TTL を超える既存のセグメントを削除することもあります。
-
フルプロパティの戦略の更新は、新しい追加操作に適用されます。最も古いデータを上書きするように戦略を設定する場合、ストリームマネージャーは新しい設定に基づいて既存のセグメントも上書きすることがあります。
-
書き込み時のフラッシュのプロパティ更新は、新しいメッセージに適用されます。
-
エクスポート設定の更新は、新しいエクスポートに適用されます。更新リクエストは、サポートするすべてのエクスポート設定を含める必要があります。それ以外の場合、ストリームマネージャーが削除します。
-
エクスポート設定を更新するとき、ターゲットのエクスポート設定の識別子を指定します。
-
エクスポート設定を追加するには、新しいエクスポート設定に対して一意の識別子を指定します。
-
エクスポート設定を削除するには、エクスポート設定を省略します。
-
ストリームのエクスポート設定の開始シーケンス番号を更新するには、最後のシーケンス番号よりも小さい値を指定する必要があります。この情報を見つけるには、ストリームを記述して、返された MessageStreamInfo
オブジェクトのストレージ状態を確認します。
メッセージストリームの削除
ストリームを削除します。ストリームを削除すると、ストリームに保存されているすべてのデータがディスクから削除されます。
要件
この操作には次の要件があります:
例
次のスニペットは、StreamName
という名前のストリームを削除します。
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK リファレンス: deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK リファレンス: delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK リファレンス: deleteMessageStream
以下も参照してください。