AWS クラウド でサポートされている送信先のエクスポート設定 - AWS IoT Greengrass

AWS クラウド でサポートされている送信先のエクスポート設定

ユーザー定義の Greengrass コンポーネントは、ストリームマネージャー SDK で StreamManagerClient を使用して、ストリームマネージャーとやり取りします。コンポーネントは、[creates a stream] (ストリームの作成) または[updates a stream] (ストリームの更新) を行う際に、MessageStreamDefinition オブジェクト (エクスポート定義などのストリームプロパティ) を渡します。ExportDefinition オブジェクトには、そのストリームに定義されたエクスポート設定が含まれています。ストリームマネージャーは、これらのエクスポート設定を使用して、ストリームをエクスポートする場所と方法を決定します。


      ExportDefinition プロパティタイプのオブジェクトモデル図。

1 つのストリームに 0 または 1 つ以上のエクスポートを定義できます。この場合、1 つの送信先タイプに複数のエクスポートを定義することも可能です。例えば、ストリームを 2 つの AWS IoT Analytics チャネルと 1 つの Kinesis データストリームにエクスポートできます。

エクスポートに失敗した場合、ストリームマネージャーは最大 5 分間隔で AWS クラウド へのデータのエクスポートを継続的に再試行します。再試行回数に上限はありません。

注記

StreamManagerClient を利用すると、ターゲットの送信先を使用して、ストリームを HTTP サーバーにエクスポートできます。このターゲットは、テストのみを目的としています。また、安定しておらず、実稼働環境での使用はサポートされていません。

これらの AWS クラウド リソースを維持する責任があります。

AWS IoT Analytics チャネル

ストリームマネージャーは、AWS IoT Analytics への自動エクスポートをサポートしています。AWS IoT Analytics による高度なデータ分析が、ビジネス上の意思決定や、機械学習モデルの改善に役立ちます。詳細については、「AWS IoT Analytics ユーザーガイド」の「AWS IoT Analytics とは?」を参照してください。

ストリームマネージャー SDK では、Greengrass コンポーネントは、IoTAnalyticsConfigを使用して、この宛先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • AWS IoT Analytics のターゲットチャネルは、Greengrass コアデバイスと同じ AWS アカウント と AWS リージョン にある必要があります。

  • コアデバイスが AWS サービスを操作できるように認証する は、ターゲットのチャネルに iotanalytics:BatchPutMessage 権限を許可する必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

AWS IoT Analytics にエクスポートする

AWS IoT Analytics にエクスポートするストリームを作成するには、Greengrass コンポーネントで 1 つ以上の IoTAnalyticsConfig オブジェクトを含むエクスポート定義を使用して [create a stream] (ストリームを作成) します。このオブジェクトによって、ターゲットチャネル、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Greengrass コンポーネントがデバイスからデータを受信するとき、ターゲットストリームにデータの BLOB を含むメッセージを追加します。

その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

Amazon Kinesis Data Streams

ストリームマネージャーは、Amazon Kinesis Data Streams への自動エクスポートをサポートしています。Kinesis Data Streams は、一般的に、大量のデータを集約して、データウェアハウスまたは MapReduce クラスターに読み込むために使用されます。詳細については、「Amazon Kinesis デベロッパーガイド」の「Amazon Kinesis Data Streams とは」を参照してください。

ストリームマネージャー SDK では、Greengrass コンポーネントは、KinesisConfigを使用して、この宛先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • Kinesis Data Streams のターゲットストリームは、Greengrass コアデバイスと同じ AWS アカウント と AWS リージョン にある必要があります。

  • コアデバイスが AWS サービスを操作できるように認証する は、ターゲットデータストリームに kinesis:PutRecords 権限を許可する必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

Kinesis Data Streams へのエクスポート

Kinesis Data Streams にエクスポートするストリームを作成するには、Greengrass コンポーネントで 1 つ以上の KinesisConfig オブジェクトを含むエクスポート定義を使用して [create a stream] (ストリームを作成) します。このオブジェクトによって、ターゲットデータストリーム、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Greengrass コンポーネントがデバイスからデータを受信するとき、ターゲットストリームにデータの BLOB を含むメッセージを追加します。その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

ストリームマネージャーは、Amazon Kinesis にアップロードされた各レコードのパーティションキーとして、一意のランダムな UUID を生成します。

AWS IoT SiteWise アセットプロパティ

ストリームマネージャーは、AWS IoT SiteWise への自動エクスポートをサポートしています。AWS IoT SiteWiseを使用すると、産業機器からのデータを大規模に収集、整理、分析できます。詳細については、「AWS IoT SiteWise ユーザーガイド」の「AWS IoT SiteWise とは?」を参照してください。

ストリームマネージャー SDK では、Greengrass コンポーネントは、IoTSiteWiseConfigを使用して、この宛先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

注記

AWS には、AWS IoT SiteWise コンポーネントも用意されており、OPC-UA ソースからのデータストリーミングに使用できるあらかじめ構築されたソリューションを提供します。詳細については、「IoT SiteWise OPC-UA コレクター」を参照してください。

要件

このエクスポート先には以下の要件があります。

  • AWS IoT SiteWise のターゲットアセットプロパティは、Greengrass コアデバイスと同じ AWS アカウント と AWS リージョン にある必要があります。

    注記

    AWS IoT SiteWise がサポートする AWS リージョン のリストについては、「AWS 全般のリファレンス」の「AWS IoT SiteWise エンドポイントとクォータ」を参照してください。

  • コアデバイスが AWS サービスを操作できるように認証する は、ターゲットのアセットプロパティに iotsitewise:BatchPutAssetPropertyValue 権限を許可する必要があります。次の例では、ポリシーで iotsitewise:assetHierarchyPath 条件キーを使用して、ターゲットルートアセットとその子へのアクセスを許可しています。ポリシーから Condition を削除して、すべての AWS IoT SiteWise アセットへのアクセスを許可したり、個々のアセットの ARN を指定したりできます。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

    重要なセキュリティ情報については、「AWS IoT SiteWise ユーザーガイド」の「BatchPutAssetPropertyValue 認証」を参照してください。

AWS IoT SiteWise にエクスポートする

AWS IoT SiteWise にエクスポートするストリームを作成するには、Greengrass コンポーネントで 1 つ以上の IoTSiteWiseConfig オブジェクトを含むエクスポート定義を使用して [create a stream] (ストリームを作成) します。このオブジェクトによって、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Greengrass コンポーネントがデバイスからアセットプロパティデータを受信するとき、ターゲットストリームにデータを含むメッセージを追加します。メッセージは、JSON シリアル化された PutAssetPropertyValueEntry オブジェクトであり、これには、1 つ以上のアセットプロパティに対するプロパティ値が含まれています。詳細については、AWS IoT SiteWise のエクスポート先に関する「メッセージの追加」を参照してください。

注記

AWS IoT SiteWise にデータを送信する場合、データは BatchPutAssetPropertyValue アクションの要件を満たしている必要があります。詳細は、「AWS IoT SiteWise API リファレンス」の「BatchPutAssetPropertyValue」を参照してください。

その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

ストリームマネージャーの設定と Greengrass コンポーネントロジックを調整して、エクスポート対策を設計できます。例:

  • ほぼリアルタイムのエクスポートでは、少ないバッチサイズと短い間隔を設定して、受信したデータをストリームに追加します。

  • バッチ処理の最適化、帯域幅の制約軽減、コスト最小化のために、Greengrass コンポーネントは、データをストリームに追加する前に、単一のアセットプロパティに対して受信した timestamp-quality-value (TQV) データポイントをプールできます。対策を 1 つ挙げるとすれば、それは、同じプロパティのエントリを複数送信するのではなく、最大 10 の異なるプロパティとアセットの組み合わせ、またはプロパティエイリアスのエントリを 1 つのメッセージでバッチ処理することです。これにより、ストリームマネージャーは AWS IoT SiteWise クォータ内にとどまることができます。

Amazon S3 オブジェクト

ストリームマネージャーは、Amazon S3 への自動エクスポートをサポートしています。Amazon S3 を使用すると、膨大なデータの保存と取得を行えます。詳細については、「Amazon Simple Storage Service デベロッパーガイド」の「Amazon S3 とは」を参照してください。

ストリームマネージャー SDK では、Greengrass コンポーネントは、S3ExportTaskExecutorConfigを使用して、この宛先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • ターゲット Amazon S3 バケットは Greengrass コアデバイスと同じ AWS アカウント にある必要があります。

  • [Greengrass container] (Greengrass コンテナ) モードで実行される Lambda 関数が入力ファイルを入力ファイルディレクトリに書き込む場合は、書き込み権限を持つコンテナ内のボリュームとしてディレクトリをマウントする必要があります。これにより、ファイルがルートファイルシステムに書き込まれ、コンテナの外部で実行されるストリームマネージャーコンポーネントに表示されるようになります。

  • Docker コンテナコンポーネントが入力ファイルを入力ファイルディレクトリに書き込む場合は、書き込み権限を持つコンテナ内のボリュームとしてディレクトリをマウントする必要があります。これにより、ファイルがルートファイルシステムに書き込まれ、コンテナの外部で実行されるストリームマネージャーコンポーネントに表示されるようになります。

  • コアデバイスが AWS サービスを操作できるように認証する で、ターゲットのバケットに次の権限を許可する必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

Amazon S3 へのエクスポート

Amazon S3 にエクスポートするストリームを作成するには、Greengrass コンポーネントで S3ExportTaskExecutorConfig オブジェクトを使用してエクスポートポリシーを設定します。このポリシーによって、マルチパートアップロードのしきい値や優先度といったエクスポート設定を定義します。Amazon S3 エクスポートでは、ストリームマネージャーが、コアデバイス上のローカルファイルから読み取るデータをアップロードします。アップロードを開始するには、Greengrass コンポーネントでエクスポートタスクをターゲットストリームに追加します。エクスポートタスクには、入力ファイルとターゲット Amazon S3 オブジェクトの情報が含まれます。ストリームマネージャーは、ストリームに追加された順にタスクを実行します。

注記

ターゲットバケットは、AWS アカウント に存在しなければなりません。指定したキーのオブジェクトが存在しない場合は、ストリームマネージャーによってオブジェクトが作成されます。

ストリームマネージャーは、マルチパートアップロードしきい値プロパティ、最小パーツサイズ設定、入力ファイルサイズを使用して、データのアップロード方法を決定します。マルチパートアップロードのしきい値は、最小パートサイズ以上でなければなりません。データを並行してアップロードする場合は、複数のストリームを作成できます。

ターゲット Amazon S3 オブジェクトを指定するキーには、有効な Java DateTimeFormatter 文字列を !{timestamp:value} プレースホルダーに含めることができます。これらのタイムスタンププレースホルダーを使用すると、入力ファイルデータがアップロードされた時刻に基づいて Amazon S3 のデータを分割できます。例えば、次のキー名は、my-key/2020/12/31/data.txt などの値に解決されます。

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注記

ストリームのエクスポートステータスを監視する場合は、まずステータスストリームを作成して、そのストリームを使用するようにエクスポートストリームを設定します。詳細については、「エクスポートタスクの監視」を参照してください。

入力データの管理

IoT アプリケーションが入力データのライフサイクル管理に使用するコードを作成できます。次のワークフロー例は、Greengrass コンポーネントを使用してこのデータを管理する方法を示しています。

  1. ローカルプロセスは、デバイスまたは周辺機器からデータを受信し、コアデバイスのディレクトリ内にあるファイルにデータを書き込みます。これらが、ストリームマネージャーの入力ファイルとなります。

  2. Greengrass コンポーネントはディレクトリをスキャンし、新しいファイルが作成されると、ターゲットストリームに[appends an export task] (エクスポートタスクを追加) します。このタスクは、JSON シリアル化された S3ExportTaskDefinition オブジェクトであり、これによって、入力ファイルの URL、ターゲットの Amazon S3 バケットとキー、オプションのユーザーメタデータが指定されます。

  3. ストリームマネージャーは、入力ファイルを読み取り、追加されたタスクの順に Amazon S3 にデータをエクスポートします。ターゲットバケットは、AWS アカウント に存在しなければなりません。指定したキーのオブジェクトが存在しない場合は、ストリームマネージャーによってオブジェクトが作成されます。

  4. Greengrass コンポーネントは、ステータスストリームから[reads messages] (メッセージを読み取り)、エクスポートステータスを監視します。エクスポートタスクが完了すると、Greengrass コンポーネントは対応する入力ファイルを削除します。詳細については、「エクスポートタスクの監視」を参照してください。

エクスポートタスクの監視

IoT アプリケーションが Amazon S3 エクスポートのステータス監視に使用するコードを作成できます。Greengrass コンポーネントは、ステータスストリームを作成して、そのストリームにステータス更新を書き込むようにエクスポートストリームを設定する必要があります。1 つのステータスストリームは、Amazon S3 にエクスポートする複数のストリームからステータスの更新を受け取ることができます。

まず、ステータスストリームとして使用するストリームを作成します。ストリームのサイズとリテンションポリシーを設定して、ステータスメッセージのライフスパンを制御できます。例:

  • ステータスメッセージを保存しない場合は、PersistenceMemory に設定します。

  • 新しいステータスメッセージが失われないようにするには、StrategyOnFullOverwriteOldestData に設定します。

次に、ステータスストリームを使用するようにエクスポートストリームを作成または更新します。具体的には、ストリームの S3ExportTaskExecutorConfig エクスポート設定のステータス設定プロパティを設定します。この設定により、エクスポートタスクに関するステータスメッセージをステータスストリームに書き込むようにストリームマネージャーに指示します。StatusConfig オブジェクトで、ステータスストリームの名前と冗長性のレベルを指定します。サポート対象の値を次に示します。最も冗長でないもの (ERROR) から最も冗長なもの (TRACE) を表しています。デフォルトは INFO です。

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

次のワークフロー例は、Greengrass コンポーネントがステータスストリームを使用してエクスポートステータスを監視する方法を示しています。

  1. 前のワークフローで説明したように、Greengrass コンポーネントは、エクスポートタスクに関するステータスメッセージをステータスストリームに書き込むように設定されたストリームに [appends an export task] (エクスポートタスクを追加) します。この追加の操作によって、タスク ID を表すシーケンス番号が返ります。

  2. Greengrass コンポーネントは、ステータスストリームから順番に[reads messages] (メッセージを読み取り) ます。その後、ストリーム名とタスク ID に基づいて、またはメッセージコンテキストからのエクスポートタスクプロパティに基づいてメッセージをフィルタリングします。例えば、Greengrass コンポーネントは、エクスポートタスクの入力ファイル URL でフィルタリングできます。このタスクは、メッセージコンテキストの S3ExportTaskDefinition オブジェクトで表されます。

    次のステータスコードは、エクスポートタスクが完了の状態になったことを示します。

    • Success。アップロードは正常に完了しました。

    • Failure。ストリームマネージャーでエラー (例: 指定したバケットが存在しないなど) が発生しました。問題の解決後に、エクスポートタスクをストリームに再度追加できます。

    • Canceled。ストリームまたはエクスポートの定義が削除された、もしくはタスクの存続期間 (TTL) の有効期限が切れたため、タスクは停止されました。

    注記

    タスクのステータスは InProgress または Warning の場合もあります。ストリームマネージャーは、タスクの実行に影響しないエラーがイベントから返ったときに警告を発行します。例えば、部分的アップロードのクリーンアップが失敗すると、警告を返します。

  3. エクスポートタスクが完了すると、Greengrass コンポーネントは対応する入力ファイルを削除します。

次の例は、Greengrass コンポーネントがステータスメッセージを読み取り、処理する方法を示しています。

Python
import time from stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: read_messages | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require(*'aws-greengrass-stream-manager-sdk'*); const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: readMessages | StatusMessage