批处理 HTTP 操作消息 - AWS IoT Core

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

批处理 HTTP 操作消息

您可以使用批处理在单个请求中发送多个 HTTP 操作消息。

概述

批处理使您可以将消息从 AWS IoT Core 规则引擎批量发送到您的 HTTP 终端节点。此功能可通过降低 HTTP 操作执行次数来帮助您降低成本,并通过减少与建立新连接相关的开销来提高效率。

注意

批处理的 HTTP 操作按单个操作计量。根据 AWS IoT Core 规则引擎向下游服务发射的出站批处理有效载荷的大小,以 5 KiB 为增量进行计量。有关更多信息,请参阅 AWS IoT Core 价格页面

当您在物联网规则操作的定义中启用批处理时,将可以使用以下参数进行配置:

maxBatchOpenMs

传出消息等待其他消息创建批次的最长时间(以毫秒为单位)。设置越高,批处理 HTTP 操作的延迟时间就越长。

最小值:5 毫秒。最大值:200 毫秒。

默认值:20 毫秒

支持替换模板:否

maxBatchSize

在单个 IoT 规则操作执行中批处理的最大消息数。

最小值:2 条消息。最大值:10 条消息

默认值:10 条消息

支持替换模板:否

maxBatchSizeBytes

消息批处理的最大大小,以字节为单位。

最小值:100 字节。最大值:131,072 字节

默认值:5120 字节

支持替换模板:否

重要

当您指定多个批处理参数时,批处理将在达到第一个限制时完成。例如,如果您将 100 毫秒指定为最大批处理打开时间,将 5 KiB 指定为最大批处理大小,而规则引擎在 100 毫秒内仅批处理 2 KiB,则将创建并发送 2 KiB 的批处理。

批量使用 HTTP 标头

当您在 HTTP 操作中使用标头时,批处理请求会使用添加到批处理的最后一条消息(不一定是您发布的最后一条消息)中的标头值。我们建议使用以下任一标题值:

  • 批次中的所有消息都相同

  • 适用于所有消息(例如,身份验证凭据)

标头与 HTTP 请求一起发送,不是消息正文的一部分。

注意

启用批处理后:

  • 批处理请求会自动包含Content-Type: application/json标头,因为批处理是作为 JSON 数组发送的。

  • 我们无法保证批次中的最后一条消息是您发布的最后一条消息。这是进入批次的最后一条消息。

有效载荷示例

以下示例显示了发送到您的 HTTP 终端节点的批量消息负载的结构:

[ { "user_id": "user1", "steps_today": 1000 }, { "user_id": "user2", "steps_today": 21000 }, { "user_id": "user8", "steps_today": 1500 }, ... ]

限制

以下是批处理的限制:

  • AWS IoT Core 不能保证消息的整体顺序。批处理是在每台主机上本地执行的,这可能会导致批处理中的消息的顺序与接收顺序不同。

  • AWS IoT Core 不在接收方提供消息处理支持。您有责任确保将下游服务配置为批量接受和处理数据。

  • 不支持跨账户批处理,即使消息的目的地是相同的资源标识符(HTTP URL 或资源 ARN)。

  • AWS IoT Core 不能保证批量大小符合您指定的配置。根据时间和消息流,批次可能会小于您配置的限制。

  • 启用批处理后,不支持二进制负载(非 UTF-8 数据)。只接受 UTF-8 文本负载(例如 JSON)。要发送二进制数据,请先对其进行base64编码,然后再将其发送到 HTTP 操作,然后在接收端点对其进行解码。例如,您可以使用物联网规则中的编码功能对二进制有效负载进行编码。或者,您可以对物联网设备中的二进制有效负载进行编码并将其发布到。 AWS IoT Core

批处理的错误操作

您将无法在错误操作定义中定义单独的批处理逻辑。但是,如果您在主操作中定义了批处理逻辑,则您的错误操作将支持批处理。

当批量请求失败时, AWS IoT Core 规则引擎将遵循 HTTP 操作重试逻辑。在最后一次重试尝试之后,将对整个失败的批次调用错误操作。

以下是启用批处理时出现的错误操作消息的示例:

{ "ruleName": "FailedTopicRule", "topic": "topic/rulesengine", "payloadsWithMetadata": [ { "id": 1, "cloudwatchTraceId": "bebd6d93-6d4a-899e-9e40-56e82252d2be", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" }, { "id": 2, "cloudwatchTraceId": "af94d3b8-0b18-1dbf-2c7d-513f5cb9e2e1", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" }, { "id": 3, "cloudwatchTraceId": "ca441266-c2ce-c916-6aee-b9e5c7831675", "clientId": "Test", "sourceIp": "10.0.0.0", "base64OriginalPayload": "eyJ1c2VyX2lkIjogInVzZXI1NjQ3IiwgInN0ZXBzX3RvZGF5IjogMTMzNjUsICJ0aW1lc3RhbXAiOiAiMjAyNS0xMC0wOVQwNzoyMjo1OC45ODQ3OTAxNzZaIn0=" } ], "failures": [ { "affectedIds": [ 1, 2, 3 ], "failedAction": "HttpAction", "failedResource": "https://example.foobar.com/HttpAction", "errorMessage": "HttpAction failed to make a request to the specified endpoint. StatusCode: 500. Reason: Internal Server Error." }, { "affectedIds": [ 3 ], "failedAction": "S3Action", "failedResource": "amzn-s3-demo-bucket", "errorMessage": "Failed to put S3 object. The error received was The specified bucket does not exist" }, { "affectedIds": [ 3 ], "failedAction": "LambdaAction", "failedResource": "arn:aws:lambda:us-west-2:123456789012:function:dummy", "errorMessage": "Failed to invoke lambda function. Received Server error from Lambda. The error code is 403" } ] }
注意

批量操作失败还会生成更大的错误操作有效负载,这可能会增加由于大小而导致错误操作失败的概率。您可以使用该ErrorActionFailure指标监控错误操作失败。请参阅规则操作指标了解更多信息。

使用批处理 HTTP 操作消息 AWS CLI

使用批处理创建或更新规则操作

  1. 使用相应的 AWS CLI 命令创建或更新规则:

    • 要创建新规则,请使用以下create-topic-rule命令:

      aws iot create-topic-rule --rule-name myrule --topic-rule-payload file://myrule.json
    • 要更新现有规则,请使用以下replace-topic-rule命令:

      aws iot replace-topic-rule --rule-name myrule --topic-rule-payload file://myrule.json
  2. 通过在主题规则负载中将 enableBatching 参数设置为 true 来启用批处理功能:

    { "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "http": { "url": "https://www.example.com/subpath", "confirmationUrl": "https://www.example.com", "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" } ], "enableBatching": true, "batchConfig": { "maxBatchOpenMs": 100, "maxBatchSize": 5, "maxBatchSizeBytes": 1024 } } } ] }
  3. 配置批处理参数。您无需指定所有批处理参数。您可以选择指定 1、2 或全部 3 个批处理参数。如果您未指定批处理参数,则规则引擎将使用默认值更新该参数。有关批处理参数及其默认值的更多信息,请参阅 HTTP 参数