Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 於 2023 年 6 月 30 日進入延長使用壽命階段。如需詳細資訊,請參閱AWS IoT Greengrass V1 維護政策。在此日期之後, AWS IoT Greengrass V1 將不會發行提供功能、增強功能、錯誤修正或安全性修補程式的更新。在上運行的設備 AWS IoT Greengrass V1 不會中斷,並將繼續運行並連接到雲。我們強烈建議您移轉至 AWS IoT Greengrass Version 2,這會增加重要的新功能,並支援其他平台

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Kinesis Firehose

Kinesis Firehose 連接器會透過 Amazon 資料 Firehose 交付串流將資料發佈到 Amazon S3、亞馬 Amazon Redshift 或 Amazon 服務等目的地。 OpenSearch

此連接器是 Kinesis 傳送串流的資料生產者。它會接收 MQTT 主題上的輸入資料,然後將資料傳送到指定的交付串流。然後,交付串流會將資料記錄傳送到設定的目的地 (例如,S3 儲存貯體)。

此連接器具有下列版本。

版本

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

如需版本變更的詳細資訊,請參閱 Changelog

要求

此連接器有下列要求:

Version 4 - 5
  • AWS IoT Greengrass 核心軟體 v1.9.3 或更新版本。

  • Python 版本 3.7 或 3.8 安裝在核心設備上,並添加到 PATH 環境變量。

    注意

    要使用 Python 3.8,請運行以下命令,從默認 Python 3.7 安裝文件夾創建一個符號鏈接到已安裝的 Python 3.8 二進製文件。

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    這會設定您的裝置以符合 AWS IoT Greengrass 的 Python 需求。

  • 已設定的 Kinesis 傳送串流。如需詳細資訊,請參閱 Amazon Kinesis Firehose 開發人員指南中的建立 Amazon 資料火管交付串流。

  • 設定為允許目標交付串流上的firehose:PutRecordfirehose:PutRecordBatch動作的 Greengrass 群組角色,如下列範例 IAM 政策所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    此連接器可讓您動態覆寫輸入訊息承載中的預設交付串流。如果您的實施使用此功能,則 IAM 政策應包含所有目標串流作為資源。您可以為資源授予細微或條件式存取 (例如,使用萬用字元 * 命名配置)。

    針對群組角色要求,您必須設定角色以授與必要的許可,並確認已將角色新增至群組。如需詳細資訊,請參閱 管理 Greengrass 群組角色 (主控台)管理 Greengrass 群組角色 (CLI)

Versions 2 - 3
  • AWS IoT Greengrass 核心軟件 v1.7 或更高版本。

  • Python 版本 2.7 安裝在核心設備上,並添加到 PATH 環境變量中。

  • 已設定的 Kinesis 傳送串流。如需詳細資訊,請參閱 Amazon Kinesis Firehose 開發人員指南中的建立 Amazon 資料火管交付串流。

  • 設定為允許目標交付串流上的firehose:PutRecordfirehose:PutRecordBatch動作的 Greengrass 群組角色,如下列範例 IAM 政策所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    此連接器可讓您動態覆寫輸入訊息承載中的預設交付串流。如果您的實施使用此功能,則 IAM 政策應包含所有目標串流作為資源。您可以為資源授予細微或條件式存取 (例如,使用萬用字元 * 命名配置)。

    針對群組角色要求,您必須設定角色以授與必要的許可,並確認已將角色新增至群組。如需詳細資訊,請參閱 管理 Greengrass 群組角色 (主控台)管理 Greengrass 群組角色 (CLI)

Version 1
  • AWS IoT Greengrass 核心軟件 v1.7 或更高版本。

  • Python 版本 2.7 安裝在核心設備上,並添加到 PATH 環境變量中。

  • 已設定的 Kinesis 傳送串流。如需詳細資訊,請參閱 Amazon Kinesis Firehose 開發人員指南中的建立 Amazon 資料火管交付串流。

  • 設定為允許對目標交付串流firehose:PutRecord執行動作的 Greengrass 群組角色,如下列範例 IAM 政策所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    此連接器可讓您動態覆寫輸入訊息承載中的預設交付串流。如果您的實施使用此功能,則 IAM 政策應包含所有目標串流作為資源。您可以為資源授予細微或條件式存取 (例如,使用萬用字元 * 命名配置)。

    針對群組角色要求,您必須設定角色以授與必要的許可,並確認已將角色新增至群組。如需詳細資訊,請參閱 管理 Greengrass 群組角色 (主控台)管理 Greengrass 群組角色 (CLI)

連接器參數

此連接器提供下列參數:

Versions 5
DefaultDeliveryStreamArn

要將資料傳送至的預設 Firehose 傳送串流的 ARN。輸入訊息承載中的 delivery_stream_arn 屬性可以覆寫目的地串流。

注意

群組角色必須允許所有目標交付串流上的適當動作。如需詳細資訊,請參閱 要求

AWS IoT 主控台中的顯示名稱:預設傳遞串流 ARN

需要:true

類型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

同樣交付串流的新記錄遭拒絕前,記憶體中保留的記錄數上限。最小值為 2000。

AWS IoT 控制台中的顯示名稱:要緩衝的最大記錄數(每個流)

需要:true

類型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

配置給此連接器的記憶體數量 (KB)。

AWS IoT 控制台中的顯示名稱:內存大小

需要:true

類型:string

有效模式:^[0-9]+$

PublishInterval

將記錄發佈到 Firehose 的間隔時間 (以秒為單位)。若要停止批次處理,請將此數值設為 0。

AWS IoT 主控台中的顯示名稱:發佈間隔

需要:true

類型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

此連接器的容器化模式。預設值為GreengrassContainer,這表示連接器會在 AWS IoT Greengrass 容器內的隔離執行階段環境中執行。

注意

群組的預設容器化設定不會套用至連接器。

AWS IoT 主控台中的顯示名稱:容器隔離模式

需要:false

類型:string

有效值:GreengrassContainerNoContainer

有效模式:^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

要將資料傳送至的預設 Firehose 傳送串流的 ARN。輸入訊息承載中的 delivery_stream_arn 屬性可以覆寫目的地串流。

注意

群組角色必須允許所有目標交付串流上的適當動作。如需詳細資訊,請參閱 要求

AWS IoT 主控台中的顯示名稱:預設傳遞串流 ARN

需要:true

類型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

同樣交付串流的新記錄遭拒絕前,記憶體中保留的記錄數上限。最小值為 2000。

AWS IoT 控制台中的顯示名稱:要緩衝的最大記錄數(每個流)

需要:true

類型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

配置給此連接器的記憶體數量 (KB)。

AWS IoT 控制台中的顯示名稱:內存大小

需要:true

類型:string

有效模式:^[0-9]+$

PublishInterval

將記錄發佈到 Firehose 的間隔時間 (以秒為單位)。若要停止批次處理,請將此數值設為 0。

AWS IoT 主控台中的顯示名稱:發佈間隔

需要:true

類型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

要將資料傳送至的預設 Firehose 傳送串流的 ARN。輸入訊息承載中的 delivery_stream_arn 屬性可以覆寫目的地串流。

注意

群組角色必須允許所有目標交付串流上的適當動作。如需詳細資訊,請參閱 要求

AWS IoT 主控台中的顯示名稱:預設傳遞串流 ARN

需要:true

類型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

建立連接器範例 (AWS CLI)

下列 CLI 命令會ConnectorDefinition使用包含連接器的初始版本建立。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

在 AWS IoT Greengrass 主控台中,您可以從群組的 [連接器] 頁面新增連接器。如需詳細資訊,請參閱 Greengrass 連接器入門 (主控台)

輸入資料

此連接器接受 MQTT 主題上的串流內容,然後將內容傳送到目標交付串流。它接受兩種輸入資料:

  • kinesisfirehose/message 主題上的 JSON 資料。

  • kinesisfirehose/message/binary/# 主題上的二進位資料。

Versions 2 - 5
主題篩選條件: kinesisfirehose/message

使用此主題傳送包含 JSON 資料的訊息。

訊息屬性
request

要傳送到交付串流和目標交付串流 (如果與預設串流不同) 的資料。

需要:true

類型:object包括以下屬性:

data

要傳送到交付串流的資料。

需要:true

類型:string

delivery_stream_arn

目標 Kinesis 傳送串流的 ARN。包含此屬性來覆寫預設的交付串流。

需要:false

類型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

請求的任意 ID。此屬性用於將輸入請求映射到輸出回應。當指定時,回應物件中的 id 屬性會設為這個值。如果您不使用此功能,您可以省略此屬性或指定空白字串。

需要:false

類型:string

有效模式:.*

範例輸入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主題篩選條件: kinesisfirehose/message/binary/#

使用此主題傳送包含二進位資料的訊息。連接器不剖析二進位資料。資料會依現狀串流。

若要將輸入請求映射到輸出回應,請將訊息主題中的 # 萬用字元換成任意請求 ID。例如,如果您將訊息發佈到 kinesisfirehose/message/binary/request123,回應物件中的 id 屬性會設為 request123

如果您不想將請求映射到回應,您可以將訊息發佈到 kinesisfirehose/message/binary/。請務必包含結尾的斜線。

Version 1
主題篩選條件: kinesisfirehose/message

使用此主題傳送包含 JSON 資料的訊息。

訊息屬性
request

要傳送到交付串流和目標交付串流 (如果與預設串流不同) 的資料。

需要:true

類型:object包括以下屬性:

data

要傳送到交付串流的資料。

需要:true

類型:string

delivery_stream_arn

目標 Kinesis 傳送串流的 ARN。包含此屬性來覆寫預設的交付串流。

需要:false

類型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

請求的任意 ID。此屬性用於將輸入請求映射到輸出回應。當指定時,回應物件中的 id 屬性會設為這個值。如果您不使用此功能,您可以省略此屬性或指定空白字串。

需要:false

類型:string

有效模式:.*

範例輸入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主題篩選條件: kinesisfirehose/message/binary/#

使用此主題傳送包含二進位資料的訊息。連接器不剖析二進位資料。資料會依現狀串流。

若要將輸入請求映射到輸出回應,請將訊息主題中的 # 萬用字元換成任意請求 ID。例如,如果您將訊息發佈到 kinesisfirehose/message/binary/request123,回應物件中的 id 屬性會設為 request123

如果您不想將請求映射到回應,您可以將訊息發佈到 kinesisfirehose/message/binary/。請務必包含結尾的斜線。

輸出資料

這個連接器會將狀態資訊發佈為輸出資料,且主題為 MQTT。

Versions 2 - 5
訂閱中的主題篩選條件

kinesisfirehose/message/status

範例輸出

回應中包含批次中所傳送之各筆資料記錄的狀態。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注意

如果連接器偵測到可重試的錯誤 (例如,連線錯誤),它會在下一個批次中重試發佈。指數輪詢由 SDK 處理。 AWS 因可重試錯誤而失敗的請求會加回佇列末端,進一步發佈。

Version 1
訂閱中的主題篩選條件

kinesisfirehose/message/status

範例輸出:成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
範例輸出:失敗
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

用法示例

使用下列高階步驟來設定範例 Python 3.7 Lambda 函數,您可以使用此函數來試用連接器。

注意
  • 如果您使用其他 Python 運行時,則可以創建一個從 Python 3.x 到 Python 3.7 的符號鏈接。

  • 連接器入門 (主控台)連接器入門 (CLI) 主題包含詳細步驟,說明如何設定和部署範例 Twilio 通知連接器。

  1. 確定您符合連接器的要求

    針對群組角色要求,您必須設定角色以授與必要的許可,並確認已將角色新增至群組。如需詳細資訊,請參閱 管理 Greengrass 群組角色 (主控台)管理 Greengrass 群組角色 (CLI)

  2. 建立並發佈將輸入資料傳送至連接器的 Lambda 函數。

    範例程式碼儲存為 PY 檔案。下載並解壓縮適用於 Python 的AWS IoT Greengrass 核心 SDK。然後,建立在根層級包含 PY 檔案和 greengrasssdk 資料夾的 zip 套件。此 zip 套件是您上傳至的部署套件 AWS Lambda。

    建立 Python 3.7 Lambda 函數之後,請發佈函數版本並建立別名。

  3. 設定 Greengrass 群組。

    1. 依其別名新增 Lambda 函數 (建議使用)。將 Lambda 生命週期設定為長期存留期 (或"Pinned": true在 CLI 中)。

    2. 新增連接器並設定其參數

    3. 新增訂閱,允許連接器在支援主題篩選條件上接收 JSON 輸入資料並傳送輸出資料

      • 將 Lambda 函數設定為來源,將連接器設定為目標,並使用支援的輸入主題篩選器。

      • 將連接器設為來源、將 AWS IoT Core 設為目標,並使用支援的輸出主題篩選條件。您可以使用此訂閱在 AWS IoT 主控台中檢視狀態訊息。

  4. 部署群組。

  5. 在主 AWS IoT 控台的 [測試] 頁面上,訂閱輸出資料主題,以檢視來自連接器的狀態訊息。Lambda 函數的範例很長,並且會在部署群組後立即開始傳送訊息。

    完成測試後,您可以將 Lambda 生命週期設定為隨需 (或"Pinned": false在 CLI 中),然後部署群組。這會讓函數停止傳送訊息。

範例

下列範例 Lambda 函數會將輸入訊息傳送至連接器。此訊息包含 JSON 資料。

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

授權

Kinesis Firehose 連接器包含下列協力廠商軟體/授權:

此連接器是根據 Greengrass 核心軟體授權合約發行的。

變更記錄

下表說明每個版本連接器的變更。

版本

變更

5

已新增 IsolationMode 參數,以設定連接器的容器化模式。

4

將 Lambda 執行階段升級至 Python 3.7,這會變更執行階段需求。

3

修正以降低過多記錄,以及其他次要錯誤修正。

2

新增以指定時間間隔將批次處理資料記錄傳送至 Firehose 的支援。

  • 群組角色中也需要有 firehose:PutRecordBatch 動作。

  • 新的 MemorySizeDeliveryStreamQueueSizePublishInterval 參數。

  • 輸出訊息包含已發佈資料記錄的一系列狀態回應。

1

初始版本。

Greengrass 群組一次只能包含一個版本的連接器。若要取得有關升級連接器版本的資訊,請參閱升級連接器版本

另請參閱