Processing self-managed Apache Kafka messages with Lambda
Note
If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.
Adding a Kafka cluster as an event source
To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an AWS SDK, or the AWS Command Line Interface (AWS CLI).
This section describes how to create an event source mapping using the Lambda console and the AWS CLI.
Prerequisites
- A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.
- An execution role with permission to access the AWS resources that your self-managed Kafka cluster uses.
Customizable consumer group ID
When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.
If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.
Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your event source with the specified StartingPosition.
The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
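The rules above can be summarized as a small decision function. This is only a sketch of the described behavior for reasoning about migrations, not Lambda's implementation; the function name `resolve_start` and its arguments are hypothetical.

```python
from typing import Optional


def resolve_start(starting_position: str,
                  consumer_group_id: Optional[str],
                  committed_offset: Optional[int]) -> str:
    """Describe where reading begins for one partition.

    committed_offset is None when Kafka has no valid existing consumer
    group for consumer_group_id (a hypothetical modeling choice).
    """
    if consumer_group_id is not None and committed_offset is not None:
        # An existing group was found, so StartingPosition is ignored.
        return f"committed offset {committed_offset}"
    # No ID was given, or no existing group was found: StartingPosition applies.
    return starting_position


print(resolve_start("LATEST", "orders-group", 42))
print(resolve_start("LATEST", "orders-group", None))
```

The first call models a migration from an existing consumer, where processing resumes from the group's committed offset. The second models a new consumer group ID, where StartingPosition takes effect.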
Adding a self-managed Kafka cluster (console)
Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.
To add an Apache Kafka trigger to your Lambda function (console)
- Open the Functions page of the Lambda console.
- Choose the name of your Lambda function.
- Under Function overview, choose Add trigger.
- Under Trigger configuration, do the following:
  - Choose the Apache Kafka trigger type.
  - For Bootstrap servers, enter the host and port pair address of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.
  - For Topic name, enter the name of the Kafka topic used to store records in the cluster.
  - (Optional) For Batch size, enter the maximum number of records to receive in a single batch.
  - For Batch window, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.
  - (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.
  - (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.
  - (Optional) For VPC, choose the Amazon VPC for your Kafka cluster. Then, choose the VPC subnets and VPC security groups.
    This setting is required if only users within your VPC access your brokers.
  - (Optional) For Authentication, choose Add, and then do the following:
    - Choose the access or authentication protocol of the Kafka brokers in your cluster.
      - If your Kafka broker uses SASL/PLAIN authentication, choose BASIC_AUTH.
      - If your broker uses SASL/SCRAM authentication, choose one of the SASL_SCRAM protocols.
      - If you're configuring mTLS authentication, choose the CLIENT_CERTIFICATE_TLS_AUTH protocol.
    - For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster.
  - (Optional) For Encryption, choose the Secrets Manager secret containing the root CA certificate that your Kafka brokers use for TLS encryption, if your Kafka brokers use certificates signed by a private CA.
    This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.
  - To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.
- To create the trigger, choose Add.
Adding a self-managed Kafka cluster (AWS CLI)
Use the following example AWS CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.
Using SASL/SCRAM
If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

    aws lambda create-event-source-mapping \
        --topics AWSKafkaTopic \
        --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
        --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
        --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

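For readers who use an AWS SDK instead of the CLI, the same request might look like the following sketch with the AWS SDK for Python (boto3). The parameter names follow the CreateEventSourceMapping API; the helper names `build_sasl_scram_mapping` and `create_mapping` are hypothetical, and the actual API call is kept in a separate function because it needs boto3 and valid AWS credentials.

```python
def build_sasl_scram_mapping(function_arn, topic, secret_arn, brokers):
    """Return keyword arguments for Lambda's CreateEventSourceMapping."""
    return {
        "FunctionName": function_arn,
        "Topics": [topic],
        "SourceAccessConfigurations": [
            # The secret holds the SASL/SCRAM credentials for the brokers.
            {"Type": "SASL_SCRAM_512_AUTH", "URI": secret_arn},
        ],
        "SelfManagedEventSource": {
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": list(brokers)},
        },
    }


def create_mapping(params):
    """Send the request. Requires boto3 and AWS credentials."""
    import boto3  # imported here so that building params works without boto3
    return boto3.client("lambda").create_event_source_mapping(**params)


params = build_sasl_scram_mapping(
    "arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function",
    "AWSKafkaTopic",
    "arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName",
    ["abc3.xyz.com:9092", "abc2.xyz.com:9092"],
)
# create_mapping(params)  # uncomment to create the mapping in your account
```

Separating parameter construction from the call makes it possible to inspect or log the request before anything is created.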
Using a VPC
If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

    aws lambda create-event-source-mapping \
        --topics AWSKafkaTopic \
        --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
        --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
        --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

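The JSON passed to --source-access-configuration is easy to mistype by hand. As a minimal sketch, the list could be generated from subnet and security group IDs; the helper name `vpc_access_configurations` is hypothetical, and the `subnet:` and `security_group:` URI prefixes are taken from the example command.

```python
import json


def vpc_access_configurations(subnet_ids, security_group_ids):
    """Build the source access configuration list for a VPC-only cluster."""
    configs = [{"Type": "VPC_SUBNET", "URI": f"subnet:{subnet_id}"}
               for subnet_id in subnet_ids]
    configs += [{"Type": "VPC_SECURITY_GROUP", "URI": f"security_group:{group_id}"}
                for group_id in security_group_ids]
    return configs


configs = vpc_access_configurations(
    ["subnet-0011001100", "subnet-0022002200"],
    ["sg-0123456789"],
)
# Print a JSON string suitable for the --source-access-configuration option.
print(json.dumps(configs))
```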
Viewing the status using the AWS CLI
The following example uses the get-event-source-mapping AWS CLI command to describe the status of the event source mapping that you created.

    aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

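In a script, you might poll the mapping until it reports the Enabled state. The following sketch assumes a boto3 Lambda client (or any object with a compatible `get_event_source_mapping` method) and reads the `State` field of the response; the function name `wait_until_enabled`, the attempt count, and the delay are illustrative choices, not recommended values.

```python
import time


def wait_until_enabled(client, uuid, attempts=20, delay_seconds=15.0):
    """Poll GetEventSourceMapping until State is 'Enabled'.

    Raises TimeoutError if the mapping is not enabled after `attempts` polls.
    """
    state = None
    for attempt in range(attempts):
        state = client.get_event_source_mapping(UUID=uuid)["State"]
        if state == "Enabled":
            return state
        if attempt < attempts - 1:
            time.sleep(delay_seconds)  # wait before the next poll
    raise TimeoutError(f"mapping {uuid} is still in state {state!r}")


# Example usage (requires boto3 and AWS credentials):
# import boto3
# wait_until_enabled(boto3.client("lambda"), "dh38738e-992b-343a-1077-3478934hjkfd7")
```

A mapping that you created in a disabled state never reaches Enabled on its own, so this loop would time out in that case.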
Self-managed Apache Kafka configuration parameters
All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.
Parameter | Required | Default | Notes |
---|---|---|---|
BatchSize | N | 100 | Maximum: 10,000 |
Enabled | N | Enabled | none |
FunctionName | Y | N/A | none |
FilterCriteria | N | N/A | |
MaximumBatchingWindowInSeconds | N | 500 ms | |
SelfManagedEventSource | Y | N/A | List of Kafka Brokers. Can set only on Create |
SelfManagedKafkaEventSourceConfig | N | Contains the ConsumerGroupId field which defaults to a unique value. | Can set only on Create |
SourceAccessConfigurations | N | No credentials | VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH |
StartingPosition | Y | N/A | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create |
StartingPositionTimestamp | N | N/A | Required if StartingPosition is set to AT_TIMESTAMP |
Topics | Y | N/A | Topic name. Can set only on Create |
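Some of the constraints in this table can be checked before sending a request. The following is a minimal client-side sketch, not the validation that Lambda performs; the function name `validate_create_params` is hypothetical, and the lower bound of 1 for BatchSize is an assumption that the table does not state.

```python
from typing import Any, Dict, List

VALID_STARTING_POSITIONS = {"AT_TIMESTAMP", "TRIM_HORIZON", "LATEST"}
REQUIRED = ("FunctionName", "SelfManagedEventSource", "StartingPosition", "Topics")
MAX_BATCH_SIZE = 10_000


def validate_create_params(params: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in CreateEventSourceMapping parameters."""
    problems = [f"{name} is required" for name in REQUIRED if name not in params]

    position = params.get("StartingPosition")
    if position is not None and position not in VALID_STARTING_POSITIONS:
        problems.append(f"StartingPosition {position!r} is not valid")
    if position == "AT_TIMESTAMP" and "StartingPositionTimestamp" not in params:
        problems.append("StartingPositionTimestamp is required with AT_TIMESTAMP")

    # The default of 100 comes from the table; the minimum of 1 is assumed.
    batch_size = params.get("BatchSize", 100)
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        problems.append(f"BatchSize must be between 1 and {MAX_BATCH_SIZE}")
    return problems
```

An empty list means that the parameters pass these checks. It does not guarantee that the API accepts the request.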
Using a Kafka cluster as an event source
When you add your Apache Kafka or Amazon MSK cluster as a trigger for your Lambda function, the cluster is used as an event source.
Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits the offsets for your Kafka topic to your Kafka cluster.
If you specify the StartingPosition as LATEST, Lambda starts reading from the latest message in each partition belonging to the topic. Because there can be some delay after trigger configuration before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.
Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to your function. When more records are available, Lambda continues processing records in batches, based on the BatchSize value that you specify in a CreateEventSourceMapping request, until your function catches up with the topic.
If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
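A handler for these batches might look like the following sketch. It assumes the Kafka event shape in which `records` maps a topic-partition key to a list of records and each record's `value` is base64-encoded; this page does not show the payload, so treat the field names as assumptions to verify against a real event. The `process` function is a hypothetical placeholder for your own logic.

```python
import base64
import json


def process(topic, partition, offset, payload):
    """Hypothetical business logic: parse the record value as JSON."""
    return json.loads(payload)  # raises ValueError on malformed JSON


def lambda_handler(event, context):
    processed = 0
    # Each key groups the records of one topic partition (assumed shape).
    for records in event["records"].values():
        for record in records:
            payload = base64.b64decode(record["value"]).decode("utf-8")
            # An uncaught exception here fails the invocation, which causes
            # Lambda to retry the whole batch, as described above.
            process(record["topic"], record["partition"], record["offset"], payload)
            processed += 1
    return {"processed": processed}
```

Because a failure causes the whole batch to be retried, records that were processed before the failing one are delivered again. Processing logic should therefore tolerate duplicates.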
Note
While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.
Polling and stream starting positions
Be aware that stream polling during event source mapping creation and updates is eventually consistent.
- During event source mapping creation, it may take several minutes to start polling events from the stream.
- During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.
This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.
Auto scaling of the Kafka event source
When you initially create an Apache Kafka event source, Lambda allocates one consumer to process all partitions in the Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, Lambda automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.
In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.
If your target Lambda function is overloaded, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.
To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics, such as consumer_lag and consumer_offset. To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.
Amazon CloudWatch metrics
Lambda emits the OffsetLag metric while your function processes records. The value of this metric is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when your consumer group processes it.
An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see View metrics for Lambda functions.
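As a rough illustration of how OffsetLag relates to latency, the following sketch computes the lag from two offsets and divides it by an observed processing rate. The rate-based estimate is an assumption made for this example, not a formula that Lambda publishes, and all function names are hypothetical.

```python
from typing import Sequence


def offset_lag(last_written_offset: int, last_processed_offset: int) -> int:
    """Difference between the last written and last processed offsets."""
    return max(last_written_offset - last_processed_offset, 0)


def estimated_latency_seconds(lag: int, records_processed_per_second: float) -> float:
    """Approximate time to work through the current lag at a steady rate."""
    if records_processed_per_second <= 0:
        raise ValueError("processing rate must be positive")
    return lag / records_processed_per_second


def is_increasing(samples: Sequence[int]) -> bool:
    """True if every OffsetLag sample is larger than the one before it."""
    return all(later > earlier for earlier, later in zip(samples, samples[1:]))


lag = offset_lag(last_written_offset=1500, last_processed_offset=1200)
print(lag, estimated_latency_seconds(lag, 50.0), is_increasing([10, 40, 90]))
```

A strictly increasing series is a deliberately simple trend test. Real monitoring would more likely use a CloudWatch alarm on the metric over a longer window.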