Using Lambda with Amazon MQ
Note
If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.
Amazon MQ is a managed message broker service for Apache ActiveMQ
Amazon MQ can also manage Amazon Elastic Compute Cloud (Amazon EC2) instances on your behalf by installing ActiveMQ or RabbitMQ brokers and by providing different network topologies and other infrastructure needs.
You can use a Lambda function to process records from your Amazon MQ message broker. Lambda invokes your function through an event source mapping, a Lambda resource that reads messages from your broker and invokes the function synchronously.
Warning
Lambda event source mappings process each event at least once, and duplicate processing of records can occur. To avoid potential issues
related to duplicate events, we strongly recommend that you make your function code idempotent. To learn more, see How do I make my Lambda function idempotent
The Amazon MQ event source mapping has the following configuration restrictions:
-
Concurrency – Lambda functions that use an Amazon MQ event source mapping have a default maximum concurrency setting. For ActiveMQ, the Lambda service limits the number of concurrent execution environments to five. For RabbitMQ, the number of concurrent execution environments is limited to 1. Even if you change your function's reserved or provisioned concurrency settings, the Lambda service won't make more execution environments available. To request an increase in the default maximum concurrency, contact AWS Support.
Cross account – Lambda does not support cross-account processing. You cannot use Lambda to process records from an Amazon MQ message broker that is in a different AWS account.
-
Authentication – For ActiveMQ, only the ActiveMQ SimpleAuthenticationPlugin
is supported. For RabbitMQ, only the PLAIN authentication mechanism is supported. Users must use AWS Secrets Manager to manage their credentials. For more information about ActiveMQ authentication, see Integrating ActiveMQ brokers with LDAP in the Amazon MQ Developer Guide. -
Connection quota – Brokers have a maximum number of allowed connections per wire-level protocol. This quota is based on the broker instance type. For more information, see the Brokers section of Quotas in Amazon MQ in the Amazon MQ Developer Guide.
-
Connectivity – You can create brokers in a public or private virtual private cloud (VPC). For private VPCs, your Lambda function needs access to the VPC to receive messages. For more information, see Network configuration later in this topic.
-
Event destinations – Only queue destinations are supported. However, you can use a virtual topic, which behaves as a topic internally while interacting with Lambda as a queue. For more information, see Virtual Destinations
on the Apache ActiveMQ website, and Virtual Hosts on the RabbitMQ website. -
Network topology – For ActiveMQ, only one single-instance or standby broker is supported per event source mapping. For RabbitMQ, only one single-instance broker or cluster deployment is supported per event source mapping. Single-instance brokers require a failover endpoint. For more information about these broker deployment modes, see Active MQ Broker Architecture and Rabbit MQ Broker Architecturein the Amazon MQ Developer Guide.
-
Protocols – Supported protocols depend on the type of Amazon MQ integration.
For ActiveMQ integrations, Lambda consumes messages using the OpenWire/Java Message Service (JMS) protocol. No other protocols are supported for consuming messages. Within the JMS protocol, only
TextMessage
and BytesMessage
are supported. Lambda also supports JMS custom properties. For more information about the OpenWire protocol, see OpenWire on the Apache ActiveMQ website. For RabbitMQ integrations, Lambda consumes messages using the AMQP 0-9-1 protocol. No other protocols are supported for consuming messages. For more information about RabbitMQ's implementation of the AMQP 0-9-1 protocol, see AMQP 0-9-1 Complete Reference Guide
on the RabbitMQ website.
Lambda automatically supports the latest versions of ActiveMQ and RabbitMQ that Amazon MQ supports. For the latest supported versions, see Amazon MQ release notes in the Amazon MQ Developer Guide.
Note
By default, Amazon MQ has a weekly maintenance window for brokers. During that window of time, brokers are unavailable. For brokers without standby, Lambda cannot process any messages during that window.
Sections
Lambda consumer group
To interact with Amazon MQ, Lambda creates a consumer group which can read from your Amazon MQ brokers. The consumer group is created with the same ID as the event source mapping UUID.
For Amazon MQ event sources, Lambda batches records together and sends them to your function in a single payload. To control behavior, you can configure the batching window and batch size. Lambda pulls messages until it processes the payload size maximum of 6 MB, the batching window expires, or the number of records reaches the full batch size. For more information, see Batching behavior.
The consumer group retrieves the messages as a BLOB of bytes, base64-encodes them into a single JSON payload, and then invokes your function. If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire.
Note
While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.
You can monitor a given function's concurrency usage using the ConcurrentExecutions
metric in
Amazon CloudWatch. For more information about concurrency, see Configuring reserved concurrency.
Example Amazon MQ record events
Note
In the RabbitMQ example, pizzaQueue
is the name of the RabbitMQ queue, and /
is the
name of the virtual host. When receiving messages, the event source lists messages under
pizzaQueue::/
.
Execution role permissions
To read records from an Amazon MQ broker, your Lambda function needs the following permissions added to its execution role:
Note
When using an encrypted customer managed key, add the kms:Decrypt
permission as well.
Network configuration
To give Lambda full access to your broker through your event source mapping, either your broker must use a public endpoint (public IP address), or you must provide access to the Amazon VPC you created the broker in.
By default, when you create an Amazon MQ broker, the PubliclyAccessible
flag is set to false. For your broker to receive a public
IP address, you must set the PubliclyAccessible
flag to true.
Best practice for using Amazon MQ with Lambda is to use AWS PrivateLink VPC endpoints and to give your Lambda function access to your broker's VPC. Deploy an endpoint for Lambda, and, for ActiveMQ only, an endpoint for AWS Security Token Service (AWS STS). If your broker uses authentication, also deploy an endpoint for AWS Secrets Manager. To learn more, see Working with VPC endpoints.
Alternatively, configure a NAT gateway on each public subnet in the VPC containing your Amazon MQ broker. For more information, see Enable internet access for VPC-connected Lambda functions.
When you create an event source mapping for an Amazon MQ broker, Lambda checks whether Elastic Network Interfaces (ENIs) are already present for the subnets and security groups of your broker’s VPC. If Lambda finds existing ENIs, it attempts to re-use them. Otherwise, Lambda creates new ENIs to connect to the event source and invoke your function.
Note
Lambda functions always run inside VPCs owned by the Lambda service. These VPCs are maintained automatically by the service and are not visible to customers. You can also connect your function to an Amazon VPC. In either case, your function’s VPC configuration doesn’t affect the event source mapping. Only the configuration of the event source’s VPC determines how Lambda connects to your event source.
VPC security group rules
Configure the security groups for the Amazon VPC containing your cluster with the following rules (at minimum):
-
Inbound rules – Allow all traffic on the broker port for the security group specified for your event source from within its own security group. ActiveMQ uses port 61617 by default and RabbitMQ uses port 5671 by default.
-
Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the broker port for within its own security group. ActiveMQ uses port 61617 by default and RabbitMQ uses port 5671 by default.
-
If you use VPC endpoints instead of a NAT gateway, the security groups associated with the VPC endpoints must allow all inbound traffic on port 443 from the event source's security groups.
Working with VPC endpoints
When you use VPC endpoints, API calls to invoke your function are routed through these endpoints using the ENIs.
The Lambda service principal needs to call lambda:InvokeFunction
on any functions that use those ENIs.
Additionally, for ActiveMQ, the Lambda service principal needs to call sts:AssumeRole
on roles that use the ENIs.
By default, VPC endpoints have IAM policies which are open. Best practice is to restrict these policies to only allow
specific principals to perform the needed actions using that endpoint. To ensure that your event source mapping is able to
invoke your Lambda function, the VPC endpoint policy must allow the Lambda service principle to call lambda:InvokeFunction
and, for ActiveMQ, sts:AssumeRole
. Restricting your VPC endpoint policies to only allow API calls originating within
your organization prevents the event source mapping from functioning properly.
The following example VPC endpoint policies show how to grant the required access for AWS STS and Lambda endpoints.
Example VPC endpoint policy - AWS STS endpoint (ActiveMQ only)
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
Example VPC endpoint policy - Lambda endpoint
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
If your Amazon MQ broker uses authentication, you can also restrict the VPC endpoint policy for the Secrets Manager endpoint. To call the Secrets Manager API, Lambda uses your function role, not the Lambda service principal. The following example shows a Secrets Manager endpoint policy.
Example VPC endpoint policy - Secrets Manager endpoint
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "
customer_function_execution_role_arn
" ] }, "Resource": "customer_secret_arn
" } ] }
Add permissions and create the event source mapping
Create an event source mapping to tell Lambda to send records from an Amazon MQ broker to a Lambda function. You can create multiple event source mappings to process the same data with multiple functions, or to process items from multiple sources with a single function.
To configure your function to read from Amazon MQ, add the required permissions and create an MQ trigger in the Lambda console.
To add permissions and create a trigger
Open the Functions page
of the Lambda console. -
Choose the name of a function.
-
Choose the Configuration tab, and then choose Permissions.
-
Under Role name, choose the link to your execution role. This link opens the role in the IAM console.
-
Choose Add permissions, and then choose Create inline policy.
-
In the Policy editor, choose JSON. Enter the following policy. Your function needs these permissions to read from an Amazon MQ broker.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "mq:DescribeBroker", "secretsmanager:GetSecretValue", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
Note
When using an encrypted customer managed key, you must also add the
kms:Decrypt
permission. -
Choose Next. Enter a policy name and then choose Create policy.
-
Go back to your function in the Lambda console. Under Function overview, choose Add trigger.
-
Choose the MQ trigger type.
-
Configure the required options, and then choose Add.
Lambda supports the following options for Amazon MQ event sources:
-
MQ broker – Select an Amazon MQ broker.
-
Batch size – Set the maximum number of messages to retrieve in a single batch.
-
Queue name – Enter the Amazon MQ queue to consume.
-
Source access configuration – Enter virtual host information and the Secrets Manager secret that stores your broker credentials.
-
Enable trigger – Disable the trigger to stop processing records.
To enable or disable the trigger (or delete it), choose the MQ trigger in the designer. To reconfigure the trigger, use the event source mapping API operations.
Event source mapping API
To manage an event source with the AWS Command Line Interface (AWS CLI) or an AWS SDK
To create the event source mapping with the AWS Command Line Interface (AWS CLI), use the create-event-source-mapping
command.
The following example AWS CLI command creates an event source which maps a Lambda function named
MQ-Example-Function
to an Amazon MQ RabbitMQ-based broker named ExampleMQBroker
. The command also
provides the virtual host name and a Secrets Manager secret ARN that stores the broker
credentials.
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98
\
--function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function
\
--queues ExampleQueue
\
--source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt
\
You should see the following output:
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }
Using the update-event-source-mapping
command, you can configure additional options such as how Lambda
processes batches and to specify when to discard records that cannot be processed. The following example command
updates an event source mapping to have a batch size of 2.
aws lambda update-event-source-mapping \
--uuid 91eaeb7e-c976-1234-9451-8709db01f137
\
--batch-size 2
You should see the following output:
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }
Lambda updates these settings asynchronously. The output will not reflect changes until this process completes.
To view the current status of your resource, use the get-event-source-mapping
command.
aws lambda get-event-source-mapping \
--uuid 91eaeb7e-c976-4939-9451-8709db01f137
You should see the following output:
{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }
Event source mapping errors
When a Lambda function encounters an unrecoverable error, your Amazon MQ consumer stops processing records. Any
other consumers can continue processing, provided that they do not encounter the same error. To determine the
potential cause of a stopped consumer, check the StateTransitionReason
field in the return details of
your EventSourceMapping
for one of the following codes:
ESM_CONFIG_NOT_VALID
-
The event source mapping configuration is not valid.
EVENT_SOURCE_AUTHN_ERROR
-
Lambda failed to authenticate the event source.
EVENT_SOURCE_AUTHZ_ERROR
-
Lambda does not have the required permissions to access the event source.
FUNCTION_CONFIG_NOT_VALID
-
The function's configuration is not valid.
Records also go unprocessed if Lambda drops
them due to their size. The size limit for Lambda records is 6 MB. To
redeliver messages upon function error, you can use a dead-letter queue (DLQ). For more information, see Message Redelivery and
DLQ Handling
Note
Lambda does not support custom redelivery policies. Instead, Lambda uses a policy with the default values from the Redelivery PolicymaximumRedeliveries
set to 6.
Amazon MQ and RabbitMQ configuration parameters
All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Amazon MQ and RabbitMQ.
Event source parameters that apply to Amazon MQ and RabbitMQ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Parameter | Required | Default | Notes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
N |
100 |
Maximum: 10,000 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Enabled |
N |
true |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterCriteria |
N |
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
500 ms |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Queues |
N |
The name of the Amazon MQ broker destination queue to consume. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SourceAccessConfigurations |
N |
For ActiveMQ, BASIC_AUTH credentials. For RabbitMQ, can contain both BASIC_AUTH credentials and VIRTUAL_HOST information. |