將資料從 Amazon S3 擷取至 Timestream 以進行 InfluxDB 自動化 - Amazon Timestream

如需類似 Amazon Timestream for LiveAnalytics 的功能,請考慮使用 Amazon Timestream for InfluxDB。它提供簡化的資料擷取和單一位數毫秒查詢回應時間,以進行即時分析。在這裡進一步了解。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將資料從 Amazon S3 擷取至 Timestream 以進行 InfluxDB 自動化

Timestream for LiveAnalytics 匯出工具完成卸載程序後,自動化程序的下一個步驟就會開始。此自動化使用 InfluxDB 的匯入工具,將資料傳輸至其特殊的時間序列結構。程序會轉換 Timestream 的資料模型,以符合 InfluxDB 的測量、標籤和欄位概念。最後,它會使用 InfluxDB 的線路通訊協定有效率地載入資料。

完成遷移的工作流程分為四個階段:

  1. 使用 Timestream for LiveAnalytics 匯出工具卸載資料。

  2. 資料轉換:使用 Amazon Athena 將 LiveAnalytics 的 Timestream 資料轉換為 InfluxDB 行通訊協定格式 (根據基數評估之後定義的結構描述)。

  3. 資料擷取:將線路通訊協定資料集擷取至 Timestream for InfluxDB 執行個體。

  4. 驗證:或者,您可以驗證是否已擷取每個線路通訊協定點 (資料轉換步驟--add-validation-field true期間需要 )。

資料轉換

對於資料轉換,我們開發了指令碼,以使用 Amazon Athena 將 Timestream for LiveAnalytics 匯出的資料parquet 格式轉換為 InfluxDB 的 Line Protocol 格式。Amazon Athena 提供無伺服器查詢服務和經濟實惠的方式來轉換大量時間序列資料,而無需專用運算資源。

指令碼會執行以下操作:

  • 將匯出的 LiveAnalytics Timestream 從 Amazon S3 儲存貯體載入 Amazon Athena 資料表。

  • 執行資料映射,並將儲存在 Athena 資料表中的資料轉換為行通訊協定,並將其存放在 S3 儲存貯體中。

資料映射

下表顯示 Timestream for LiveAnalytics 資料如何對應至線路通訊協定資料。

LiveAnalytics 概念的 Timestream Line Protocol 概念

資料表名稱

測量

Dimensions (尺寸)

Tags (標籤)

測量名稱

標籤 (選用)

措施

欄位

Time (時間)

Timestamp

先決條件和安裝

請參閱轉換指令碼的 README 中的先決條件和安裝章節。

用途

若要從 example_database 中的 Timestream for LiveAnalytics 資料表 example_table 轉換存放在儲存貯體 example_s3_bucket 中的資料,請執行下列命令:

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

指令碼完成後,

  • 在 Athena 中,將會建立資料表 example_database_example_table,其中包含 LiveAnalytics 資料的 Timestream。

  • 在 Athena 中,將建立資料表 lp_example_database_example_table,其中包含轉換為行通訊協定點的 LiveAnalytics 時間串流資料。

  • 在 S3 儲存貯體 example_s3_bucket 中,在路徑 中 example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output,將儲存行通訊協定資料。

建議

如需指令碼的最新使用方式和輸出的詳細資訊,請參閱轉換指令碼的 README,以進行後續的遷移步驟,例如驗證。如果您排除維度以改善基數,請使用 --dimensions-to-fields引數將特定維度變更為欄位,藉此調整結構描述以減少基數。

新增欄位以進行驗證

如需有關如何新增欄位以進行驗證的資訊,請參閱轉換指令碼的 README 中的新增欄位以進行驗證一節。

資料擷取至 Timestream for InfluxDB

InfluxDB 擷取指令碼會將壓縮的線路通訊協定資料集擷取至 InfluxDB 的 Timestream。包含 gzip 壓縮行通訊協定檔案的目錄會以命令列引數形式與擷取目的地 InfluxDB 儲存貯體一起傳入。此指令碼旨在使用多處理一次擷取多個檔案,以利用資源搭配 InfluxDB 和執行指令碼的機器。

指令碼會執行下列動作:

  • 擷取壓縮的檔案並將其擷取至 InfluxDB。

  • 實作重試機制和錯誤處理。

  • 追蹤成功和失敗的擷取以繼續進行。

  • 從線路通訊協定資料集讀取時最佳化 I/O 操作。

先決條件和安裝

請參閱 GitHub 中擷取指令碼的 README 中的先決條件和安裝一節。

資料準備

擷取所需的壓縮行通訊協定檔案是由資料轉換指令碼產生。請依照下列步驟準備您的資料:

  1. 設定具有足夠儲存空間的 EC2 執行個體,以存放轉換後的資料集。

  2. 將轉換的資料從 S3 儲存貯體同步到本機目錄:

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. 請確定您擁有資料目錄中所有檔案的讀取存取權。

  4. 執行下列擷取指令碼,將資料擷取至 Timestream for InfluxDB。

用途

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本用量

python influxdb_ingestion.py my_bucket ./data_files

擷取速率

我們已針對擷取率執行一些測試。使用 C5N.9XL EC2 執行個體執行具有 10 名工作者的擷取指令碼,以及將 ~500 GB 線路通訊協定擷取至 8XL Timestream for InfluxDB 執行個體的擷取測試:

  • 3K IOPS 15.86 GB/小時。

  • 12K IOPS 70.34 GB/小時。

  • 16K IOPS 71.28 GB/小時。

建議

  • 使用具有足夠 CPU 核心的 EC2 執行個體來處理平行處理。

  • 確保執行個體有足夠的儲存空間來保留整個轉換的資料集,並具有額外的擷取空間。

    • 一次擷取的檔案數目等於指令碼執行期間設定的工作者數目。

  • 將 EC2 執行個體放置在與 InfluxDB 執行個體相同的區域和可用區域 (如果可能),以將延遲降至最低。

  • 請考慮使用針對網路操作最佳化的執行個體類型,例如 C5N。

  • 如果需要高擷取率,則建議 Timestream for InfluxDB 執行個體至少使用 12K IOPS。透過增加取決於 Timestream for InfluxDB 執行個體大小的指令碼工作者計數,可以獲得其他最佳化。

如需詳細資訊,請參閱擷取指令碼的 README