Using Lambda with self-managed Apache Kafka

Lambda supports Apache Kafka as an event source. Apache Kafka is an open-source event streaming platform that supports workloads such as data pipelines and streaming analytics.

You can use the AWS managed Kafka service Amazon Managed Streaming for Apache Kafka (Amazon MSK), or a self-managed Kafka cluster. For details about using Lambda with Amazon MSK, see Using Lambda with Amazon MSK.

This topic describes how to use Lambda with a self-managed Kafka cluster. In AWS terminology, a self-managed cluster includes non-AWS hosted Kafka clusters. For example, you can host your Kafka cluster with a cloud provider such as CloudKarafka. You can also use other AWS hosting options for your cluster. For more information, see Best Practices for Running Apache Kafka on AWS on the AWS Big Data Blog.

Apache Kafka as an event source operates similarly to using Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload. The maximum batch size is configurable. (The default is 100 messages.)

For an example of how to use self-managed Kafka as an event source, see Using self-hosted Apache Kafka as an event source for AWS Lambda on the AWS Compute Blog.

Lambda sends the batch of messages in the event parameter when it invokes your Lambda function. The event payload contains an array of messages. Each array item contains details of the Kafka topic and Kafka partition identifier, together with a timestamp and a base64-encoded message.

{
   "eventSource":"aws:SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":"0",
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
               }
            ]
         }
      ]
   }
}
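A minimal Python handler sketch that walks this payload and base64-decodes each record's value (the field names follow the sample event above; error handling is omitted for brevity):

```python
import base64

def lambda_handler(event, context):
    """Decode each Kafka record in a self-managed Kafka event payload."""
    decoded = []
    # event["records"] maps "topic-partition" keys to lists of records.
    for topic_partition, records in event["records"].items():
        for record in records:
            # Record values arrive base64-encoded; decode them to text here.
            value = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                "value": value,
            })
    return decoded
```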

Managing access and permissions

For Lambda to poll your Apache Kafka topic and update other cluster resources, your Lambda function—as well as your AWS Identity and Access Management (IAM) users and roles—must have the following permissions.

Required Lambda function permissions

To create and store logs in a log group in Amazon CloudWatch Logs, your Lambda function must have the following permissions in its execution role:

  • logs:CreateLogGroup

  • logs:CreateLogStream

  • logs:PutLogEvents

Optional Lambda function permissions

Your Lambda function might need permission to describe your AWS Secrets Manager secret or your AWS Key Management Service (AWS KMS) customer managed customer master key (CMK), or to access your virtual private cloud (VPC).

Secrets Manager and AWS KMS permissions

If your Apache Kafka users access your Kafka brokers over the internet, you must specify a Secrets Manager secret. Your Lambda function might need permission to describe your Secrets Manager secret or to decrypt your AWS KMS customer managed CMK. To access these resources, your function's execution role must have the following permissions:

  • secretsmanager:GetSecretValue

  • kms:Decrypt

VPC permissions

If only users within a VPC can access your self-managed Apache Kafka cluster, your Lambda function must have permission to access your Amazon Virtual Private Cloud (Amazon VPC) resources. These resources include your VPC, subnets, security groups, and network interfaces. To access these resources, your function's execution role must have the following permissions:

  • ec2:CreateNetworkInterface

  • ec2:DescribeNetworkInterfaces

  • ec2:DescribeVpcs

  • ec2:DeleteNetworkInterface

  • ec2:DescribeSubnets

  • ec2:DescribeSecurityGroups

Adding permissions to your execution role

To access other AWS services that your self-managed Apache Kafka cluster uses, Lambda uses the permissions policies that you define in your Lambda function's execution role.

By default, Lambda is not permitted to perform the required or optional actions for a self-managed Apache Kafka cluster. You must define these actions in an IAM permissions policy, and then attach the policy to your execution role. This example shows how you might create a policy that allows Lambda to access your Amazon VPC resources.

{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "ec2:CreateNetworkInterface",
            "ec2:DescribeNetworkInterfaces",
            "ec2:DescribeVpcs",
            "ec2:DeleteNetworkInterface",
            "ec2:DescribeSubnets",
            "ec2:DescribeSecurityGroups"
         ],
         "Resource":"arn:aws:ec2:us-east-1:01234567890:instance/my-instance-name"
      }
   ]
}

For information about creating a JSON policy document in the IAM console, see Creating policies on the JSON tab in the IAM User Guide.

Adding users to an IAM policy

By default, IAM users and roles do not have permission to perform event source API operations. To grant access to users in your organization or account, you might need to create an identity-based policy. For more information, see Controlling access to AWS resources using policies in the IAM User Guide.

Kafka authentication

Lambda supports several methods to authenticate with your self-managed Apache Kafka cluster. Make sure that you configure the Kafka cluster to use one of the following authentication methods that Lambda supports:

  • VPC

    If only Kafka users within your VPC access your Kafka brokers, you must configure the event source with VPC access.

  • SASL/SCRAM

    Lambda supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with TLS encryption. Lambda sends the encrypted credentials to authenticate with the cluster. Because the credentials are encrypted, the connection to the cluster does not need to be encrypted. For more information about SASL/SCRAM authentication, see RFC 5802.

  • SASL/PLAIN

    Lambda supports SASL/PLAIN authentication with TLS encryption. With SASL/PLAIN authentication, credentials are sent as clear text (unencrypted) to the server. Because the credentials are clear text, the connection to the server must use TLS encryption.

For SASL authentication, you must store the user name and password as a secret in Secrets Manager. For more information, see Tutorial: Creating and retrieving a secret in the AWS Secrets Manager User Guide.
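As an illustration, the secret value can be assembled as JSON before you store it. The "username" and "password" key names in this Python sketch are the shape that Lambda expects for Kafka credentials; verify them against your cluster's configuration:

```python
import json

def build_sasl_secret(username, password):
    """Serialize Kafka SASL credentials as the JSON string to store."""
    return json.dumps({"username": username, "password": password})

# You would then store the resulting string in Secrets Manager,
# for example with:
#   boto3.client("secretsmanager").create_secret(
#       Name="MyBrokerSecretName", SecretString=secret_string)
```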

Network configuration

If you configure Amazon VPC access to your Kafka brokers, Lambda must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your Kafka cluster. We recommend that you deploy AWS PrivateLink VPC endpoints for Lambda and AWS Security Token Service (AWS STS). If authentication is required, also deploy a VPC endpoint for Secrets Manager.

Alternatively, ensure that the VPC associated with your Kafka cluster includes one NAT gateway per public subnet. For more information, see Internet and service access for VPC-connected functions.

You must configure your Amazon VPC security groups with the following rules (at minimum):

  • Inbound rules – Allow all traffic on all ports for the security group specified as your event source.

  • Outbound rules – Allow all traffic on all ports for all destinations.

For more information about configuring the network, see Setting up AWS Lambda with an Apache Kafka cluster within a VPC on the AWS Compute Blog.

Adding a Kafka cluster as an event source

To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an AWS SDK, or the AWS Command Line Interface (AWS CLI).

This section describes how to create an event source mapping using the Lambda console and the AWS CLI.

Prerequisites

  • A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.0.0 and later.

  • A Lambda execution role with permission to access the AWS resources that your self-managed Kafka cluster uses.

Adding a self-managed Kafka cluster (console)

Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.

To add an Apache Kafka trigger to your Lambda function (console)

  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the Apache Kafka trigger type.

    2. For Bootstrap servers, enter the host and port pair address of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.

    3. For Topic name, enter the name of the Kafka topic used to store records in the cluster.

    4. (Optional) For Batch size, enter the maximum number of records to receive in a single batch.

    5. (Optional) For Starting position, choose Latest to start reading the stream from the latest record. Or, choose Trim horizon to start at the earliest available record.

  5. Under Authentication method, choose the access or authentication protocol of the Kafka brokers in your cluster. If only users within your VPC access your Kafka brokers, you must configure VPC access. If users access your Kafka brokers over the internet, you must configure SASL authentication.

    • To configure VPC access, choose the VPC for your Kafka cluster, then choose VPC subnets and VPC security groups.

    • To configure SASL authentication, under Secret key, choose Add, then do the following:

      1. Choose the key type. If your Kafka broker uses SASL plaintext, choose BASIC_AUTH. Otherwise, choose one of the SASL_SCRAM options.

      2. Choose the name of the Secrets Manager secret key that contains the credentials for your Kafka cluster.

  6. To create the trigger, choose Add.

Adding a self-managed Kafka cluster (AWS CLI)

Use the following example AWS CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.

Using SASL/SCRAM

If Kafka users access your Kafka brokers over the internet, you must specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:01234567890:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
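The same mapping can also be created with an AWS SDK. This Python sketch builds the request for boto3's create_event_source_mapping call; the topic, ARNs, and broker addresses are the placeholders from the CLI example above, and the actual API call (commented out) requires valid AWS credentials:

```python
def build_sasl_scram_mapping(function_arn, topic, secret_arn, brokers):
    """Build CreateEventSourceMapping parameters for SASL/SCRAM access."""
    return {
        "FunctionName": function_arn,
        "Topics": [topic],
        "SelfManagedEventSource": {
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": brokers},
        },
        "SourceAccessConfigurations": [
            {"Type": "SASL_SCRAM_512_AUTH", "URI": secret_arn},
        ],
    }

params = build_sasl_scram_mapping(
    "arn:aws:lambda:us-east-1:01234567890:function:my-kafka-function",
    "AWSKafkaTopic",
    "arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName",
    ["abc3.xyz.com:9092", "abc2.xyz.com:9092"],
)
# To create the mapping with these parameters:
# import boto3
# boto3.client("lambda").create_event_source_mapping(**params)
```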

For more information, see the API reference documentation.

Using a VPC

If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:01234567890:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

For more information, see the API reference documentation.

Viewing the status using the AWS CLI

The following example uses the get-event-source-mapping AWS CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Using a Kafka cluster as an event source

When you add your Apache Kafka cluster as a trigger for your Lambda function, the cluster is used as an event source.

Lambda reads event data from the Kafka topics that you specify in CreateEventSourceMapping Topics, based on the starting position that you specify in CreateEventSourceMapping StartingPosition. After successful processing, Lambda commits the offsets of the processed records to your Kafka cluster.

If you specify LATEST as the starting position, Lambda starts reading from the latest message in each partition belonging to the topic. There can be some delay between when you configure the trigger and when Lambda starts reading messages, and Lambda does not read any messages that are produced during this window.

Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to your Lambda function. When more records are available, Lambda continues processing records in batches, based on the value that you specify in CreateEventSourceMapping BatchSize, until the function catches up with the topic.

If your Lambda function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire.

The maximum amount of time that Lambda lets a function run before stopping it is 14 minutes.

Auto scaling of the Kafka event source

When you initially create an Apache Kafka event source, Lambda allocates one consumer to process all of the partitions in the Kafka topic. Lambda automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

Every 15 minutes, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic.

If your target Lambda function is overloaded, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.

To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics, such as consumer_lag and consumer_offset. To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.

Event source API operations

When you add your Kafka cluster as an event source for your Lambda function using the Lambda console, an AWS SDK, or the AWS CLI, Lambda uses APIs to process your request.

To manage an event source with the AWS CLI or an AWS SDK, you can use the following API operations:

  • CreateEventSourceMapping

  • ListEventSourceMappings

  • GetEventSourceMapping

  • UpdateEventSourceMapping

  • DeleteEventSourceMapping

Event source mapping errors

If your function encounters an error while processing records from your Apache Kafka event source, the Kafka consumer for the affected topic partition stops processing records. Consumers of a topic partition are those that subscribe to, read, and process your records. Your other Kafka consumers can continue processing records, provided they don't encounter the same error.

To determine the cause of a stopped consumer, check the StateTransitionReason field in the response of GetEventSourceMapping. The following list describes the event source errors that you can receive:

ESM_CONFIG_NOT_VALID

The event source mapping configuration is not valid.

EVENT_SOURCE_AUTHN_ERROR

Lambda could not authenticate the event source.

EVENT_SOURCE_AUTHZ_ERROR

Lambda does not have the required permissions to access the event source.

FUNCTION_CONFIG_NOT_VALID

The function configuration is not valid.
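For example, a small Python helper could translate these codes, read from a get-event-source-mapping response, into the descriptions above (the sample responses in the usage test are fabricated for illustration):

```python
# Error codes and descriptions from the list above.
ESM_ERRORS = {
    "ESM_CONFIG_NOT_VALID":
        "The event source mapping configuration is not valid.",
    "EVENT_SOURCE_AUTHN_ERROR":
        "Lambda could not authenticate the event source.",
    "EVENT_SOURCE_AUTHZ_ERROR":
        "Lambda does not have the required permissions to access the event source.",
    "FUNCTION_CONFIG_NOT_VALID":
        "The function configuration is not valid.",
}

def describe_mapping_error(response):
    """Return a readable cause for a stopped consumer, or None."""
    return ESM_ERRORS.get(response.get("StateTransitionReason"))
```

You could feed this helper the dictionary returned by boto3's get_event_source_mapping to log a readable cause when a consumer stops.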

Note

If your Lambda event records exceed the allowed size limit of 6 MB, they can go unprocessed.

Event source configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.

Event source parameters that apply to self-managed Apache Kafka
Parameter                    Required  Default         Notes

BatchSize                    N         100             Maximum: 10,000
Enabled                      N         Enabled
FunctionName                 Y
SelfManagedEventSource       Y                         List of Kafka brokers. Can set only on Create.
SourceAccessConfigurations   N         No credentials  VPC information or authentication credentials
                                                       for the cluster. For SASL_PLAIN, set to
                                                       BASIC_AUTH.
StartingPosition             Y                         TRIM_HORIZON or LATEST. Can set only on Create.
Topics                       Y                         Topic name. Can set only on Create.