

# 结合 Amazon MQ 使用 Lambda
<a name="with-mq"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

Amazon MQ 是一项托管消息代理服务，用于 [Apache ActiveMQ](https://activemq.apache.org/) 和 [RabbitMQ](https://www.rabbitmq.com)。*消息代理*允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议，通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。Lambda 通过[事件源映射](invocation-eventsourcemapping.md)调用您的函数，事件源映射是从您的代理读取消息并[同步](invocation-sync.md)调用函数的一种 Lambda 资源。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Amazon MQ 事件源映射有以下配置限制：
+ 并发 – 使用 Amazon MQ 事件源映射的 Lambda 函数具有默认的最大[并发](lambda-concurrency.md)设置。对于 ActiveMQ，Lambda 服务将每个 Amazon MQ 事件源映射的并发执行环境数量限制为 5 个。对于 RabbitMQ，每个 Amazon MQ 事件源映射的并发执行环境数量限制为 1 个。即使您更改了函数的预留或预调配并发设置，Lambda 服务也不会提供更多的执行环境。要请求增加单个 Amazon MQ 事件源映射的默认最大并发数，请联系 支持 并提供事件源映射 UUID 和区域。因为是在特定的事件源映射级别而不是账户或区域级别增加，所以需要为每个事件源映射手动请求按比例增加。
+ 跨账户 – Lambda 不支持跨账户处理。您不能使用 Lambda 处理来自不同 AWS 账户 账户中的 Amazon MQ 消息代理的记录。
+ 身份验证 – 对于 ActiveMQ，仅支持 ActiveMQ [SimpleAuthenticationPlugin](https://activemq.apache.org/security#simple-authentication-plugin)。对于 RabbitMQ，仅支持 [PLAIN](https://www.rabbitmq.com/access-control.html#mechanisms) 身份验证机制。用户必须使用 AWS Secrets Manager 来管理凭据。有关 ActiveMQ 身份验证的更多信息，请参阅 *Amazon MQ 开发人员指南*中的[使用 LDAP 集成 ActiveMQ 代理](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/security-authentication-authorization.html)。
+ 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息，请参阅 *Amazon MQ 开发人员指南*中的 **Amazon MQ 中的配额**的[代理](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-limits.html#broker-limits)部分。
+ 连接 – 您可以在公有或私有虚拟私有云（VPC）中创建代理。对于私有 VPC，您的 Lambda 函数需要具备对 VPC 的访问权限才能接收消息。有关更多信息，请参阅此部分后面的[配置网络安全](process-mq-messages-with-lambda.md#process-mq-messages-with-lambda-networkconfiguration)。
+ 事件目标 – 仅支持队列目标。但是，您可以使用虚拟主题，虚拟主题在内部与主题行为一致，在与 Lambda 交互时与队列行为一致。有关更多信息，请参阅 Apache ActiveMQ 网站上的[虚拟目标](https://activemq.apache.org/virtual-destinations)和 RabbitMQ 网站上的[虚拟主机](https://www.rabbitmq.com/vhosts.html)。
+ 网络拓扑 – 对于 ActiveMQ，每个事件源映射仅支持一个单实例或备用代理。对于 RabbitMQ，每个事件源映射只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息，请参阅 *Amazon MQ 开发人员指南*中的 [Active MQ 代理架构](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-broker-architecture.html)和[RabbitMQ 代理架构](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/rabbitmq-broker-architecture.html)。
+ 协议 – 支持的协议取决于 Amazon MQ 集成的类型。
  + 对于 ActiveMQ 集成，Lambda 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。消息的使用不支持任何其他协议。在 JMS 协议中，仅支持 [https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.6.0/html/classactivemq_1_1commands_1_1_active_m_q_text_message.html) 和 [https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html](https://activemq.apache.org/components/cms/api_docs/activemqcpp-3.9.0/html/classactivemq_1_1commands_1_1_active_m_q_bytes_message.html)。Lambda 还支持 JMS 自定义属性。有关 OpenWire 协议的更多信息，请参阅 Apache ActiveMQ 网站上的 [OpenWire](https://activemq.apache.org/openwire.html)。
  + 对于 RabbitMQ 集成，Lambda 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息，请参阅 RabbitMQ 网站上的 [AMQP 0-9-1 完整参考指南](https://www.rabbitmq.com/amqp-0-9-1-reference.html)。

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本，请参阅 *Amazon MQ 开发人员指南*中的 [Amazon MQ 发布说明](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-release-notes.html)。

**注意**  
默认情况下，Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理，Lambda 将无法在该时段处理任何消息。

**Topics**
+ [了解 Amazon MQ 的 Lambda 使用者组](#services-mq-configure)
+ [为 Lambda 配置 Amazon MQ 事件源](process-mq-messages-with-lambda.md)
+ [事件源映射 API](services-mq-params.md)
+ [筛选来自 Amazon MQ 事件源的事件](with-mq-filtering.md)
+ [Amazon MQ 事件源映射错误排查](services-mq-errors.md)

## 了解 Amazon MQ 的 Lambda 使用者组
<a name="services-mq-configure"></a>

为了与 Amazon MQ 进行交互，Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 事件源，Lambda 会将记录合并为批处理，然后通过单个有效负载中将其发送到您的函数。要控制行为，您可以配置批处理时段和批处理大小。Lambda 会持续提取消息，直到达到 6 MB 的最大有效负载大小、批处理时段过期或记录数达到完整批处理大小时为止。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

使用者组将消息作为字节 BLOB 进行检索，然后以 base64 格式编码为单个 JSON 有效负载，接下来调用您的函数。如果函数为批处理中的任何消息返回错误，Lambda 将重试整批消息，直到处理成功或消息过期为止。

**注意**  
尽管 Lambda 函数的最大超时限制通常为 15 分钟，但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射，仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

您可以使用 Amazon CloudWatch 中的 `ConcurrentExecutions` 指标监控给定函数的并发使用情况。有关并发的更多信息，请参阅 [为函数配置预留并发](configuration-concurrency.md)。

**Example Amazon MQ 记录事件**  

```
{
   "eventSource": "aws:mq",
   "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
   "messages": [
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-east-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", 
        "messageType": "jms/text-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 1,
        "correlationId": "myJMSCoID",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"QUJDOkFBQUE=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      },
      { 
        "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-east-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
        "messageType": "jms/bytes-message",
        "deliveryMode": 1,
        "replyTo": null,
        "type": null,
        "expiration": "60000",
        "priority": 2,
        "correlationId": "myJMSCoID1",
        "redelivered": false,
        "destination": { 
          "physicalName": "testQueue" 
        },
        "data":"LQaGQ82S48k=",
        "timestamp": 1598827811958,
        "brokerInTime": 1598827811958, 
        "brokerOutTime": 1598827811959, 
        "properties": {
          "index": "1",
          "doAlarm": "false",
          "myCustomProperty": "value"
        }
      }
   ]
}
```

```
{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "rmqMessagesByQueue": {
    "pizzaQueue::/": [
      {
        "basicProperties": {
          "contentType": "text/plain",
          "contentEncoding": null,
          "headers": {
            "header1": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                49
              ]
            },
            "header2": {
              "bytes": [
                118,
                97,
                108,
                117,
                101,
                50
              ]
            },
            "numberInHeader": 10
          },
          "deliveryMode": 1,
          "priority": 34,
          "correlationId": null,
          "replyTo": null,
          "expiration": "60000",
          "messageId": null,
          "timestamp": "Jan 1, 1970, 12:33:41 AM",
          "type": null,
          "userId": "AIDACKCEVSQ6C2EXAMPLE",
          "appId": null,
          "clusterId": null,
          "bodySize": 80
        },
        "redelivered": false,
        "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
      }
    ]
  }
}
```
在 RabbitMQ 示例中，`pizzaQueue` 是 RabbitMQ 队列的名称，`/` 是虚拟主机的名称。接收消息时，事件源会将消息列在 `pizzaQueue::/` 下。