将 Lambda 与 Amazon MQ 结合使用
Amazon MQ 是 Apache ActiveMQ
Amazon MQ 还可以通过安装 ActiveMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。
您可以使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。您的函数通过事件源映射触发,事件源映射是从您的代理读取消息并同步调用函数的一种 Lambda 资源。
Amazon MQ 事件源映射有以下配置限制:
-
身份验证 – 仅支持 ActiveMQ SimpleAuthenticationPlugin
。与代理关联的用户凭证是唯一的连接方法。有关身份验证的更多信息,请参阅 Amazon MQ 开发人员指南 中的 ActiveMQ 的消息收发身份验证和授权。 -
连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅 Amazon MQ 开发人员指南 中的配额Amazon MQ的代理部分。
-
连接 – 您可以在公共或私有 Virtual Private Cloud (VPC) 中创建代理。对于私有 VPC,您的 Lambda 函数需要具备对 VPC 的访问权限才能与记录进行交互。有关更多信息,请参阅本主题后面的 事件源映射 API。
-
事件目标 – 仅支持队列目标。但是,您可以使用虚拟主题,虚拟主题在内部与主题行为一致,在与 Lambda 交互时与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标
。 -
网络拓扑 – 每个事件源映射仅支持一个单实例或备用代理。单实例代理需要一个故障转移终端节点。有关这些代理部署模式的更多信息,请参阅 Amazon MQ 开发人员指南 中的 Amazon MQ 代理架构。
-
协议 – Lambda 按照 OpenWire/Java Message Service (JMS) 协议使用消息。不支持任何其他协议。在 JMS 协议中,仅支持
TextMessage
和 BytesMessage
。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire 。
Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ。有关受支持的最新版本,请参阅 Amazon MQ 开发人员指南 中的 Amazon MQ 发行说明。
默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理,Lambda 将无法在该时段处理任何消息。
Lambda 使用者组
为了与 Amazon MQ 进行交互,Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。
Lambda 将提取消息,直到达到最大处理容量 (6MB)、超时或满足批处理大小。配置后,批处理大小将确定单个批次中要检索的最大项目数。您的批处理将转换为 Lambda 有效负载,并调用您的目标函数。消息既不会永久保存,也不会反序列化。相反,它们被使用者组作为字节 BLOB 进行检索,并进行 base64 编码以形成 JSON 有效负载。
函数调用的最长时间为 14 分钟。
Lambda 同时处理所有传入批次,并自动扩展并发以满足需求。您可以使用 Amazon CloudWatch 中的 ConcurrentExecutions
指标监控给定函数的并发使用情况。有关并发的更多信息,请参阅 管理 Lambda 函数的并发。
例 Amazon MQ 记录事件
{
"eventSource": "aws:amq",
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
"messages": { [
{
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
"messageType": "jms/text-message",
"data": "QUJDOkFBQUE=",
"connectionId": "myJMSCoID",
"redelivered": false,
"destination": {
"physicalname": "testQueue"
},
"timestamp": 1598827811958,
"brokerInTime": 1598827811958,
"brokerOutTime": 1598827811959
},
{
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
"messageType":"jms/bytes-message",
"data": "3DTOOW7crj51prgVLQaGQ82S48k=",
"connectionId": "myJMSCoID1",
"persistent": false,
"destination": {
"physicalname": "testQueue"
},
"timestamp": 1598827811958,
"brokerInTime": 1598827811958,
"brokerOutTime": 1598827811959
}
]
}
}
执行角色权限
要从 Amazon MQ 代理读取记录,您的 Lambda 函数需要向其 执行角色 添加以下权限:
使用加密的客户托管密钥时,也可以添加 kms:Decrypt
权限。
将代理配置为事件源
创建事件源映射以指示 Lambda 将 Amazon MQ 代理中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个函数处理相同的数据,或使用单个函数处理来自多个源的项目。
要将您的函数配置为从 Amazon MQ 读取,请在 Lambda 控制台中创建一个 MQ 触发器。
创建触发器
-
打开 Lambda 控制台的“函数”页面
。 -
选择函数。
-
在 Function overview (函数概览) 下,选择 Add trigger (添加触发器)。
-
选择触发器类型。
-
配置所需选项,然后选择 Add (添加)。
Lambda 支持对 Amazon MQ 事件源使用以下选项:
-
MQ broker (MQ 代理) – 选择一个 Amazon MQ 代理。
-
Batch size (批处理大小) – 设置要在单个批次中检索的最大消息数。
-
Queue name (队列名称) – 输入要使用的 Amazon MQ 队列。
-
Source access configuration (源访问配置) – 选择存储代理凭据的 AWS Secrets Manager 密码。
-
启用触发器 – 禁用触发器以停止处理记录。
要启用或禁用触发器(或删除触发器),请在设计器中选择 MQ 触发器。要重新配置触发器,请使用事件源映射 API 操作。
事件源映射 API
要使用 AWS CLI 或 AWS 开发工具包
要使用 AWS Command Line Interface (AWS CLI) 创建事件源映射,请使用 create-event-source-mapping
默认情况下,在 PubliclyAccessible
标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible
设置为 true 时,才会为代理提供公有 IP 地址。
要获得对事件源映射的完全访问权限,您的代理必须使用公有终端节点或提供对 VPC 的访问权限。要满足 Amazon VPC 访问要求,您可以执行以下操作之一:
-
为每个公有子网配置一个 NAT 网关。有关更多信息,请参阅 VPC 连接函数的 Internet 和服务访问。
-
在 Amazon VPC 和 Lambda 之间创建连接。您的 Amazon VPC 还必须连接到 AWS STS 和 Secrets Manager 终端节点。有关更多信息,请参阅 为 Lambda 配置 接口 VPC 终端节点。
您配置的 Amazon Virtual Private Cloud (Amazon VPC) 安全组规则至少应具有以下设置:
-
入站规则 – 对于没有公共可访问性的代理,允许指定为源的安全组的所有端口上的所有流量。对于具有公共可访问性的代理,允许所有目标的所有端口上的所有流量。
-
出站规则 – 允许所有目标的所有端口上的所有流量传输。
Amazon VPC 配置可通过 Amazon MQAPI 发现,无需在 create-event-source-mapping
设置中配置。
以下示例 AWS CLI 命令将创建一个事件源,将名为 MQ-Example-Function
的 Lambda 函数映射到名为 ExampleMQBroker
的 Amazon MQ 代理。该命令还提供了一个名为 ExampleMQBrokerUserPassword
的 Secrets Manager 密钥,用于存储代理凭据。
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca
\
--function-name MQ-Example-Function
\
--source-access-configuration Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:12345678901:secret:ExampleMQBrokerUserPassword-xPBMTt
\
--queues ExampleQueue
您应看到以下输出:
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:12345678901:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }
使用 update-event-source-mapping
命令可以配置其他选项,例如如何处理批次,以及指定何时丢弃无法处理的记录。以下示例命令将事件源映射更新为批处理大小为 2。
aws lambda update-event-source-mapping \
--uuid 91eaeb7e-c976-1234-9451-8709db01f137
\
--batch-size 2
您应看到以下输出:
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }
更新的设置是异步应用的,并且直到该过程完成才反映在输出中。要查看资源的当前状态,请使用 get-event-source-mapping
aws lambda get-event-source-mapping \
--uuid 91eaeb7e-c976-4939-9451-8709db01f137
您应看到以下输出:
{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }
事件源映射错误
当 Lambda 函数遇到不可恢复的错误时,您的 Amazon MQ 使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误,都可以继续处理。要确定使用者停止的潜在原因,请检查
EventSourceMapping
返回的详细信息中的 StateTransitionReason
字段中是否有以下代码:
ESM_CONFIG_NOT_VALID
-
事件源映射配置无效。
EVENT_SOURCE_AUTHN_ERROR
-
Lambda 未能验证事件源。
EVENT_SOURCE_AUTHZ_ERROR
-
Lambda 没有访问事件源所需的权限。
FUNCTION_CONFIG_NOT_VALID
-
函数的配置无效。
如果记录由于其大小而被丢弃,也将处于未处理状态。Lambda 记录的大小限制为 6 MB。要在函数出错时重新传递消息,您可以使用重新传递策略和死信队列 (DLQ)
处理 Amazon MQ。有关更多信息,请参阅 Apache ActiveMQ 网站上的消息重新传递和 DLQ 处理