匯出支援AWS 雲端目的地的組態 - AWS IoT Greengrass

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

匯出支援AWS 雲端目的地的組態

使用者定義 Greengrass 元件會在串流管理員 SDK StreamManagerClient 中使用,與串流管理員互動。當組件建立串流更新串流時,它會傳遞代表資料流屬性的MessageStreamDefinition物件,包括匯出定義。該ExportDefinition對象包含為流定義的導出配置。串流管理員會使用這些匯出設定來決定匯出串流的位置和方式。

屬性類型的對象模 ExportDefinition 型圖。

您可以在串流上定義零個或多個匯出設定,包括單一目標類型的多個匯出設定。例如,您可以將串流匯出到兩個AWS IoT Analytics通道和一個 Kinesis 資料串流。

對於失敗的匯出嘗試,串流管理員會持續重試將資料匯出至最多五分鐘的間隔。AWS 雲端重試次數沒有上限。

注意

StreamManagerClient也提供可用來將串流匯出至 HTTP 伺服器的目標目的地。此目標僅供測試之用。它不穩定或不支持在生產環境中使用。

您有責任維護這些AWS 雲端資源。

AWS IoT Analytics 頻道

流管理器支持自動導出到AWS IoT Analytics. AWS IoT Analytics可讓您對資料執行進階分析,以協助制定商業決策並改善機器學習模型。如需詳細資訊,請參閱什麼是AWS IoT Analytics?《AWS IoT Analytics使用者指南》中。

在串流管理員 SDK 中,您的 Greengrass 元件會使用IoTAnalyticsConfig來定義此目標類型的匯出設定。如需詳細資訊,請參閱您目標語言的 SDK 參考資料:

  • 開發套件AnalyticsConfig中的 IoT

  • Java SDK AnalyticsConfig 中的 IoT

  • Node.js 開發套件AnalyticsConfig中的 IoT

要求

此匯出目的地具有下列需求:

  • 中的目標通道AWS IoT Analytics必須AWS 帳戶與 AWS 區域 Greengrass 核心裝置位於相同的通道中。

  • 授權核心裝置與 AWS 服務互動必須允許以通道為目標的iotanalytics:BatchPutMessage權限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    您可以授與資源的細微或條件式存取權,例如使用萬用字元*命名配置。如需詳細資訊,請參閱 IAM 使用者指南中的新增和移除 IAM 政策

匯出至 AWS IoT Analytics

若要建立匯出至的串流AWS IoT Analytics,您的 Greengrass 元件會建立包含一或多個物件的匯出定義串流IoTAnalyticsConfig此物件會定義匯出設定,例如目標通道、批次大小、批次間隔和優先順序。

當 Greengrass 組件從設備接收數據時,它們會將包含數據 Blob 的消息附加到目標流中。

然後,串流管理員會根據串流的匯出設定中定義的批次設定和優先順序匯出資料。

Amazon Kinesis 數據流

串流管理員支援自動匯出至 Amazon Kinesis Data Streams。Kinesis Data Streams 通常用於彙總大量資料,並將其載入資料倉儲或 MapReduce 叢集。如需詳細資訊,請參閱什麼是 Amazon Kinesis Data Streams? 在 Amazon Kinesis 開發人員指南中。

在串流管理員 SDK 中,您的 Greengrass 元件會使用KinesisConfig來定義此目標類型的匯出設定。如需詳細資訊,請參閱您目標語言的 SDK 參考資料:

要求

此匯出目的地具有下列需求:

  • Kinesis 資料串流中的目標串流必須AWS 帳戶與 AWS 區域 Greengrass 核心裝置相同。

  • 授權核心裝置與 AWS 服務互動必須允許以資料串流為目標的kinesis:PutRecords權限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    您可以授與資源的細微或條件式存取權,例如使用萬用字元*命名配置。如需詳細資訊,請參閱 IAM 使用者指南中的新增和移除 IAM 政策

匯出至 Kinesis Data Streams

若要建立匯出至 Kinesis Data Streams,您的 Greengrass 元件會建立包含一或多個物件的匯出定義串流KinesisConfig此物件會定義匯出設定,例如目標資料串流、批次大小、批次間隔和優先順序。

當 Greengrass 組件從設備接收數據時,它們會將包含數據 Blob 的消息附加到目標流中。然後,串流管理員會根據串流的匯出設定中定義的批次設定和優先順序匯出資料。

串流管理員會針對上傳到 Amazon Kinesis 的每筆記錄,產生一個唯一的隨機 UUID 做為分區金鑰。

AWS IoT SiteWise資產性質

流管理器支持自動導出到AWS IoT SiteWise. AWS IoT SiteWise可讓您大規模收集、組織和分析來自工業設備的資料。如需詳細資訊,請參閱什麼是AWS IoT SiteWise?《AWS IoT SiteWise使用者指南》中。

在串流管理員 SDK 中,您的 Greengrass 元件會使用IoTSiteWiseConfig來定義此目標類型的匯出設定。如需詳細資訊,請參閱您目標語言的 SDK 參考資料:

  • 開發套件SiteWiseConfig中的 IoT

  • Java SDK SiteWiseConfig 中的 IoT

  • Node.js 開發套件SiteWiseConfig中的 IoT

注意

AWS也提供AWS IoT SiteWise元件,這些元件提供預先建置的解決方案,可用來串流來自 OPC-UA 來源的資料。如需詳細資訊,請參閱 IoT SiteWise OPC-UA 收集器

需求

此匯出目的地具有下列需求:

  • 中的目標資產屬性AWS IoT SiteWise必須AWS 帳戶與 AWS 區域 Greengrass 核心裝置位於相同的內容。

    注意

    如需AWS IoT SiteWise支援的AWS 區域清單,請參閱AWS一般參考中的AWS IoT SiteWise端點和配額

  • 授權核心裝置與 AWS 服務互動必須允許以資產屬性為目標的iotsitewise:BatchPutAssetPropertyValue權限。下列範例策略使用iotsitewise:assetHierarchyPath條件索引鍵來授與目標根資產及其子系的存取權。您可以Condition從政策中移除,以允許存取所有資AWS IoT SiteWise產或指定個別資產的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以授與資源的細微或條件式存取權,例如使用萬用字元*命名配置。如需詳細資訊,請參閱 IAM 使用者指南中的新增和移除 IAM 政策

    如需重要的安全性資訊,請參閱AWS IoT SiteWise使用指南中的 BatchPutAssetPropertyValue 授權

匯出至 AWS IoT SiteWise

若要建立匯出至的串流AWS IoT SiteWise,您的 Greengrass 元件會建立包含一或多個物件的匯出定義串流IoTSiteWiseConfig此物件會定義匯出設定,例如批次大小、批次間隔和優先順序。

當 Greengrass 組件從設備接收資產屬性數據時,它們會將包含數據的消息附加到目標流中。消息是 JSON 序列化PutAssetPropertyValueEntry對象,其中包含一個或多個資產屬性的屬性值。如需詳細資訊,請參閱為AWS IoT SiteWise匯出目的地附加郵件

注意

當您將資料傳送至時AWS IoT SiteWise,您的資料必須符合BatchPutAssetPropertyValue動作的要求。如需詳細資訊,請參閱 AWS IoT SiteWise API 參考中的 BatchPutAssetPropertyValue

然後,串流管理員會根據串流的匯出設定中定義的批次設定和優先順序匯出資料。

您可以調整串流管理員設定和 Greengrass 元件邏輯來設計匯出策略。例如:

  • 對於近乎即時的匯出,請設定較低的批次大小和間隔設定,並在收到串流時將資料附加至串流。

  • 為了最佳化批次處理、減輕頻寬限制或將成本降至最低,Greengrass 元件可以在將資料附加至串流之前,為單一資產屬性集區收到的 timestamp-quality-value (TQV) 資料點。其中一種策略是在一則訊息中批次處理最多 10 個不同屬性資產組合或屬性別名的項目,而不是針對相同屬性傳送一個以上的項目。這有助於流管理器保持在AWS IoT SiteWise配額內。

Amazon S3 對象

串流管理員支援自動匯出至 Amazon S3。您可以使用 Amazon S3 存放和擷取大量資料。如需詳細資訊,請參閱什麼是 Amazon S3?Amazon 簡單存儲服務開發人員指南中。

在串流管理員 SDK 中,您的 Greengrass 元件會使用S3ExportTaskExecutorConfig來定義此目標類型的匯出設定。如需詳細資訊,請參閱您目標語言的 SDK 參考資料:

  • 開發套件ExportTaskExecutorConfig中 Python S3

  • Java 開發套件ExportTaskExecutorConfig中的 S3

  • Node.js 開發套件ExportTaskExecutorConfig中的 S3

要求

此匯出目的地具有下列需求:

  • 目標 Amazon S3 儲存貯體必須與 Greengrass 核心裝置位於AWS 帳戶相同的位置。

  • 如果在 Greengrass 容器模式下執行的 Lambda 函數將輸入檔案寫入輸入檔案目錄,您必須將目錄掛載為具有寫入權限的容器中的磁碟區。這樣可確保檔案會寫入根檔案系統,並且可以看到在容器外部執行的串流管理員元件。

  • 如果 Docker 容器組件將輸入文件寫入輸入文件目錄,則必須將該目錄掛載為具有寫入權限的容器中的卷。這樣可確保檔案會寫入根檔案系統,並且可以看到在容器外部執行的串流管理員元件。

  • 授權核心裝置與 AWS 服務互動必須允許目標值區的下列權限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    您可以授與資源的細微或條件式存取權,例如使用萬用字元*命名配置。如需詳細資訊,請參閱 IAM 使用者指南中的新增和移除 IAM 政策

匯出到 Amazon S3

若要建立匯出至 Amazon S3 的串流,您的 Greengrass 元件會使用該S3ExportTaskExecutorConfig物件來設定匯出政策。此原則會定義匯出設定,例如分段上傳臨界值和優先順序。對於 Amazon S3 匯出,串流管理員會上傳從核心裝置上的本機檔案讀取的資料。若要啟動上傳,您的 Greengrass 元件會將匯出工作附加至目標資料流。匯出任務包含輸入檔案和目標 Amazon S3 物件的相關資訊。流管理器按照它們被附加到流的順序運行任務。

注意

目標值區必須已存在於您的AWS 帳戶. 如果指定索引鍵的物件不存在,串流管理員會為您建立物件。

流管理器使用多部分上傳閾值屬性,最小部分大小設置和輸入文件的大小來確定如何上傳數據。多部分上傳臨界值必須大於或等於最小零件大小。如果您想要 parallel 上傳資料,可以建立多個串流。

指定目標 Amazon S3 物件的金鑰可以在!{timestamp:value}預留位置中包含有效的 Java DateTimeFormatter 字串。您可以使用這些時間戳記預留位置,根據輸入檔案資料上傳的時間在 Amazon S3 中分割資料。例如,下列索引鍵名稱會解析為一個值,例如my-key/2020/12/31/data.txt

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要監視串流的匯出狀態,請先建立狀態串流,然後設定匯出串流以使用它。如需詳細資訊,請參閱 監控匯出工作

管理輸入資料

您可以撰寫 IoT 應用程式用來管理輸入資料生命週期的程式碼。下列範例工作流程顯示如何使用 Greengrass 元件來管理此資料。

  1. 本機處理程序會從裝置或周邊設備接收資料,然後將資料寫入核心裝置上目錄中的檔案。這些都是流管理器的輸入文件。

  2. Greengrass 組件掃描目錄,並在創建新文件時將導出任務附加到目標流。工作是 JSON 序列化S3ExportTaskDefinition物件,用於指定輸入檔案的 URL、目標 Amazon S3 儲存貯體和金鑰,以及選用的使用者中繼資料。

  3. 串流管理員會讀取輸入檔案,並按照附加任務的順序將資料匯出至 Amazon S3。目標值區必須已存在於您的AWS 帳戶. 如果指定索引鍵的物件不存在,串流管理員會為您建立物件。

  4. Greengrass 元件會從狀態串流讀取訊息,以監視匯出狀態。匯出工作完成後,Greengrass 元件可以刪除對應的輸入檔案。如需詳細資訊,請參閱 監控匯出工作

監控匯出工作

您可以撰寫 IoT 應用程式用來監控 Amazon S3 匯出狀態的程式碼。您的 Greengrass 元件必須建立狀態串流,然後將匯出串流設定為將狀態更新寫入狀態串流。單一狀態串流可以從匯出至 Amazon S3 的多個串流接收狀態更新。

首先,建立用作狀態串流的串流。您可以設定串流的大小和保留原則,以控制狀態訊息的壽命。例如:

  • Memory如果您不想儲存狀態訊息,請設Persistence定為。

  • 設定StrategyOnFull為可OverwriteOldestData讓新的狀態訊息不會遺失。

然後,建立或更新匯出串流以使用狀態串流。具體來說,設置流的S3ExportTaskExecutorConfig導出配置的 status 配置屬性。此設置告訴流管理器寫入有關導出任務到狀態流的狀態消息。在StatusConfig物件中,指定狀態串流的名稱和詳細程度層級。下列支援的值範圍從最小冗長 (ERROR) 到最詳細 (TRACE)。預設值為 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

下列範例工作流程顯示 Greengrass 元件如何使用狀態串流監視匯出狀態。

  1. 如先前的工作流程所述,Greengrass 元件會將匯出工作附加至設定為寫入有關匯出工作至狀態串流的狀態訊息的串流。附加作業會傳回代表工作 ID 的序號。

  2. Greengrass 組件從狀態流中按順序讀取消息,然後根據流名稱和任務 ID 或基於從消息上下文的導出任務屬性過濾消息。例如,Greengrass 組件可以依匯出工作的輸入檔案 URL 進行篩選,此 URL 由訊息內容中的S3ExportTaskDefinition物件表示。

    下列狀態碼表示匯出任務已達到完成狀態:

    • Success。 上傳已成功完成。

    • Failure。 流管理器遇到錯誤,例如,指定的存儲桶不存在。解決問題後,您可以再次將匯出工作附加至串流。

    • Canceled。 因為已刪除串流或匯出定義,或任務的 time-to-live (TTL) 期間到期,因此作業已停止。

    注意

    工作的狀態也可能為InProgressWarning。當事件傳回不會影響工作執行的錯誤時,串流管理員會發出警告。例如,如果清除部分上傳失敗,就會傳回警告。

  3. 匯出工作完成後,Greengrass 元件可以刪除對應的輸入檔案。

下列範例顯示 Greengrass 元件可能如何讀取和處理狀態訊息。

Python
import time from stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 參考資料:讀取訊息 | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 開發套件參考資料:readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require(*'aws-greengrass-stream-manager-sdk'*); const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 開發套件參考資料:readMessages | StatusMessage