Processing self-managed Apache Kafka messages with Lambda
Note
If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.
Adding a Kafka cluster as an event source
To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an AWS SDK, or the AWS Command Line Interface (AWS CLI).
This section describes how to create an event source mapping using the Lambda console and the AWS CLI.
Prerequisites
- A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.
- An execution role with permission to access the AWS resources that your self-managed Kafka cluster uses.
Customizable consumer group ID
When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.
If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.
Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the
      StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed
      offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your
      event source with the specified StartingPosition.
The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
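The starting-position rules above can be sketched as a small decision function. This is a minimal illustration, not Lambda's implementation: `effective_start`, `consumer_group_config`, and the `"COMMITTED_OFFSET"` label are hypothetical names chosen for this example. Only `ConsumerGroupId` and `SelfManagedKafkaEventSourceConfig` come from the parameters described in this topic.

```python
def effective_start(group_exists, starting_position):
    """Return where reading begins, following the rules described above.

    "COMMITTED_OFFSET" is a label invented for this sketch, not an API value.
    """
    if group_exists:
        # An existing consumer group with the same ID takes precedence:
        # StartingPosition is ignored and the committed offset is used.
        return "COMMITTED_OFFSET"
    # No existing group: the configured StartingPosition applies.
    return starting_position


def consumer_group_config(group_id):
    """Build the SelfManagedKafkaEventSourceConfig structure for a create request."""
    if not group_id:
        raise ValueError("consumer group ID must be a non-empty string")
    # This value can be set only when the mapping is created.
    return {"ConsumerGroupId": group_id}
```

For example, `effective_start(True, "LATEST")` returns the committed-offset label, while `effective_start(False, "TRIM_HORIZON")` returns `"TRIM_HORIZON"`.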
Adding a self-managed Kafka cluster (console)
Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.
To add an Apache Kafka trigger to your Lambda function (console)
- Open the Functions page of the Lambda console.
- Choose the name of your Lambda function.
- Under Function overview, choose Add trigger.
- Under Trigger configuration, do the following:
  - Choose the Apache Kafka trigger type.
  - For Bootstrap servers, enter the host and port pair address of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.
  - For Topic name, enter the name of the Kafka topic used to store records in the cluster.
  - (Optional) For Batch size, enter the maximum number of records to receive in a single batch.
  - For Batch window, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.
  - (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.
  - (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.
  - (Optional) For VPC, choose the Amazon VPC for your Kafka cluster. Then, choose the VPC subnets and VPC security groups. This setting is required if only users within your VPC access your brokers.
  - (Optional) For Authentication, choose Add, and then do the following:
    - Choose the access or authentication protocol of the Kafka brokers in your cluster.
      - If your Kafka broker uses SASL/PLAIN authentication, choose BASIC_AUTH.
      - If your broker uses SASL/SCRAM authentication, choose one of the SASL_SCRAM protocols.
      - If you're configuring mTLS authentication, choose the CLIENT_CERTIFICATE_TLS_AUTH protocol.
    - For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster.
  - (Optional) For Encryption, choose the Secrets Manager secret containing the root CA certificate that your Kafka brokers use for TLS encryption, if your Kafka brokers use certificates signed by a private CA. This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.
  - To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.
- To create the trigger, choose Add.
Adding a self-managed Kafka cluster (AWS CLI)
Use the following example AWS CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.
Using SASL/SCRAM
If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.
aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
Using a VPC
If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.
aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
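If you prefer an AWS SDK, the same two requests can be assembled as plain dictionaries. The following is a minimal sketch in Python: `build_mapping_request` is a hypothetical helper written for this example, while the keys mirror the CreateEventSourceMapping parameters used by the CLI commands above. The sketch only builds the request, so it runs without credentials.

```python
def build_mapping_request(function_name, topic, brokers, access_configs,
                          starting_position=None):
    """Assemble keyword arguments for a CreateEventSourceMapping request.

    access_configs is a list of (Type, URI) pairs, as in the CLI examples.
    """
    request = {
        "FunctionName": function_name,
        "Topics": [topic],
        "SelfManagedEventSource": {
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": list(brokers)}
        },
        "SourceAccessConfigurations": [
            {"Type": config_type, "URI": uri} for config_type, uri in access_configs
        ],
    }
    if starting_position is not None:
        request["StartingPosition"] = starting_position
    return request


FUNCTION_ARN = "arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function"
BROKERS = ["abc3.xyz.com:9092", "abc2.xyz.com:9092"]

# Brokers reached over the internet with SASL/SCRAM authentication.
sasl_request = build_mapping_request(
    FUNCTION_ARN, "AWSKafkaTopic", BROKERS,
    [("SASL_SCRAM_512_AUTH",
      "arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName")],
)

# Brokers reached only from inside a VPC.
vpc_request = build_mapping_request(
    FUNCTION_ARN, "AWSKafkaTopic", BROKERS,
    [("VPC_SUBNET", "subnet:subnet-0011001100"),
     ("VPC_SUBNET", "subnet:subnet-0022002200"),
     ("VPC_SECURITY_GROUP", "security_group:sg-0123456789")],
)

# With boto3 installed and credentials configured, a request could be sent as:
#   boto3.client("lambda").create_event_source_mapping(**sasl_request)
```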
Viewing the status using the AWS CLI
The following example uses the get-event-source-mapping AWS CLI command to describe the status of the event source mapping that you created.
aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7
Self-managed Apache Kafka configuration parameters
All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.
| Parameter | Required | Default | Notes | 
|---|---|---|---|
| BatchSize | N | 100 | Maximum: 10,000 | 
| DestinationConfig | N | N/A | Capturing discarded batches for a self-managed Apache Kafka event source | 
| Enabled | N | True | |
| FilterCriteria | N | N/A | |
| FunctionName | Y | N/A | |
| KMSKeyArn | N | N/A | |
| MaximumBatchingWindowInSeconds | N | 500 ms | |
| ProvisionedPollerConfig | N | MinimumPollers: 1, MaximumPollers: 200 | See Configuring provisioned mode | 
| SelfManagedEventSource | Y | N/A | List of Kafka Brokers. Can set only on Create | 
| SelfManagedKafkaEventSourceConfig | N | Contains the ConsumerGroupId field which defaults to a unique value. | Can set only on Create | 
| SourceAccessConfigurations | N | No credentials | VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH | 
| StartingPosition | Y | N/A | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create | 
| StartingPositionTimestamp | N | N/A | Required if StartingPosition is set to AT_TIMESTAMP | 
| Tags | N | N/A | |
| Topics | Y | N/A | Topic name. Can set only on Create | 
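The constraints in the table can be checked before a request is sent. The following is a minimal sketch under stated assumptions: `check_create_request` is a hypothetical helper, it covers only the rules listed in the table, and the lower bound of 1 for BatchSize is an assumption because the table states only the maximum.

```python
VALID_STARTING_POSITIONS = {"AT_TIMESTAMP", "TRIM_HORIZON", "LATEST"}
REQUIRED_ON_CREATE = ("FunctionName", "SelfManagedEventSource",
                      "StartingPosition", "Topics")
MAX_BATCH_SIZE = 10_000


def check_create_request(request):
    """Return a list of problems found in a CreateEventSourceMapping request dict."""
    problems = [f"{name} is required"
                for name in REQUIRED_ON_CREATE if name not in request]

    batch_size = request.get("BatchSize", 100)  # 100 is the table default
    # Assumption: the minimum is 1; the table gives only the maximum.
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        problems.append(f"BatchSize must be between 1 and {MAX_BATCH_SIZE}")

    position = request.get("StartingPosition")
    if position is not None and position not in VALID_STARTING_POSITIONS:
        problems.append(f"StartingPosition {position!r} is not valid")
    if position == "AT_TIMESTAMP" and "StartingPositionTimestamp" not in request:
        problems.append("StartingPositionTimestamp is required with AT_TIMESTAMP")
    return problems
```

An empty list means that none of the table's rules were violated. It does not guarantee that the service accepts the request.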
Using a Kafka cluster as an event source
When you add your Apache Kafka or Amazon MSK cluster as a trigger for your Lambda function, the cluster is used as an event source.
Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits the offsets for your Kafka topic to your Kafka cluster.
If you specify the StartingPosition as LATEST, Lambda starts reading from the latest
      message in each partition belonging to the topic. Because there can be some delay after trigger configuration
      before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.
Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to
      your function. A single Lambda payload can contain messages from multiple partitions. When more records are available, 
      Lambda continues processing records in batches, based on the
      BatchSize value that you specify in a CreateEventSourceMapping request, until your function
      catches up with the topic.
If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
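A function that consumes these batches might look like the following sketch. It assumes the Kafka event payload shape that Lambda documents elsewhere: a `records` object keyed by topic and partition, with each record carrying `topic`, `partition`, `offset`, and a base64-encoded `value`. That shape is not shown in this section, so treat it as an assumption here. `process_message` and `SEEN` are hypothetical placeholders for your own logic.

```python
import base64

SEEN = []  # filled by the placeholder process_message below


def process_message(topic, partition, offset, value):
    """Hypothetical business logic; replace with real processing."""
    SEEN.append((topic, partition, offset, value))


def handler(event, context):
    processed = 0
    # "records" maps a "<topic>-<partition>" key to a list of records, so
    # one payload can carry messages from several partitions.
    for records in event.get("records", {}).values():
        for record in records:
            raw = record.get("value")
            # Values arrive base64-encoded; a missing value is passed as None.
            value = base64.b64decode(raw).decode("utf-8") if raw is not None else None
            # An exception raised here fails the invocation, so Lambda
            # retries the whole batch, not only this record.
            process_message(record["topic"], record["partition"],
                            record["offset"], value)
            processed += 1
    return {"processed": processed}


# A small hand-built payload for local testing.
sample_event = {
    "records": {
        "AWSKafkaTopic-0": [
            {"topic": "AWSKafkaTopic", "partition": 0, "offset": 15,
             "value": base64.b64encode(b"hello").decode("ascii")},
        ],
        "AWSKafkaTopic-1": [
            {"topic": "AWSKafkaTopic", "partition": 1, "offset": 7,
             "value": base64.b64encode(b"world").decode("ascii")},
        ],
    }
}
```

Because a failure retries the entire batch, processing in a handler like this should be idempotent.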
Note
While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.
Polling and stream starting positions
Be aware that stream polling during event source mapping creation and updates is eventually consistent.
- During event source mapping creation, it may take several minutes to start polling events from the stream.
- During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.
This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could 
    miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON 
    or AT_TIMESTAMP.
Message throughput scaling behavior for self-managed Apache Kafka event source mappings
You can choose between two modes of message throughput scaling behavior for your self-managed Apache Kafka event source mapping: default (on-demand) mode and provisioned mode.
Default (on-demand) mode
When you initially create a self-managed Apache Kafka event source, Lambda allocates a default number of event pollers to process all partitions in the Kafka topic. Lambda automatically scales up or down the number of event pollers based on message load.
In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the offset lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes event pollers from the topic. This autoscaling process of adding or removing event pollers occurs within three minutes of evaluation.
If your target Lambda function is throttled, Lambda reduces the number of event pollers. This action reduces the workload on the function by reducing the number of messages that event pollers can retrieve and send to the function.
To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics,
        such as consumer_lag and consumer_offset.
Configuring provisioned mode
For workloads where you need to fine-tune the throughput of your event source mapping, you can use provisioned mode. In provisioned mode, you define minimum and maximum limits for the number of provisioned event pollers. These provisioned event pollers are dedicated to your event source mapping, and can handle unexpected message spikes instantly when they occur. We recommend that you use provisioned mode for Kafka workloads that have strict performance requirements.
In Lambda, an event poller is a compute unit capable of handling up to 5 MBps of throughput. For reference, suppose your event source produces an average payload of 1 MB, and the average function duration is 1 second. If the payload doesn't undergo any transformation (such as filtering), a single poller can support 5 MBps of throughput and 5 concurrent Lambda invocations. Using provisioned mode incurs additional costs. For pricing estimates, see AWS Lambda pricing.
In provisioned mode, the range of accepted values for the minimum number of event pollers
                (MinimumPollers) is between 1 and 200, inclusive. The range of
                accepted values for the maximum number of event pollers (MaximumPollers)
                is between 1 and 2,000, inclusive. MaximumPollers must be greater than
                or equal to MinimumPollers. In addition, to maintain ordered
                processing within partitions, Lambda caps the MaximumPollers to the
                number of partitions in the topic.
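The ranges above can be checked locally before calling the API. The following is a minimal sketch: `resolve_poller_config` is a hypothetical helper, and the defaults of 1 and 200 are the values that the console procedure in this topic describes. The partition cap is modeled as a simple minimum, which is an interpretation of the rule above and not a description of Lambda's internals.

```python
def resolve_poller_config(minimum=1, maximum=200, partitions=None):
    """Validate poller limits and return (config, effective_maximum).

    partitions is the number of partitions in the topic, if known.
    """
    if not 1 <= minimum <= 200:
        raise ValueError("MinimumPollers must be between 1 and 200")
    if not 1 <= maximum <= 2000:
        raise ValueError("MaximumPollers must be between 1 and 2,000")
    if maximum < minimum:
        raise ValueError("MaximumPollers must be >= MinimumPollers")

    # The maximum is capped at the partition count to keep
    # per-partition ordering.
    effective_maximum = maximum if partitions is None else min(maximum, partitions)
    config = {"MinimumPollers": minimum, "MaximumPollers": maximum}
    return config, effective_maximum
```

For a topic with 12 partitions, `resolve_poller_config(5, 100, partitions=12)` returns the requested limits together with an effective maximum of 12.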
For more details about choosing appropriate values for minimum and maximum event pollers, see Best practices and considerations when using provisioned mode.
You can configure provisioned mode for your self-managed Apache Kafka event source mapping using the console or the Lambda API.
To configure provisioned mode for an existing self-managed Apache Kafka event source mapping (console)
- Open the Functions page of the Lambda console.
- Choose the function with the self-managed Apache Kafka event source mapping you want to configure provisioned mode for.
- Choose Configuration, then choose Triggers.
- Choose the self-managed Apache Kafka event source mapping that you want to configure provisioned mode for, then choose Edit.
- Under Event source mapping configuration, choose Configure provisioned mode.
  - For Minimum event pollers, enter a value between 1 and 200. If you don't specify a value, Lambda chooses a default value of 1.
  - For Maximum event pollers, enter a value between 1 and 2,000. This value must be greater than or equal to your value for Minimum event pollers. If you don't specify a value, Lambda chooses a default value of 200.
- Choose Save.
You can configure provisioned mode programmatically using the ProvisionedPollerConfig object
                in your 
                EventSourceMappingConfiguration. For example, the following UpdateEventSourceMapping CLI
                command configures a MinimumPollers value of 5, and a
                MaximumPollers value of 100.
aws lambda update-event-source-mapping \
  --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
  --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'
After configuring provisioned mode, you can observe the usage of event pollers for your workload by monitoring
    the ProvisionedPollers metric. For more information, see Event source mapping metrics.
To disable provisioned mode and return to default (on-demand) mode, you can use the following UpdateEventSourceMapping CLI command:
aws lambda update-event-source-mapping \
  --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
  --provisioned-poller-config '{}'
Best practices and considerations when using provisioned mode
The optimal configuration of minimum and maximum event pollers for your event source mapping depends on your application's performance requirements. We recommend that you start with the default minimum event pollers to baseline the performance profile. Adjust your configuration based on observed message processing patterns and your desired performance profile.
For workloads with spiky traffic and strict performance needs, increase the minimum event pollers to handle sudden surges in messages. To determine the minimum event pollers required, consider your workload's messages per second and average payload size, and use the throughput capacity of a single event poller (up to 5 MBps) as a reference.
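The sizing guidance above amounts to simple arithmetic. The following sketch estimates a starting value for minimum event pollers from peak message rate and average payload size. `estimate_min_pollers` is a hypothetical helper, and because 5 MBps is an upper bound per poller, the result is a lower bound to validate against observed metrics, not a guaranteed capacity.

```python
import math

POLLER_MBPS = 5.0  # per-poller throughput ceiling stated in this topic


def estimate_min_pollers(messages_per_second, avg_payload_mb):
    """Estimate the minimum pollers needed for a given peak load."""
    if messages_per_second < 0 or avg_payload_mb < 0:
        raise ValueError("inputs must be non-negative")
    required_mbps = messages_per_second * avg_payload_mb
    # Round up, and never go below the smallest allowed setting of 1.
    return max(1, math.ceil(required_mbps / POLLER_MBPS))
```

For example, a peak of 40 messages per second at 0.5 MB each is 20 MBps, which gives an estimate of 4 pollers. A peak of 12 messages per second at 1 MB each is 12 MBps, which rounds up to 3.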
To maintain ordered processing within a partition, Lambda limits the maximum event pollers to the number of partitions in the topic. Additionally, the maximum event pollers your event source mapping can scale to depends on the function's concurrency settings.
When activating provisioned mode, update your network settings to remove AWS PrivateLink VPC endpoints and associated permissions.
Amazon CloudWatch metrics
Lambda emits the OffsetLag metric while your function processes records. The value of this metric
      is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's 
      consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when
      your consumer group processes it.
An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see
      Using CloudWatch metrics with Lambda.
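One way to spot an increasing trend is to fit a straight line through recent OffsetLag samples and look at its slope. The following is a minimal sketch: `lag_trend` is a hypothetical helper, and it assumes that you have already retrieved evenly spaced OffsetLag values from CloudWatch as a list of numbers, oldest first.

```python
def lag_trend(samples):
    """Return the least-squares slope of lag per sample interval.

    A positive slope means that lag is growing across the window.
    """
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples")
    mean_x = (n - 1) / 2
    mean_y = sum(samples) / n
    numerator = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(samples))
    denominator = sum((i - mean_x) ** 2 for i in range(n))
    return numerator / denominator
```

A slope that stays positive over several windows suggests that records arrive faster than they are processed. The threshold at which to alert depends on your workload.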