亚马逊 MQ 消息代理作为 Pipes 中的来源 EventBridge - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 MQ 消息代理作为 Pipes 中的来源 EventBridge

您可以使用 Pip EventBridge es 接收来自亚马逊 MQ 消息代理的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,您可以选择特定于 Amazon MQ 的设置。 EventBridge 在将数据发送到目标时,Pipes 会维护消息代理中记录的顺序。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ。消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议进行通信,使用主题或队列作为事件目标。

亚马逊 MQ 还可以通过安装 ActiveMQ 或 RabbitMQ 代理来代表您管理亚马逊弹性计算云EC2(亚马逊)实例。安装代理后,它会为您的实例提供不同的网络拓扑和其他基础设施需求。

Amazon MQ 源有以下配置限制:

  • 跨账户 — EventBridge 不支持跨账户处理。您不能使用 EventBridge 来处理来自不同 AWS 账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 — 对于 ActiveMQ,仅支持 ActiveMQ。 SimpleAuthenticationPlugin对于 RabbitMQ,仅支持PLAIN身份验证机制。要管理凭证,请使用 AWS Secrets Manager。有关 ActiveMQ 身份验证的更多信息,请参阅亚马逊 MQ 开发者指南中的 ActiveMQ 代理与集成。LDAP

  • 连接配额 - 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅《Amazon MQ 开发人员指南》中的 *Amazon MQ 中的配额*代理部分。

  • 连接 — 您可以在公共或私有虚拟私有云中创建代理(VPC)。对于私密VPCs设置,您的管道需要访问权限VPC才能接收消息。

  • 事件目标 - 仅支持队列目标。但是,您可以使用虚拟主题,在与管道交互时与,虚拟主题在内部与主题行为一致,在外部与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 - 对于 ActiveMQ,管道仅支持一个单实例或备用代理。对于 RabbitMQ,每个管道只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅《Amazon MQ 开发人员指南》中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 - 支持的协议取决于您使用的 Amazon MQ 集成。

    • 对于 ActiveMQ 集成 EventBridge ,使用 /Java 消息服务 () 协议来使用 OpenWire消息。JMS任何其他协议都不支持消息使用。 EventBridge 仅支持JMS协议中的TextMessageBytesMessage操作。有关该 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站OpenWire上的内容。

    • 对于 RabbitMQ 集成, EventBridge 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 实现 AMQP 0-9-1 协议的更多信息,请参阅 RabbitMQ 网站上的 AMQP0-9-1 完整参考指南。

EventBridge 自动支持亚马逊 MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅《Amazon MQ 开发人员指南》中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有待机状态的经纪人,在窗口结束之前 EventBridge 不会处理消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon P EventBridge ipes 中的事件筛选

ActiveMQ

[ { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ]

RabbitMQ

[ { "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "eventSourceKey": "pizzaQueue::/", "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ]

使用者组

要与 Amazon MQ 互动,请 EventBridge 创建一个可以从您的亚马逊 MQ 经纪人那里读取信息的消费者群组。使用与管道相同的 ID 创建使用者组UUID。

对于 Amazon MQ 来源,对记录进行 EventBridge 批处理,然后通过单个有效负载将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。 EventBridge 提取消息,直到出现以下情况之一:

  • 处理后的记录达到 6 MB 的有效负载大小上限。

  • 批处理时间窗过期。

  • 记录数达到完整批次大小。

EventBridge 将您的批处理转换为单个有效负载,然后调用您的函数。消息既不会永久保存,也不会反序列化。相反,使用者组以字节BLOB的形式检索它们。然后 base64 将它们编码为有效载荷。JSON如果管道对批处理中的任何消息返回错误,则 EventBridge 会重试整批消息,直到处理成功或消息过期。

网络配置

默认情况下,在 PubliclyAccessible 标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible 设置为 true 时,代理才会接收公有 IP 地址。要完全访问您的管道,您的经纪人必须使用公共端点或提供对管道的访问权限VPC。

如果您的亚马逊 MQ 经纪人无法公开访问,则 EventBridge 必须有权访问与您的经纪人关联的亚马逊虚拟私有云 VPC (Amazon) 资源。

  • 要访问您VPC的 Amazon MQ 代理, EventBridge 可以使用来源子网的出站互联网接入。对于公有子网,它必须是托管NAT网关。对于私有子网,它可以是NAT网关,也可以是您自己的NAT网关。确保NAT具有公有 IP 地址并且可以连接到互联网。

  • EventBridge Pipes 还支持通过传送事件 AWS PrivateLink,允许您在不通过公共互联网的情况下将事件从位于 Amazon Virtual Private Cloud (Amazon VPC) 的事件源发送到 Pipes 目标。您可以使用 Pipes 从 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、自我管理的 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。

    要设置VPC终端节点,请参阅AWS PrivateLink 用户指南中的创建VPC终端节点。对于服务名称,请选择com.amazonaws.region.pipes-data

使用以下规则(至少)配置您的 Amazon VPC 安全组:

  • 入站规则-允许 Amazon MQ 代理端口上为您的源指定的安全组的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon MQ 代理端口上为您的源指定的安全组的所有流量。

    代理端口包括:

    • 9092 表示纯文本

    • 9094 for TLS

    • 9096 for SASL

    • 9098 for IAM

注意

您可以通过 Amazon MQ 发现您的亚马逊VPCAPI配置。在设置过程中,您不需要对其进行配置。