搭配使用 Lambda 與 Amazon MQ - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配使用 Lambda 與 Amazon MQ

注意

如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道

Amazon MQ 是一項受管訊息代理程式服務,適用於 Apache ActiveMQRabbitMQ訊息代理程式透過主題或佇列事件目的地,允許軟體應用程式和元件使用各種程式設計語言、作業系統和正式簡訊協定來進行通訊。

Amazon MQ 還可以透過安裝 ActiveMQ 或 RabbitMQ 代理程式,並提供不同的網路拓撲和其他基礎架構需求,來代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 執行個體。

您可以使用 Lambda 函數來處理您的 Amazon MQ 訊息代理程式中的記錄。Lambda 透過事件來源映射叫用函數,這是從您的代理程式讀取訊息並同步叫用函數的 Lambda 資源。

警告

Lambda 事件來源對應至少處理每個事件一次,並且可能會重複處理記錄。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性

Amazon MQ 事件來源映射具有下列組態限制:

  • 並行:使用 Amazon MQ 事件來源映射的 Lambda 函數具有預設並行上限設定。若使用 ActiveMQ,Lambda 服務會將並行執行環境的數量上限設為 5 個。若使用 RabbitMQ,並行執行環境的數量上限為 1 個。即使您變更函數的保留或佈建並行設定,Lambda 服務還是無法提供更多可用的執行環境。如要請求提升預設的並行上限,請聯絡 AWS Support。

  • 跨帳戶 - Lambda 不支援跨帳戶處理。您無法用 Lambda 處理來自不同 AWS 帳戶中 Amazon MQ 訊息代理程式的記錄。

  • 驗證 — 對於 ActiveMQ,僅支援 ActiveM SimpleAuthenticationPluginQ。對於 RabbitMQ,僅支援 PLAIN 身分驗證機制。使用者必須使用 AWS Secrets Manager 來管理其認證。如需 ActiveMQ 身分驗證的詳細資訊,請參閱 Amazon MQ 開發人員指南中的將 ActiveMQ 代理程式與 LDAP 整合

  • 連線配額 - 代理程式每個線路層級協定允許的連線數量上限。此配額以代理程式執行個體類型為基礎。如需詳細資訊,請參閱 Amazon MQ 開發人員指南中的 Amazon MQ 中的配額代理程式

  • 連線 - 您可以在公有或私有 Virtual Private Cloud (VPC) 中建立代理程式。若是私有 VPC,您的 Lambda 函數需要存取 VPC 才能接收訊息。如需詳細資訊,請參閱本主題後面部分的 網路組態

  • 事件目的地 - 僅支援佇列目的地。然而,您可以使用虛擬主題,當作為佇列與 Lambda 互動時,其行為在內部可作為主題。如需詳細資訊,請參閱 Apache ActiveMQ 網站上的虛擬目的地,以及 RabbitMQ 網站上的虛擬主機

  • 網路拓撲 - 對於 ActiveMQ,每個事件來源映射僅支援單一執行個體或待命代理程式。對於 RabbitMQ,每個事件來源映射僅支援單一執行個體代理程式或叢集部署。單一執行個體代理程式需要容錯移轉端點。如需這些代理程式部署模式的詳細資訊,請參閱 Amazon MQ 開發人員指南中的 ActiveMQ 代理程式架構Rabbit MQ 代理程式架構

  • 協定 - 支援的協定取決於 Amazon MQ 整合的類型。

    • 若是 ActiveMQ 整合,Lambda 會使用 OpenWire /Java 訊息服務 (JMS) 通訊協定取用訊息。不支援透過其他協定來取用訊息。在 JMS 協定中,僅支援 TextMessageBytesMessage。Lambda 也支援 JMS 自訂屬性。如需有關通訊 OpenWire 協定的詳細資訊,請參閱 Apache OpenWire的 ActiveMQ 網站上。

    • 對於 RabbitMQ 整合,Lambda 透過 AMQP 0-9-1 協定來取用訊息。不支援透過其他協定來取用訊息。如需 RabbitMQ 實作 AMQP 0-9-1 協定的詳細資訊,請參閱 RabbitMQ 網站上的 AMQP 0-9-1 完整參考指南

Lambda 會自動支援 Amazon MQ 支援的最新版 ActiveMQ 和 RabbitMQ。如需支援的最新版,請參閱 Amazon MQ 開發人員指南中的 Amazon MQ 版本備註

注意

根據預設,Amazon MQ 具有每週代理程式維護時段。代理程式在該時段不可用。若是無待命狀態的代理程式,則 Lambda 無法在該時段處理任何訊息。

Lambda 取用者群組

若要與 Amazon MQ 互動,Lambda 會建立可以從您的 Amazon MQ 代理程式讀取的取用者群體。建立取用者群組時,會使用與事件來源映射 UUID 相同的 ID。

對於 Amazon MQ 事件來源,Lambda 會批次處理記錄,並在單個承載中將它們傳送到您的函數。要控制行為,您可以設定批次間隔和批次大小。Lambda 會提取訊息,直到它處理最大為 6 MB 的承載大小、批次間隔過期或記錄數達到完整批次大小。如需詳細資訊,請參閱 批次處理行為

取用者群組會將訊息擷取為位元組 BLOB,並透過 base64 將其編碼成單一 JSON 承載,然後調用您的函數。如果函數針對批次中的任何訊息傳回錯誤,Lambda 會重試整個批次的訊息,直至處理成功或訊息過期。

注意

雖然 Lambda 函數的逾時上限通常為 15 分鐘,但 Amazon MSK、自我管理的 Apache Kafka、Amazon DocumentDB 以及 Amazon MQ for ActiveMQ 和 Amazon MQ for RabbitMQ 的事件來源映射只支援 14 分鐘逾時限制上限的函數。此限制條件可確保事件來源映射能夠正確處理函數錯誤和重試。

您可以使用 Amazon CloudWatch 中的指ConcurrentExecutions標監視給定函數的並行使用情況。如需並行的詳細資訊,請參閱設定預留並行

範例 Amazon MQ 記錄事件
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注意

在 RabbitMQ 範例中,pizzaQueue 是 RabbitMQ 佇列的名稱,/ 是虛擬主機的名稱。接收訊息時,事件來源會在 pizzaQueue::/ 列出訊息。

執行角色許可

若要從 Amazon MQ 代理程式讀取記錄,您的 Lambda 函數需要將下列許可新增至其執行角色

注意

使用加密的客戶管理金鑰時,也請新增 kms:Decrypt 許可。

網路組態

若要透過事件來源映射授予 Lambda 代理程式的完整存取權,您的代理程式必須使用公有端點 (公有 IP 地址),或者您必須提供建立代理程式之 Amazon VPC 的存取權。

預設情況下,當您建立 Amazon MQ 代理程式時,PubliclyAccessible 旗標會設為 false。為了讓您的代理程式能夠接收到公有 IP 地址,您必須將 PubliclyAccessible 旗標設為 true。

將 Amazon MQ 與 Lambda 搭配使用的最佳做法是使用 AWS PrivateLink VPC 端點,並將 Lambda 函數存取權授予代理程式 VPC 的存取權。部署 Lambda 的端點,以及 () 的端點 (僅適用於 ActiveMQ AWS Security Token Service )AWS STS。如果您的代理程式使用驗證,請同時為 AWS Secrets Manager. 如需進一步了解,請參閱使用 VPC 端點

或者,在包含 Amazon MQ 代理程式的 VPC 中的每個公有子網路上設定 NAT 閘道。如需詳細資訊,請參閱 為虛擬私人電腦連線的 Lambda 函數啟用網際網路

當您為 Amazon MQ 代理程式建立事件來源對應時,Lambda 會檢查代理程式 VPC 的子網路和安全群組是否已存在彈性網路界面 (ENI)。如果 Lambda 找到現有的 ENI,它會嘗試重複使用它們。否則,Lambda 會建立新的 ENI 以連接至事件來源並叫用您的函數。

注意

Lambda 函數一律在 Lambda 服務擁有的 VPC 內執行。這些 VPC 由服務自動維護,客戶看不到。您也可以將您的功能連接到 Amazon VPC。在任何一種情況下,函數的 VPC 配置都不會影響事件源映射。只有事件來源 VPC 的組態才會決定 Lambda 連線至事件來源的方式。

VPC 安全群組規則

使用下列規則 (至少) 為包含叢集的 Amazon VPC 設定安全群組:

  • 傳入規則:允許為事件來源指定之安全群組的代理程式連接埠上的所有流量來自其安全群組。預設情況下,ActiveMQ 會使用連接埠 61617,RabbitMQ 則會使用連接埠 5671。

  • 傳出規則:針對所有目的地,允許連接埠 443 上的所有流量。允許代理程式連接埠上的所有流量位於其安全群組。預設情況下,ActiveMQ 會使用連接埠 61617,RabbitMQ 則會使用連接埠 5671。

  • 如果您使用的是 VPC 端點而不是 NAT 閘道,則與 VPC 端點相關聯的安全群組必須允許連接埠 443 上所有來自事件來源安全群組的傳入流量。

使用 VPC 端點

當您使用 VPC 端點時,會使用 ENI 透過這些端點路由呼叫函數的 API 呼叫。Lambda 服務主體需要呼叫lambda:InvokeFunction使用這些 ENI 的任何函數。此外,對於 ActiveMQ,Lambda 服務主體需要呼叫sts:AssumeRole使用 ENI 的角色。

根據預設,VPC 端點具有開放的 IAM 政策。最佳做法是將這些原則限制為僅允許特定主參與者使用該端點執行所需的動作。為了確保您的事件來源對應能夠叫用 Lambda 函數,虛擬私人雲端端點政策必須允許 Lambda 服務原則呼叫,lambda:InvokeFunction而且如果是 ActiveMQ,則必須允許 Lambda 服務原則呼叫。sts:AssumeRole將 VPC 端點原則限制為僅允許來自組織內的 API 呼叫,可防止事件來源對應正常運作。

下列範例 VPC 端點原則示範如何授與 AWS STS 和 Lambda 端點所需的存取權。

範例 VPC 私人雲端端點策略- AWS STS 端點(僅適用於 ActiveMQ)
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
範例 VPC 私人雲端端點政策
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

如果您的 Amazon MQ 代理程式使用身份驗證,您也可以限制 Secrets Manager 端點的 VPC 端點政策。若要呼叫機 Secrets Manager API,Lambda 會使用您的函數角色,而不是使用 Lambda 服務主體。下列範例顯示 Secrets Manager 端點策略。

範例 VPC 端點原則-Secrets Manager 端點
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

新增權限並建立事件來源對應

建立事件來源映射,指示 Lambda 將記錄從 Amazon MQ 代理程式傳送至 Lambda 函數。您可以建立多個事件來源映射,來使用多個函數處理相同資料,或使用單一函數處理來自多個來源的項目。

若要將函數設定為從 Amazon MQ 讀取,請新增必要的許可,並在 Lambda 主控台中建立 MQ 觸發器。

若要新增權限並建立觸發器
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇函數的名稱。

  3. 依序選擇 Configuration (組態) 索引標籤和 Permissions (許可)。

  4. 在 [角色名稱] 下,選擇指向您的執行角色的連結。此連結會在 IAM 主控台中開啟角色。

    
            連結至執行角色
  5. 選擇 [新增權限],然後選擇 [建立內嵌原則]。

    
            在 IAM 主控台中建立內嵌政策
  6. 在 [原則編輯器] 中,選擇 [JSON]。輸入下列政策。您的函數需要這些許可才能從 Amazon MQ 代理程式讀取。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "mq:DescribeBroker", "secretsmanager:GetSecretValue", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
    注意

    使用加密的客戶管理金鑰時,您還必須新增kms:Decrypt權限。

  7. 選擇下一步。輸入策略名稱,然後選擇 [建立策略]。

  8. 返回 Lambda 主控台中的函數。在 函式概觀 下,選擇 新增觸發條件

    
            Lambda 主控台的函數概觀區段
  9. 選擇 MQ 觸發器類型。

  10. 設定需要的選項,然後選擇 新增

Lambda 支援 Amazon MQ 事件來源的下列選項:

  • MQ broker (MQ 代理程式) – 選取 Amazon MQ 代理程式。

  • Batch size (批次大小) - 設定單一批次中要擷取的訊息數目上限。

  • Queue name (佇列名稱) - 輸入要取用的 Amazon MQ 佇列。

  • Source access configuration (來源存取組態) - 輸入儲存您代理程式憑證的虛擬主機資訊和 Secrets Manager 機密。

  • Enable trigger (啟用觸發條件) - 停用觸發條件以停止處理記錄。

若要啟用或停用觸發條件 (或將其刪除),請在設計工具中選擇 MQ 觸發條件。若要重新設定觸發條件,請使用事件來源映射 API 操作。

事件來源映射 API

若要使用 AWS Command Line Interface (AWS CLI)AWS SDK 來管理事件來源,可以使用下列 API 操作:

若要使用 AWS Command Line Interface (AWS CLI) 建立事件來源對應,請使用create-event-source-mapping指令。

下列範例 AWS CLI 命令會建立事件來源,該事件來源會將名為的 Lambda 函數對應MQ-Example-Function至名為的 Amazon MQ RabbitMQ 代理程式。ExampleMQBroker該命令還會提供虛擬主機名稱以及存放代理程式憑證的 Secrets Manager 機密 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98 \ --function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function \ --queues ExampleQueue \ --source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt \

您應該會看到下列輸出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

使用 update-event-source-mapping 命令時,您可以設定額外選項,例如 Lambda 的批次處理方式,以及指定何時捨棄無法處理的記錄。下列範例命令會將事件來源映射更新為具有批次大小 2。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

您應該會看到下列輸出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda 會以非同步方式更新這些設定。在此程序完成之前,輸出不會反映變更。使用 get-event-source-mapping 命令可檢視資源的目前狀態。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

您應該會看到下列輸出:

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

事件來源映射錯誤

當 Lambda 函數遇到無法復原的錯誤時,您的 Amazon MQ 取用者將停止處理記錄。任何其他取用者可能會繼續處理,只要他們沒有遇到相同的錯誤。若要判斷停止 EventSourceMapping 取用者的潛在原因,請檢查傳回詳細資料中的 StateTransitionReason 欄位,以取得下列其中一個程式碼:

ESM_CONFIG_NOT_VALID

事件來源對應組態無效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 驗證事件來源失敗。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 沒有存取事件來源所需的許可。

FUNCTION_CONFIG_NOT_VALID

該函數的配置無效。

如果記錄因大小而遭 Lambda 棄置,則也將不會得到處理。Lambda 記錄的大小限制為 6MB。若要在函數錯誤時重新傳遞訊息,您可以使用無效字母佇列 (DLQ)。如需詳細資訊,請參閱 Apache ActiveMQ 網站上的訊息重新傳遞和 DLQ 處理,以及 RabbitMQ 網站上的可靠性指南

注意

Lambda 不支援自訂重新傳遞政策,相反地,Lambda 會使用 Apache ActiveMQ 網站上「重新傳遞政策」頁面中預設值的原則,且maximumRedeliveries設定為 6。

Amazon MQ 和 RabbitMQ 組態參數

所有 Lambda 事件來源類型都共用相同CreateEventSourceMappingUpdateEventSourceMappingAPI 作業。但是,只有一些參數適用於 Amazon MQ 和 RabbitMQ。

適用於 Amazon MQ 和 RabbitMQ 的事件來源參數
參數 必要 預設 備註

BatchSize

100

上限:10,000

已啟用

N

true

FunctionName

Y

FilterCriteria

N

Lambda 事件篩選

MaximumBatchingWindowInSeconds

N

500 毫秒

批次處理行為

佇列

N

要使用的 Amazon MQ 代理程式目的地佇列的名稱。

SourceAccessConfigurations

N

若為 ActiveMQ,可使用 BASIC_AUTH 憑證。若為 RabbitMQ,可同時包含 BASIC_AUTH 憑證和 VIRTUAL_HOST 資訊。