Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 2023 年 6 月 30 日进入延长寿命阶段。有关更多信息,请参阅 AWS IoT Greengrass V1 维护策略。在此日期之后,将 AWS IoT Greengrass V1 不会发布提供功能、增强功能、错误修复或安全补丁的更新。在上面运行的设备 AWS IoT Greengrass V1 不会中断,将继续运行并连接到云端。我们强烈建议您迁移到 AWS IoT Greengrass Version 2,这样可以添加重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Firehose

Kinesis Firehose 连接器通过亚马逊数据 Firehose 传输流将数据发布到亚马逊 S3、亚马逊 Redshift 或亚马逊服务等目的地。 OpenSearch

此连接器是 Kinesis 传输流的数据创建器。它接收 MQTT 主题上的输入数据,并将这些数据发送到指定的传输流。然后,该传输流将数据记录发送到已配置的目标(例如,S3 存储桶)。

此连接器具有以下版本。

版本

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

有关版本更改的信息,请参阅更改日志

要求

此连接器具有以下要求:

Version 4 - 5
  • AWS IoT Greengrass 核心软件 v1.9.3 或更高版本。

  • Python 版本 3.7 或 3.8 已安装在核心设备上,并已添加到 PATH 环境变量中。

    注意

    要使用 Python 3.8,请运行以下命令来创建从默认 Python 3.7 安装文件夹到已安装的 Python 3.8 二进制文件的符号链接。

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    这会将设备配置为满足 AWS IoT Greengrass 的 Python 要求。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅亚马逊 Kinesis Firehose 开发者指南中的创建亚马逊数据 Firehose 传输流

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例 IAM policy 中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Versions 2 - 3
  • AWS IoT Greengrass 核心软件 v1.7 或更高版本。

  • Python 版本 2.7 已安装在核心设备上,并已添加到 PATH 环境变量中。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅亚马逊 Kinesis Firehose 开发者指南中的创建亚马逊数据 Firehose 传输流

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例 IAM policy 中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Version 1
  • AWS IoT Greengrass 核心软件 v1.7 或更高版本。

  • Python 版本 2.7 已安装在核心设备上,并已添加到 PATH 环境变量中。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅亚马逊 Kinesis Firehose 开发者指南中的创建亚马逊数据 Firehose 传输流

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecord 操作,如以下示例 IAM policy 中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

连接器参数

该连接器提供以下参数:

Versions 5
DefaultDeliveryStreamArn

要向其发送数据的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

AWS IoT 控制台中的显示名称:默认传送流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

AWS IoT 控制台中的显示名称:要缓冲的最大记录数(每个流)

必需:true

类型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

AWS IoT 控制台中的显示名称:内存大小

必需:true

类型:string

有效模式:^[0-9]+$

PublishInterval

将记录发布到 Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

AWS IoT 控制台中的显示名称:发布间隔

必需:true

类型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

此连接器的容器化模式。默认值为GreengrassContainer,这意味着连接器在 AWS IoT Greengrass 容器内的隔离运行时环境中运行。

注意

组的默认容器化设置不适用于连接器。

AWS IoT 控制台中的显示名称:容器隔离模式

必需:false

类型:string

有效值:GreengrassContainerNoContainer

有效模式:^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

要向其发送数据的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

AWS IoT 控制台中的显示名称:默认传送流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

AWS IoT 控制台中的显示名称:要缓冲的最大记录数(每个流)

必需:true

类型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

AWS IoT 控制台中的显示名称:内存大小

必需:true

类型:string

有效模式:^[0-9]+$

PublishInterval

将记录发布到 Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

AWS IoT 控制台中的显示名称:发布间隔

必需:true

类型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

要向其发送数据的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

AWS IoT 控制台中的显示名称:默认传送流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

创建连接器示例 (AWS CLI)

以下 CLI 命令将创建一个 ConnectorDefinition,其初始版本包含该连接器。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

在 AWS IoT Greengrass 控制台中,您可以从群组的 “连接器” 页面添加连接器。有关更多信息,请参阅Greengrass 连接器入门(控制台)

输入数据

此连接器接受 MQTT 主题上的流内容,然后将这些内容发送到目标传输流。它接受两种类型的输入数据:

  • JSON 数据,位于 kinesisfirehose/message 主题中。

  • 二进制数据,位于 kinesisfirehose/message/binary/# 主题中。

Versions 2 - 5
主题筛选条件kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需:true

类型:string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

Version 1
主题筛选条件kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需:true

类型:string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

输出数据

此连接器将状态信息发布为 MQTT 主题的输出数据。

Versions 2 - 5
订阅中的主题筛选条件

kinesisfirehose/message/status

示例输出

响应将包含批次中发送的每条数据记录的状态。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注意

如果连接器检测到可重试的错误(如连接错误),它会在下一批次中重试发布。指数退避由 SDK 处理。 AWS 失败并带可重试的错误的请求将添加回队列的结尾以供进一步发布。

Version 1
订阅中的主题筛选条件

kinesisfirehose/message/status

示例输出:成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
示例输出:失败
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

用法示例

使用以下概括步骤设置可用于尝试连接器的示例 Python 3.7 Lambda 函数。

注意
  1. 确保满足连接器的要求

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

  2. 创建并发布将输入数据发送到连接器的 Lambda 函数。

    示例代码保存为 PY 文件。下载并解压适用于 Python 的 AWS IoT Greengrass Core 软件开发工具包。然后,创建一个 zip 包,其中在根级别包含 PY 文件和 greengrasssdk 文件夹。此 zip 包是您上传到 AWS Lambda 的部署包。

    创建 Python 3.7 Lambda 函数后,请发布函数版本并创建别名。

  3. 配置 Greengrass 组。

    1. 通过别名来添加 Lambda 函数(推荐)。将 Lambda 生命周期配置为长时间生存(或在 CLI 中设置为 "Pinned": true)。

    2. 添加连接器并配置其参数

    3. 添加允许连接器接收 JSON 输入数据并针对支持的主题筛选条件发送输出数据的订阅。

      • 将 Lambda 函数设置为源,将连接器设置为目标,并使用支持的输入主题筛选条件。

      • 将连接器设置为源,将 AWS IoT Core 设置为目标,并使用支持的输出主题筛选条件。您可以使用此订阅在 AWS IoT 控制台中查看状态消息。

  4. 部署组。

  5. 在 AWS IoT 控制台的 “测试” 页面上,订阅输出数据主题以查看来自连接器的状态消息。示例 Lambda 函数是长时间生存的,并且在部署组后立即开始发送消息。

    完成测试后,您可以将 Lambda 生命周期设置为按需(或在 CLI 中设置为 "Pinned": false)并部署组。这会阻止函数发送消息。

示例

以下示例 Lambda 函数向连接器发送一条输入消息。此消息包含 JSON 数据。

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

许可证

Kinesis Firehose 连接器包含以下第三方软件/许可:

该连接器在 Greengrass Core 软件许可协议下发布。

更改日志

下表介绍每个版本连接器的更改。

版本

更改

5

增加了用于配置连接器容器化模式的 IsolationMode 参数。

4

已将 Lambda 运行时升级到 Python 3.7,这会更改运行时要求。

3

修复以减少过多的日志记录,并修复了其他较小的错误。

2

增加了按指定间隔向 Firehose 发送批量数据记录的支持。

  • 还需要在组角色中执行 firehose:PutRecordBatch 操作。

  • 新的 MemorySizeDeliveryStreamQueueSizePublishInterval 参数。

  • 输入消息包含一组已发布的数据记录的状态响应。

1

首次发布。

Greengrass 组在一个时间上只能包含一个版本的连接器。有关升级连接器版本的信息,请参阅升级连接器版本

另请参阅