本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
批处理 HTTP 操作消息
您可以使用批处理在单个请求中发送多个 HTTP 操作消息。
概述
批处理使您可以将消息从 AWS IoT Core 规则引擎批量发送到您的 HTTP 终端节点。此功能可通过降低 HTTP 操作执行次数来帮助您降低成本,并通过减少与建立新连接相关的开销来提高效率。
注意
批处理的 HTTP 操作按单个操作计量。根据 AWS IoT Core 规则引擎向下游服务发射的出站批处理有效载荷的大小,以 5 KiB 为增量进行计量。有关更多信息,请参阅 AWS IoT Core 价格页面
当您在物联网规则操作的定义中启用批处理时,将可以使用以下参数进行配置:
maxBatchOpenMs-
传出消息等待其他消息创建批次的最长时间(以毫秒为单位)。设置越高,批处理 HTTP 操作的延迟时间就越长。
最小值:5 毫秒。最大值:200 毫秒。
默认值:20 毫秒
支持替换模板:否
maxBatchSize-
在单个 IoT 规则操作执行中批处理的最大消息数。
最小值:2 条消息。最大值:10 条消息
默认值:10 条消息
支持替换模板:否
maxBatchSizeBytes-
消息批处理的最大大小,以字节为单位。
最小值:100 字节。最大值:131,072 字节
默认值:5120 字节
支持替换模板:否
batchAcrossTopics-
是否允许将来自不同 MQTT 主题的消息批处理到单个 HTTP 请求中。默认情况下,只有来自同一主题的消息才会批量处理。对于来自多个设备主题的消息发往同一 HTTP 端点的路由用例,启用此参数。
默认值:假
支持替换模板:否
重要
当您指定多个批处理参数时,批处理将在达到第一个限制时完成。例如,如果您将 100 毫秒指定为最大批处理打开时间,将 5 KiB 指定为最大批处理大小,而规则引擎在 100 毫秒内仅批处理 2 KiB,则将创建并发送 2 KiB 的批处理。
注意
消息始终在相同的账户、规则名称、目标 HTTP 终端节点 URL 和账单组的范围内进行批处理。无论batchAcrossTopics设置如何,在这些属性中存在差异的消息都不会合并到同一个批次中。
批量使用 HTTP 标头
当您在 HTTP 操作中使用标头时,批处理请求会使用最后一条添加到批处理中的消息(不一定是您发布的最后一条消息)中的标头值。我们建议使用以下任一标题值:
-
批次中的所有消息都相同
-
适用于所有消息(例如,身份验证凭据)
标头与 HTTP 请求一起发送,不是消息正文的一部分。
注意
启用批处理后:
批处理请求会自动包含
Content-Type: application/json标头,因为批处理是作为 JSON 数组发送的。我们无法保证批次中的最后一条消息是您发布的最后一条消息。这是进入批次的最后一条消息。
有效载荷示例
以下示例显示了发送到您的 HTTP 终端节点的批量消息负载的结构:
[ { "user_id": "user1", "steps_today": 1000 }, { "user_id": "user2", "steps_today": 21000 }, { "user_id": "user8", "steps_today": 1500 }, ... ]
限制
以下是批处理的限制:
AWS IoT Core 不能保证消息的整体顺序。批处理是在每台主机上本地执行的,这可能会导致批处理中的消息的顺序与接收顺序不同。
AWS IoT Core 不在接收方提供消息处理支持。您有责任确保将下游服务配置为批量接受和处理数据。
Cross-account 不支持批处理,即使消息的目的地是相同的资源标识符(HTTP URL 或资源 ARN)。
AWS IoT Core 不能保证批量大小符合您指定的配置。根据时间和消息流,批次可能会小于您配置的限制。
启用批处理后,不支持二进制负载(非UTF-8 数据)。仅接受 UTF-8 文本负载(例如 JSON)。要发送二进制数据,请先对其进行base64编码,然后再将其发送到 HTTP 操作,然后在接收端点对其进行解码。例如,您可以使用物联网规则中的编码功能对二进制有效负载进行编码。或者,您可以对物联网设备中的二进制有效负载进行编码并将其发布到。 AWS IoT Core
批处理的错误操作
您将无法在错误操作定义中定义单独的批处理逻辑。但是,如果您在主操作中定义了批处理逻辑,则您的错误操作将支持批处理。
当批量请求失败时, AWS IoT Core 规则引擎将遵循 HTTP 操作重试逻辑。在最后一次重试尝试之后,将对整个失败的批次调用错误操作。
以下是启用批处理时的错误操作消息示例:
{ "ruleName": "FailedTopicRule", "topic": "topic/rulesengine", "payloadsWithMetadata": [ { "id": 1, "cloudwatchTraceId": "bebd6d93-6d4a-899e-9e40-56e82252d2be", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" }, { "id": 2, "cloudwatchTraceId": "af94d3b8-0b18-1dbf-2c7d-513f5cb9e2e1", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" }, { "id": 3, "cloudwatchTraceId": "ca441266-c2ce-c916-6aee-b9e5c7831675", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" } ], "failures": [ { "affectedIds": [ 1, 2, 3 ], "failedAction": "HttpAction", "failedResource": "https://example.foobar.com/HttpAction", "errorMessage": "HttpAction failed to make a request to the specified endpoint. StatusCode: 500. Reason: Internal Server Error." }, { "affectedIds": [ 3 ], "failedAction": "S3Action", "failedResource": "amzn-s3-demo-bucket", "errorMessage": "Failed to put S3 object. The error received was The specified bucket does not exist" }, { "affectedIds": [ 3 ], "failedAction": "LambdaAction", "failedResource": "arn:aws:lambda:us-west-2:123456789012:function:dummy", "errorMessage": "Failed to invoke lambda function. Received Server error from Lambda. The error code is 403" } ] }
启用后batchAcrossTopics,错误操作负载格式会发生变化。该topic字段从顶层移动到每个payloadsWithMetadata条目的内部:
{ "ruleName": "FailedTopicRule", "payloadsWithMetadata": [ { "id": 1, "topic": "topic/sensor1", "cloudwatchTraceId": "bebd6d93-6d4a-899e-9e40-56e82252d2be", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" }, { "id": 2, "topic": "topic/sensor2", "cloudwatchTraceId": "af94d3b8-0b18-1dbf-2c7d-513f5cb9e2e1", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" } ], "failures": [ { "affectedIds": [1, 2], "failedAction": "HttpAction", "failedResource": "https://example.foobar.com/HttpAction", "errorMessage": "HttpAction failed to make a request to the specified endpoint. StatusCode: 500. Reason: Internal Server Error." } ] }
注意
批量操作失败还会生成更大的错误操作有效负载,这可能会增加由于大小而导致错误操作失败的概率。您可以使用该ErrorActionFailure指标监控错误操作失败。请参阅规则操作指标了解更多信息。
使用批处理 HTTP 操作消息 AWS CLI
使用批处理创建或更新规则操作
-
使用相应的 AWS CLI 命令创建或更新规则:
-
要创建新规则,请使用 create-topic-rule 命令:
aws iot create-topic-rule --rule-namemyrule--topic-rule-payload file://myrule.json -
要更新现有规则,请使用 repl ace-topic-rule 命令:
aws iot replace-topic-rule --rule-namemyrule--topic-rule-payload file://myrule.json
-
-
通过在主题规则负载中将 enableBatching 参数设置为 true 来启用批处理功能:
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "http": { "url": "https://www.example.com/subpath", "confirmationUrl": "https://www.example.com", "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" } ], "enableBatching": true, "batchConfig": { "maxBatchOpenMs":100, "maxBatchSize":5, "maxBatchSizeBytes":1024, "batchAcrossTopics":true} } } ] } -
配置批处理参数。您无需指定所有批处理参数。您可以选择指定 1、2、3 或全部 4 个批处理参数。如果您未指定批处理参数,则规则引擎将使用默认值更新该参数。有关批处理参数及其默认值的更多信息,请参阅 HTTP 参数。