使用 StreamManagerClient ストリームとの共同作業を - AWS IoT Greengrass

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

使用 StreamManagerClient ストリームとの共同作業を

ユーザー定義 Lambda 機能を実行する AWS IoT Greengrass コア は、 StreamManagerClient オブジェクトの AWS IoT Greengrass Core SDK ストリームを ストリーム マネージャ そして、ストリームと対話します。Lambda 関数がストリームを作成するとき、ストリームの AWS クラウド送信先、優先順位、その他のエクスポートおよびデータ保持ポリシーを定義します。ストリームマネージャにデータを送信するには、 Lambda 関数は、ストリームにデータを追加します。ストリームのエクスポート先が定義されている場合、ストリームマネージャはストリームを自動的にエクスポートします。

注記

通常、ストリームマネージャーのクライアントはユーザー定義 Lambda 関数です。ビジネスケースで必要な場合は、Greengrass コア (Docker コンテナなど) で実行されている非Lambdaプロセスがストリームマネージャーと対話できるようにすることもできます。詳細については、を参照してください クライアント認証

このトピックのスニペットは、クライアントがどのように StreamManagerClient ストリームを操作する方法。メソッドとその引数に関する実装の詳細については、各スニペットの後にリストされているSDK参照へのリンクを使用してください。完全なPythonを含むチュートリアル用 Lambda 関数、「」を参照カンスウ AWSクラウドへのデータストリームのエクスポート (コンソール) 又は データ ストリームを AWS クラウド(CLI).

あなたの Lambda 機能がインスタンス化されます StreamManagerClient 関数ハンドラの外部にあります。ハンドラでインスタンス化されると、関数は呼び出されるたびに client およびストリームマネージャへの接続を作成します。

注記

ハンドラで StreamManagerClient のインスタンス化を行う場合は、client が作業を完了したときに、close() メソッドを明示的に呼び出す必要があります。それ以外の場合、client は接続を開いたままにし、スクリプトが終了するまで別のスレッドを実行します。

StreamManagerClient では次の操作がサポートされています。

メッセージストリームの作成

ストリームを作成するには、ユーザー定義の Lambda 関数は、createメソッドを呼び出して、 MessageStreamDefinition オブジェクト。このオブジェクトは、ストリームの一意の名前を指定し、ストリームマネージャが最大ストリームサイズに達したときに新しいデータをどのように処理するかを定義します。MessageStreamDefinition とそのデータ型 (ExportDefinitionStrategyOnFullPersistence など) を使用して、他のストリームプロパティを定義できます。具体的には以下のとおりです。

  • ターゲット AWS IoT Analytics、 Kinesis Data Streams、 AWS IoT SiteWise、および Amazon S3 自動エクスポートの宛先。詳細については、サポートされている構成をエクスポート AWS クラウドの送信先 を参照してください。

  • エクスポートの優先度。ストリームマネージャーは、プライオリティの低いストリームよりも先にプライオリティの高いストリームをエクスポートします。

  • の最大バッチサイズとバッチ間隔 AWS IoT Analytics、 Kinesis Data Streams、および AWS IoT SiteWise 目的地。ストリームマネージャーは、いずれかの条件が満たされたときにメッセージをエクスポートします。

  • 有効期限 (TTL) ストリームデータが処理可能であることを保証する時間。この期間内にデータを消費できることを確認する必要があります。これは削除ポリシーではありません。TTL 期間の直後にデータが削除されない場合があります。

  • ストリームの永続性。ストリームをファイルシステムに保存して、コアを再起動してもデータを保持するか、ストリームをメモリに保存するかを選択します。

  • 開始シーケンス番号。エクスポートの開始メッセージとして使用するメッセージのシーケンス番号を指定します。

詳細情報については、 MessageStreamDefinition」を参照してください。

注記

は、ストリームを HTTP サーバーにエクスポートするために使用できるターゲット送信先StreamManagerClientも提供します。このターゲットは、テストのみを目的としています。本番環境での使用には安定しておらず、またサポートされていません。

ストリームが作成された後、 Lambda 関数は、 メッセージの追加 エクスポート用にデータを送信し、 メッセージの読み取り ローカル処理用ストリームから。作成するストリームの数は、ハードウェアの機能とビジネスケースによって異なります。1つの戦略は、 AWS IoT Analytics または Kinesis データ ストリーム。ただし、ストリームに複数のターゲットを定義することができます。ストリームは寿命に耐久性があります。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

注記

でストリームを作成する AWS IoT SiteWise 又は Amazon S3 エクスポート先には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.11.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.6.0  |Java:   1.5.0  |  Node.js: 1.7.0

Examples

次のスニペットは、 StreamName. これは、 MessageStreamDefinition および下位データタイプ。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud. kinesis=None, iot_analytics=None, iot_sitewise=None, s3=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: 作成_メッセージ_ストリーム | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: createMessageStream | MessageStreamDefinition

エクスポート先の構成についての詳細は、「 サポートされている構成をエクスポート AWS クラウドの送信先.

 

メッセージの追加

エクスポートのためにストリームマネージャにデータを送信するには、 Lambda 関数は、ターゲット ストリームにデータを追加します。エクスポート先は、このメソッドに渡すデータ型を決定します。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

注記

にメッセージを追加する AWS IoT SiteWise 又は Amazon S3 エクスポート先には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.11.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.6.0  |Java:   1.5.0  |  Node.js: 1.7.0

Examples

AWS IoT Analytics 又は Kinesis Data Streams エクスポート先

次のスニペットは、 StreamName. 対象 AWS IoT Analytics 又は Kinesis Data Streams 目的地、 Lambda 関数は、データのブロックを追加します。

このスニペットには、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: メッセージの追加

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: appendMessage

AWS IoT SiteWise エクスポート先

次のスニペットは、 StreamName. 対象 AWS IoT SiteWise 目的地、 Lambda シリアル化された PutAssetPropertyValueEntry オブジェクト。詳細については、AWS IoT SiteWise にエクスポートする を参照してください。

注記

にデータを送信する場合、データはAWS IoT SiteWiseアクションの要件を満たしている必要があります。BatchPutAssetPropertyValue詳細については、AWS IoT SiteWise API リファレンスの「BatchPutAssetPropertyValue」を参照してください。

このスニペットには、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.11.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.6.0  |Java:   1.5.0  |  Node.js: 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: メッセージの追加 | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: appendMessage | PutAssetPropertyValueEntry

Amazon S3 エクスポート先

次のスニペットは、 StreamName. 対象 Amazon S3 目的地、 Lambda シリアル化された S3ExportTaskDefinition ソース入力ファイルおよびターゲットに関する情報を含むオブジェクト Amazon S3 オブジェクト。指定オブジェクトが存在しない場合は、Stream Manager によって作成されます。詳細については、Amazon S3 にエクスポートする を参照してください。

このスニペットには、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.11.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.6.0  |Java:   1.5.0  |  Node.js: 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: メッセージの追加 | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: appendMessage | S3ExportTaskDefinition

 

メッセージの読み取り

ストリームからメッセージを読み取る。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Examples

次のスニペットは、 StreamName. 読み取りメソッドには、オプションの ReadMessagesOptions ]オブジェクトに、読み取りを開始するシーケンス番号、読み取る最小数と最大数、メッセージの読み取りタイムアウトを指定します。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: 既読メッセージ | ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: readMessages | ReadMessagesOptions

 

ストリームのリスト表示

ストリームマネージャでストリームのリストを取得します。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Examples

次のスニペットは、ストリームマネージャーのストリームのリストを (名前で) 取得します。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: list_streams(リスト_ストリーム)

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: listStreams

 

メッセージストリームの説明

ストリームの定義、サイズ、エクスポート ステータスなど、ストリームに関するメタデータを取得します。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Examples

次のスニペットは、ストリームの定義、サイズ、エクスポーターのステータスなど、StreamName という名前のストリームに関するメタデータを取得します。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: 説明_メッセージ_ストリーム

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: describeMessageStream

 

メッセージストリームを更新

既存のストリームのプロパティを更新します。ストリームの作成後に要件が変更された場合、ストリームを更新できます。以下に例を示します。

  • 新しい 構成のエクスポート 対象 AWS クラウドの宛先。

  • ストリームの最大サイズを大きくして、データのエクスポートまたは保持方法を変更します。たとえば、フル設定の戦略とストリーム サイズの組み合わせにより、ストリーム マネージャがデータを処理する前に、データが削除されたり、拒否されたりする可能性があります。

  • エクスポートの一時停止と再開。たとえば、エクスポートタスクが長時間実行されていて、アップロードデータを調整したい場合など。

あなたの Lambda 関数は、ストリームを更新するために、この高レベル プロセスに従います。

  1. ストリームの説明を取得します。

  2. 対応する MessageStreamDefinition および下位オブジェクト。

  3. 更新された MessageStreamDefinition. 更新されたストリームのオブジェクト定義をすべて含めるようにしてください。未定義のプロパティは、デフォルト値に戻ります。

    エクスポートの開始メッセージとして使用するメッセージのシーケンス番号を指定できます。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.11.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.6.0  |Java:   1.5.0  |  Node.js: 1.7.0

Examples

次のスニペットは、 StreamName. エクスポート先のストリームの複数のプロパティを更新します。 Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: updateMessageStream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: 更新メッセージストリーム | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: updateMessageStream | MessageStreamDefinition

ストリーム更新の制約

ストリームを更新する場合は、次の制約が適用されます。以下のリストに記載されている場合を除き、更新は直ちに有効になります。

  • ストリームの永続性を更新できません。この動作を変更するには、 ストリームの削除 および ストリームの作成 新しい永続ポリシーを定義します。

  • ストリームの最大サイズを更新できるのは、次の条件の場合のみです。

    • 最大サイズは、ストリームの現在のサイズ以上でなければなりません。この情報を見つけるには、ストリームを記述し、返されたオブジェクトのストレージステータスを確認します。MessageStreamInfo

    • 最大サイズはストリームのセグメント サイズ以上でなければなりません。

  • ストリーム セグメント サイズをストリームの最大サイズ未満の値に更新できます。更新された設定は、新しいセグメントに適用されます。

  • time to live(TTL)プロパティの更新は、新しい追加操作に適用されます。この値を減らすと、ストリーム マネージャは TTL を超える既存のセグメントも削除します。

  • 完全プロパティに関する戦略の更新は、新しい追加操作に適用されます。最も古いデータを上書きするように戦略を設定すると、ストリーム マネージャも新しい設定に基づいて既存のセグメントを上書きすることがあります。

  • [flush on write]プロパティの更新は、新しいメッセージに適用されます。

  • エクスポート設定の更新は、新しいエクスポートに適用されます。更新要求には、サポートしたいすべてのエクスポート構成を含める必要があります。そうでない場合、ストリームマネージャはそれらを削除します。

    • エクスポート構成を更新する場合は、ターゲット エクスポート構成の識別子を指定します。

    • エクスポート構成を追加するには、新しいエクスポート構成の一意の識別子を指定します。

    • エクスポート構成を削除するには、エクスポート構成を省略します。

  • 宛先 更新 ストリーム内のエクスポート設定の開始シーケンス番号を指定する場合は、最新のシーケンス番号よりも小さい値を指定する必要があります。この情報を見つけるには、ストリームを記述し、返されたオブジェクトのストレージステータスを確認します。MessageStreamInfo

 

メッセージストリームの削除

ストリームを削除します。ストリームを削除すると、ストリームに保存されているすべてのデータがディスクから削除されます。

Requirements

この操作には、次の要件があります。

  • 最小AWS IoT Greengrassコアバージョン: 1.10.0

  • 最小AWS IoT Greengrass Core SDKバージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Examples

次のスニペットは、StreamName という名前のストリームを削除します。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDKリファレンス: deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDKリファレンス: delete_メッセージ_ストリーム

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDKリファレンス: deleteMessageStream

以下の資料も参照してください。