將資料串流匯出至AWS 雲端(console) - AWS IoT Greengrass

AWS IoT Greengrass Version 1 於 2023 年 6 月 30 日進入延長使用壽命階段。如需詳細資訊,請參閱AWS IoT Greengrass V1 維護政策。在此日期之後, AWS IoT Greengrass V1 將不會發行提供功能、增強功能、錯誤修正或安全性修補程式的更新。在上運行的設備 AWS IoT Greengrass V1 不會中斷,並將繼續運行並連接到雲。我們強烈建議您移轉至 AWS IoT Greengrass Version 2,這會增加重要的新功能,並支援其他平台

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將資料串流匯出至AWS 雲端(console)

本教學課程將告訴您如何使用AWS IoT用於配置和部署AWS IoT Greengrass啟用串流管理員的群組。群組包含使用者定義的 Lambda 函數,可在串流管理員中寫入串流,然後自動匯出至AWS 雲端。

串流管理員可讓擷取、處理和匯出大量資料串流變得更有效率且可靠。在此教學中,您會建立TransferStream使用 IoT 資料的 Lambda 函數。Lambda 函數使用AWS IoT GreengrassCore SDK 在串流管理員中建立一個串流,然後對其進行讀取和寫入。串流管理員然後會將串流匯出至 Kinesis Data Streams。下圖顯示此工作流程。

串流管理工作流程圖

本教學課程的重點是說明使用者定義的 Lambda 函數如何使用StreamManagerClient在中的物件AWS IoT GreengrassCore 開發套件與串流管理員交動。為求簡化,您在本教學課程中建立的 Python Lambda 函數會產生模擬的裝置資料。

先決條件

為完成此教學課程您需要:

  • Greengrass 群組和 Greengrass 核心 (1.10 版或更新版本)。如需如何建立 Greengrass Core 群組和核心的資訊,請參閱開始使用 AWS IoT Greengrass。入門教學課程也包含 AWS IoT Greengrass Core 軟體的安裝步驟。

    注意

    不支援串流管理員 OpenWrt 分佈。

  • 在核心裝置上安裝 Java 8 執行時間 (JDK 8)。

    • 針對以 Debian 為基礎的發行版本 (包括 Raspbian) 或以 Ubuntu 為基礎的發行版本,請執行下列命令:

      sudo apt install openjdk-8-jdk
    • 針對以 Red Hat 為基礎的發行版本 (包括 Amazon Linux),請執行下列命令:

      sudo yum install java-1.8.0-openjdk

      如需詳細資訊,請參閱 OpenJDK 文件上的如何下載和安裝預先建置的 OpenJDK 套件

  • AWS IoT Greengrass適用於 Python 1.5.0 或更新版本。使用StreamManagerClient中的AWS IoT Greengrass核心開發套件,您必須:

    • 在核心裝置上安裝 Python 3.7 或更新版本。

    • 在 Lambda 函數部署套件中包含 SDK 及其相依性。本教學課程提供相關指示。

    提示

    您可以搭配使用 StreamManagerClient 與 Java 或 NodeJS。如需範例程式碼,請參閱AWS IoT Greengrass核心 SDK for JavaAWS IoT Greengrass適用於 Node.js 的核心 SDK上 GitHub。

  • 名為的目的地資料流MyKinesisStream在 Amazon Kinesis Data Streams 中建立AWS 區域作為 Greengrass 群組。如需詳細資訊,請參閱「」建立串流中的Amazon Kinesis 開發人員指南

    注意

    在本教學課程中,串流管理員會將資料匯出至 Kinesis 資料串流,對您的AWS 帳戶。如需定價的資訊,請參閱Kinesis Data Streams 定價

    若要避免產生費用,您可以執行本教學課程而不建立 Kinesis 資料串流。在此情況下,您可以檢查日誌以查看串流管理員嘗試將串流匯出至 Kinesis Data Streams。

  • 將 IAM 政策新增至Greengrass 群組角色這允許kinesis:PutRecords針對目標資料串流執行動作,如下列範例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

本教學課程所述以下高階執行步驟:

此教學課程需約 20 分鐘完成。

步驟 1:建立 Lambda 函數部署套件

在此步驟中,您將建立包含 Python 函數程式碼和相依性的 Lambda 函數部署套件。您稍後會在建立套件Lambda 函數AWS Lambda。Lambda 函數使用AWS IoT GreengrassCore SDK 可建立本機串流並與其互動。

注意

您使用者定義的 Lambda 函數必須使用AWS IoT Greengrass核心開發套件與串流管理員交動。如需有關 Greengrass 串流管理員需求的詳細資訊,請參閱 Greengrass 串流管理員需求

  1. 下載AWS IoT Greengrass核心 SDKv1.5.0 或更新版本。

  2. 解壓縮下載的封裝,以取得軟體開發套件。SDK 為 greengrasssdk 資料夾。

  3. 安裝套件相依性,以包含 Lambda 函數部署套件中的軟體開發套件。

    1. 前往包含 requirements.txt 檔案的軟體開發套件目錄。這個檔案會列出相依性。

    2. 安裝軟體開發套件相依性。例如,執行下列 pip 命令,將它們安裝在目前的目錄中:

      pip install --target . -r requirements.txt
  4. 將以下 Python 程式碼函數儲存在名為 transfer_stream.py 的本機檔案中。

    提示

    如需使用 Java 和 NodeJS 的範例程式碼,請參閱AWS IoT Greengrass核心 SDK for JavaAWS IoT Greengrass適用於 Node.js 的核心 SDK上 GitHub。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 將下列項目壓縮成名為 transfer_stream_python.zip 的檔案。這是您的 Lambda 函數部署套件。

    • transfer_stream.py。應用程式邏輯。

    • greengrasssdk。適用於發佈 MQTT 訊息之 Python Greengrass Lambda 函數所需的程式庫。

      串流管理員作業在 1.5.0 版或更新版本中提供AWS IoT Greengrass適用於 Python 的核心開發套件

    • 您為安裝的相依檔案AWS IoT Greengrass適用於 Python 的 Core 開發套件 (例如,cbor2目錄)。

    當您建立 zip 檔案時,請只包含這些項目,而非包含的資料夾。

步驟 2:建立 Lambda 函數

在此步驟中,您會使用AWS Lambda主控台,可建立 Lambda 函數並設定其使用您的部署套件。然後,您會發佈函數版本和建立別名。

  1. 首先,建立 Lambda 函數。

    1. 於 AWS Management Console,請選擇服務,開啟 AWS Lambda 主控台。

    2. 選擇CREATE FUNCTION然後選擇Author from scratch (從頭開始撰寫)

    3. Basic information (基本資訊) 區段中,使用下列值:

      • 針對 Function name (函數名稱),請輸入 TransferStream

      • 針對 Runtime (執行時間),選擇 Python 3.7

      • 適用於許可,請保留預設設定。這會建立授與基本 Lambda 權限的執行角色。此角色不為所用AWS IoT Greengrass。

    4. 請在頁面最下方選擇CREATE FUNCTION

  2. 接下來,註冊處理常式並上傳您的 Lambda 函數部署套件。

    1. 在「」程式碼索引標籤下代碼來源,選擇上傳來源。從下拉式選單中選擇.zip 檔案

      突出顯示 .zip 文件的從下拉列表中上傳。
    2. 選擇上傳,然後選擇您的transfer_stream_python.zip部署套件。然後選擇 Save (儲存)

    3. 在「」程式碼功能的標籤,在執行時間設定,選擇Edit (編輯),然後輸入下列值。

      • 針對 Runtime (執行時間),選擇 Python 3.7

      • 對於 Handler (處理常式),輸入 transfer_stream.function_handler

    4. 選擇 Save (儲存)。

      注意

      所以此測試按鈕AWS Lambda主控台不使用此函數。所以此AWS IoT Greengrass核心 SDK 不包含在中獨立執行您的 Greengrass Lambda 函數所需的模組AWS Lambda主控台。這些模塊(例如,greengrass_common) 會在函式部署到 Greengrass 核心後提供給函式。

  3. 現在,發佈您的 Lambda 函數的第一個版本並建立版本的別名

    注意

    Greengrass 組可以通過別名(推薦)或版本引用 Lambda 函數。使用別名可讓您更輕鬆地管理程式碼更新,因為當函數程式碼更新時,您不需要變更訂閱資料表或群組定義。相反,您只需將別名指向新函數版本即可。

    1. 請從操作功能表中選擇發行新版本

    2. 針對 Version description (版本描述),輸入 First version,然後選擇 Publish (發佈)

    3. 在「」TransferStream:1組態頁面,從動作功能表中,選擇建立別名

    4. 建立警示頁面上使用下列值:

      • 對於 Name (名稱),輸入 GG_TransferStream

      • 對於 Version (版本),選擇 1

      注意

      AWS IoT Greengrass不支援 Lambda 函數$LATE版本。

    5. 選擇 Create (建立)。

現在,您可以開始將 Lambda 函數新增至您的 Greengrass 群組。

步驟 3:將 Lambda 函數新增至 Greengrass 群組

在此步驟中,您將 Lambda 函數新增至群組,然後設定其生命週期和環境變數。如需詳細資訊,請參閱 使用群組特定組態來控制 Greengrass Lambda 函數的執行

  1. 在 中AWS IoT主控台導覽窗格, 下Manage (管理),展開Greengrass 裝置,然後選擇群組 (V1)

  2. 選擇目標群組。

  3. 在群組組態頁面上,選擇Lambda 函數索引標籤。

  4. UNDING我 Lambda 函數,選擇Add

  5. 在「」新增 Lambda 函數頁面中,選擇Lambda 函數為您的 Lambda 函數。

  6. 對於Lambda 版本,選擇別名:GG _TransferStream

    現在,設定決定 Greengrass 群組中 Lambda 函數行為的屬性。

  7. 在 中Lambda 函數組態區段中,進行下列變更:

    • Memory limit (記憶體限制) 設為 32 MB。

    • 適用於Pinned,選擇True

    注意

    一個長期(或釘住)Lambda 函數自動啟動AWS IoT Greengrass啟動並持續在其自己的容器中執行。這與一個形成鮮明對比隨需Lambda 函數,在沒有需要執行工作時,其被呼叫和停用會在沒有要執行的任務時停止。如需詳細資訊,請參閱 Greengrass Lambda 函數的生命週期組態

  8. 選擇新增 Lambda 函數

步驟 4:啟用串流管理員

在此步驟中,您將確定串流管理員已啟用。

  1. 在群組組態頁面上,選擇Lambda 函數索引標籤。

  2. UNDING系統 Lambda 函數,SELECT串流管理員,然後檢查狀態。如果是已停用,請選擇 Edit (編輯)。然後,選擇 Enable (啟用)Save (儲存)。您可以使用此教學課程的預設參數設定。如需詳細資訊,請參閱 設定 AWS IoT Greengrass 串流管理員

注意

在您使用主控台啟用串流管理員並部署群組時,串流管理員的記憶體大小預設為 4194304 KB (4 GB)。建議您將記憶體大小設定為至少 128000 KB。

步驟 5:設定本機記錄

在此步驟中,您要設定AWS IoT Greengrass系統元件、使用者定義的 Lambda 函數和連接器,可將記錄寫入核心裝置的檔案系統。您可以使用記錄檔針對可能遇到的任何問題進行疑難排解。如需詳細資訊,請參閱 使用 AWS IoT Greengrass 日誌進行監控

  1. Local logs configuration (本機日誌組態) 下,檢查是否已設定本機記錄。

  2. 如果未對 Greengrass 系統元件或使用者定義的 Lambda 函數設定日誌,請選擇Edit (編輯)

  3. 選擇使用者 Lambda 函數記錄層級Greengrass 系統日誌層級

  4. 保留記錄層級和磁碟空間限制的預設值,然後選擇 Save (儲存)

步驟 6:部署 Greengrass 群組

將群組部署到核心裝置。

  1. 請確定AWS IoT Greengrass核心正在執行。如果需要,請在您的 Raspberry Pi 終端機執行以下命令。

    1. 檢查精靈是否有在運作:

      ps aux | grep -E 'greengrass.*daemon'

      若輸出的 root 含有 /greengrass/ggc/packages/ggc-version/bin/daemon 項目,則精靈有在運作。

      注意

      路徑的版本取決於安裝在您的核心裝置中的 AWS IoT Greengrass 核心軟體版本。

    2. 啟動協助程式:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 在群組組態頁面上,選擇部署

    1. 在 中Lambda 函數標籤的系統 Lambda 函數區段中,選取IP 偵測器並選擇Edit (編輯)

    2. 在 中編輯 IP 偵測器設定對話方塊中,選取自動偵測並覆寫 MQTT 代理程式端點

    3. 選擇 Save (儲存)。

      這可讓裝置自動取得核心的連接資訊,例如 IP 位址、DNS、連接埠編號。建議使用自動偵測,但是 AWS IoT Greengrass 也支援手動指定端點。只會在第一次部署群組時收到復原方法的提示。

      注意

      如果出現提示,請授予建立Greengrass 服務角色並將其與您的關聯AWS 帳戶在目前AWS 區域。此角色允許AWS IoT Greengrass存取您的資源AWS服務。

      部署頁面會顯示部署時間戳記、版本 ID 和狀態。部署完成時,針對部署顯示的狀態應為已完成

      如需故障診斷協助,請參閱AWS IoT Greengrass 疑難排解

步驟 7:測試應用程式。

所以此TransferStreamLambda 函數會產生模擬的裝置資料。它會將資料寫入串流管理員匯出至目標 Kinesis 資料串流的串流。

  1. 在 Amazon Kinesis 主控台下Kinesis Data Streams,選擇MyKinesisStream

    注意

    如果您在沒有目標 Kinesis 資料串流的情況下執行教學課程,檢查記錄檔對於流管理器(GGStreamManager。如果日誌檔案包含錯誤消息中的 export stream MyKinesisStream doesn't exist,則測試成功。此錯誤表示服務嘗試匯出至串流,但串流不存在。

  2. 在「」MyKinesisStream頁面,選擇監控。如果測試成功,您應該會看到 Put Records (Put 記錄) 圖表中的資料。視您的連線而定,可能需要一分鐘才會顯示資料。

    重要

    完成測試後,請刪除 Kinesis 資料串流以免產生更多費用。

    或者,執行下列命令來停止 Greengrass 協助程式。這樣可以防止核心傳送訊息,直到您準備好繼續測試為止。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 移除TransferStreamLambda 函數從核心。

    1. 在 中AWS IoT主控台導覽窗格, 下Manage (管理),展開Greengrass 裝置,然後選擇群組 (V1)

    2. UNDINGGreengrass 群組,選擇群組。

    3. 在「」Lambda頁面上,選擇省略號 (...) 為TransferStream函數,然後選擇移除功能

    4. Actions (動作) 中,選擇 Deploy (部署)

若要檢視記錄資訊或針對串流問題進行疑難排解,請檢查 TransferStreamGGStreamManager 函數的記錄。您必須具有在檔案系統上讀取 AWS IoT Greengrass 記錄檔的 root 權限。

  • TransferStream 會將日誌項目寫入 greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log

  • GGStreamManager 會將日誌項目寫入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果您需要更多疑難排解資訊,您可以設定記錄層級為了使用者 Lambda 日誌除錯記錄檔然後再部署群組。

另請參閱