ストリームを操作するために StreamManagerClient を使用する - AWS IoT Greengrass

のドキュメントを表示していますAWS IoT Greengrass Version 1。AWS IoT Greengrass Version 2の最新のメジャーバージョンです。AWS IoT Greengrass。の使用方法の詳細については、「」を参照してください。AWS IoT Greengrass V2の詳細については、AWS IoT Greengrass Version 2開発者ガイド

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ストリームを操作するために StreamManagerClient を使用する

で実行されている Lambda 関数AWS IoT GreengrassコアはStreamManagerClientオブジェクトのあるAWS IoT GreengrassCore SDKでストリームを作成するストリームマネージャーし、ストリームと対話します。Lambda 関数がストリームを作成するとき、 AWS クラウド 配信先、優先順位、その他のエクスポートおよびデータ保持ポリシーです。ストリームマネージャーにデータを送信するには、Lambda 関数はストリームにデータを追加します。ストリームにエクスポート先が定義されている場合、ストリームマネージャーは自動的にストリームをエクスポートします。

注記

通常、ストリームマネージャーのクライアントはユーザー定義 Lambda 関数です。ビジネスケースで必要な場合は、Greengrass コアで実行されている非 Lambda プロセス (Docker コンテナなど) がストリームマネージャーと対話できるようにすることもできます。詳細については、「クライアント認証」を参照してください。

このトピックのスニペットは、クライアントがStreamManagerClientストリームで動作するようにします。メソッドとその引数の実装の詳細については、各スニペットの後にリストされている SDK リファレンスへのリンクを使用してください。完全な Python Lambda 関数を含むチュートリアルについては、データストリームを AWS クラウド (コンソール)またはデータストリームを AWS クラウド (CLI)

あなたのLambda 関数はインスタンス化する必要がありますStreamManagerClient関数ハンドラの外部で。ハンドラでインスタンス化されると、関数は呼び出されるたびに client およびストリームマネージャへの接続を作成します。

注記

ハンドラで StreamManagerClient のインスタンス化を行う場合は、client が作業を完了したときに、close() メソッドを明示的に呼び出す必要があります。それ以外の場合、client は接続を開いたままにし、スクリプトが終了するまで別のスレッドを実行します。

StreamManagerClient では次の操作がサポートされています。

メッセージストリームの作成

ストリームを作成するには、ユーザー定義の Lambda 関数が create メソッドを呼び出し、MessageStreamDefinitionオブジェクト。このオブジェクトは、ストリームの一意の名前を指定し、最大ストリームサイズに達したときにストリームマネージャーが新しいデータを処理する方法を定義します。MessageStreamDefinition とそのデータ型 (ExportDefinitionStrategyOnFullPersistence など) を使用して、他のストリームプロパティを定義できます。具体的には次のとおりです。

  • ターゲットAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWise、および Amazon S3 宛先で自動エクスポートを実行できます。詳細については、「サポート対象のの構成をエクスポート AWS クラウド 送信先」を参照してください。

  • エクスポートの優先度。ストリームマネージャーは、プライオリティの低いストリームよりも先にプライオリティの高いストリームをエクスポートします。

  • 最大バッチサイズとバッチインターバルAWS IoT AnalyticsKinesis Data Streams、AWS IoT SiteWise送信先。ストリームマネージャーは、いずれかの条件が満たされたときにメッセージをエクスポートします。

  • 有効期限 (TTL) ストリームデータが処理可能であることを保証する時間。この期間内にデータを消費できることを確認する必要があります。これは削除ポリシーではありません。TTL 期間の直後にデータが削除されない場合があります。

  • ストリームの永続性。ストリームをファイルシステムに保存して、コアを再起動してもデータを保持するか、ストリームをメモリに保存するかを選択します。

  • 開始シーケンス番号。エクスポートの開始メッセージとして使用するメッセージのシーケンス番号を指定します。

の詳細MessageStreamDefinitionについては、ターゲット言語の SDK リファレンスを参照してください。

注記

StreamManagerClientは、ストリームを HTTP サーバーにエクスポートするために使用できるターゲット宛先も提供します。このターゲットは、テストのみを目的としています。実稼働環境での使用には安定しておらず、サポートされていません。

ストリームが作成されると、Lambda 関数はメッセージの追加をストリームに追加してエクスポート用のデータを送信し、メッセージの読み取りをストリームからローカル処理します。作成するストリームの数は、ハードウェアの機能とビジネスケースによって異なります。1 つの戦略は、ターゲットチャネルごとにストリームを作成することです。AWS IoT Analyticsまたは Kinesis データストリームを使用しますが、1 つのストリームに複数のターゲットを定義できます。ストリームは寿命に耐久性があります。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

注記

を使用したストリームの作成AWS IoT SiteWiseまたは Amazon S3 のエクスポート先には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.11.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Examples

次のスニペットでは、StreamName という名前のストリームが作成されます。これは、ストリームプロパティを定義しますMessageStreamDefinitionおよび従属データ型。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS クラウド . kinesis=None, iot_analytics=None, iot_sitewise=None, s3=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:stream|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド . new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド . new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:createMessageStream|MessageStreamDefinition

エクスポート先の構成の詳細については、「」を参照してください。サポート対象のの構成をエクスポート AWS クラウド 送信先

 

メッセージの追加

エクスポートのためにストリームマネージャーにデータを送信するには、Lambda 関数がそのデータをターゲットストリームに追加します。エクスポート先によって、このメソッドに渡すデータ型が決定されます。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

注記

でメッセージを追加するAWS IoT SiteWiseまたは Amazon S3 のエクスポート先には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.11.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Examples

AWS IoT Analyticsまたは Kinesis Data Streams エクスポート先

次のスニペットは、StreamName という名前のストリームにメッセージを追加します。を使用する場合AWS IoT Analyticsまたは Kinesis Data Streams 送信先を指定する場合、Lambda 関数はデータの BLOB を追加します。

このスニペットには以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:appendd

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:appendMessage

AWS IoT SiteWiseエクスポート先

次のスニペットは、StreamName という名前のストリームにメッセージを追加します。を使用する場合AWS IoT SiteWise宛先に入力すると、Lambda 関数はシリアル化されたPutAssetPropertyValueEntryオブジェクト。詳細については、「AWS IoT SiteWise にエクスポートする」を参照してください。

注記

データをに送信するとAWS IoT SiteWiseでは、データはBatchPutAssetPropertyValueaction. 詳は、AWS IoT SiteWise API リファレンスBatchPutAssetPropertyValue を参照してください。

このスニペットには以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.11.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:appendd|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:appendMessage|PutAssetPropertyValueEntry

Amazon S3 のエクスポート先

次のスニペットは、エクスポートタスクをという名前のストリームに追加します。StreamName。Amazon S3 送信先の場合、Lambda 関数は、シリアル化されたS3ExportTaskDefinitionソース入力ファイルとターゲットの Amazon S3 オブジェクトに関する情報を含むオブジェクト。指定したオブジェクトが存在しない場合は、Stream Manager によって作成されます。詳細については、「Amazon S3 へのエクスポート」を参照してください。

このスニペットには以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.11.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:appendd|ExportTaskdefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:appendMessage|ExportTaskdefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:appendMessage|ExportTaskdefinition

 

メッセージの読み取り

ストリームからメッセージを読み取ります。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Examples

次のスニペットは、StreamName という名前のストリームからメッセージを読み取ります。読み取りメソッドは、読み取りを開始するシーケンス番号、読み取る最小値と最大値、メッセージを読み取るためのタイムアウトを指定するオプションの ReadMessagesOptions オブジェクトを受け取ります。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:messages|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:readMessages|ReadMessagesOptions

 

ストリームのリスト表示

ストリームマネージャーでストリームのリストを取得します。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Examples

次のスニペットは、ストリームマネージャーのストリームのリストを (名前で) 取得します。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:ListStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:ListStreams

 

メッセージストリームの説明

ストリーム定義、サイズ、エクスポートステータスなど、ストリームに関するメタデータを取得します。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Examples

次のスニペットは、ストリームの定義、サイズ、エクスポーターのステータスなど、StreamName という名前のストリームに関するメタデータを取得します。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:describe_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:DescribeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:DescribeMessageStream

 

メッセージストリームの更新

既存のストリームのプロパティを更新します。ストリームの作成後に要件が変更された場合は、ストリームを更新することをお勧めします。次に例を示します。

  • 新しいを追加します。設定をエクスポートするのために AWS クラウド 送信先。

  • ストリームの最大サイズを増やして、データのエクスポートまたは保持方法を変更します。たとえば、ストリームサイズとフル設定の戦略を組み合わせると、ストリームマネージャがデータを処理する前にデータが削除または拒否される可能性があります。

  • エクスポートを一時停止または再開します。たとえば、エクスポートタスクの実行時間が長く、アップロードデータを調整する場合などです。

Lambda 関数は次の高レベルのプロセスに従ってストリームを更新します。

  1. ストリームの説明を取得します。

  2. ターゲットプロパティを、対応するMessageStreamDefinitionおよび従属オブジェクトです。

  3. 更新されたMessageStreamDefinition。更新されたストリームの完全なオブジェクト定義を含めるようにしてください。未定義のプロパティは、デフォルト値に戻ります。

    エクスポートの開始メッセージとして使用するメッセージのシーケンス番号を指定できます。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.11.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Examples

次のスニペットは、StreamName。Kinesis Data Streams に書き出すストリームの複数のプロパティを更新します。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:UpdateMessageStream|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド . messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:update|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド . messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:UpdateMessageStream|MessageStreamDefinition

ストリームの更新の制約

ストリームを更新する場合は、以下の制約が適用されます。次の一覧に明記されていない限り、更新は直ちに有効になります。

  • ストリームの永続性を更新することはできません。この動作を変更するには、ストリームの削除およびストリームを作成するで、新しい永続性ポリシーを定義します。

  • ストリームの最大サイズは、次の条件を満たしている場合にだけ更新できます。

    • 最大サイズは、ストリームの現在のサイズ以上である必要があります。この情報を見つけるには、ストリームの説明のストレージステータスを確認し、返されたMessageStreamInfoオブジェクト。

    • 最大サイズは、ストリームのセグメントサイズ以上である必要があります。

  • ストリームのセグメントサイズは、ストリームの最大サイズより小さい値に更新できます。更新された設定は、新しいセグメントに適用されます。

  • 有効期限 (TTL) プロパティの更新は、新しい追加操作に適用されます。この値を小さくすると、ストリームマネージャーは TTL を超える既存のセグメントも削除されることがあります。

  • full プロパティのストラテジーの更新は、新しい追加操作に適用されます。最も古いデータを上書きするように戦略を設定した場合、ストリームマネージャは新しい設定に基づいて既存のセグメントを上書きすることもあります。

  • 書き込み時フラッシュプロパティの更新は、新しいメッセージに適用されます。

  • エクスポート設定の更新は、新しいエクスポートに適用されます。更新要求には、サポートするすべてのエクスポート構成を含める必要があります。それ以外の場合、ストリームマネージャーはそれらを削除します。

    • エクスポート構成を更新するときは、ターゲットエクスポート構成の識別子を指定します。

    • エクスポート構成を追加するには、新しいエクスポート構成の一意の識別子を指定します。

    • エクスポート設定を削除するには、エクスポート設定を省略します。

  • 操作の更新ストリーム内のエクスポート設定の開始シーケンス番号を指定する場合は、最新のシーケンス番号より小さい値を指定する必要があります。この情報を見つけるには、ストリームの説明のストレージステータスを確認し、返されたMessageStreamInfoオブジェクト。

 

メッセージストリームの削除

ストリームを削除します。ストリームを削除すると、ストリームに保存されているすべてのデータがディスクから削除されます。

Requirements

この操作には以下の要件があります。

  • MinimumAWS IoT GreengrassCore バージョン: 1.10.0

  • MinimumAWS IoT Greengrassコア SDK バージョン: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Examples

次のスニペットは、StreamName という名前のストリームを削除します。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス:deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス:deleteM

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス:deleteMessageStream

以下の資料も参照してください。