Using Lambda with self-managed Apache Kafka - AWS Lambda

Using Lambda with self-managed Apache Kafka

Note

If you want to send data to a target other than a Lambda function or enrich the data before sending it, see Amazon EventBridge Pipes.

Lambda supports Apache Kafka as an event source. Apache Kafka is an open-source event streaming platform that supports workloads such as data pipelines and streaming analytics.

You can use the AWS managed Kafka service Amazon Managed Streaming for Apache Kafka (Amazon MSK), or a self-managed Kafka cluster. For details about using Lambda with Amazon MSK, see Using Lambda with Amazon MSK.

This topic describes how to use Lambda with a self-managed Kafka cluster. In AWS terminology, a self-managed cluster includes non-AWS hosted Kafka clusters. For example, you can host your Kafka cluster with a cloud provider such as Confluent Cloud.

Apache Kafka as an event source operates similarly to using Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload. The maximum batch size is configurable. (The default is 100 messages.)

Warning

Lambda event source mappings process each event at least once, and duplicate processing of records can occur. To avoid potential issues related to duplicate events, we strongly recommend that you make your function code idempotent. To learn more, see How do I make my Lambda function idempotent in the AWS Knowledge Center.
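One way to approach idempotency is to derive a stable ID from each record's topic, partition, and offset, and to skip IDs that were already handled. The following Python sketch illustrates the idea under stated assumptions: the event has the shape shown under Example event, and the in-memory set `_processed_ids` and the `process` helper are hypothetical placeholders. A real function would likely need a durable store, because an in-memory set survives only as long as one execution environment.

```python
import base64

# Hypothetical in-memory store of processed record IDs. This is only a
# stand-in: a real function would use a durable store shared by all
# execution environments.
_processed_ids = set()


def record_id(record):
    """Build a unique ID from the topic, partition, and offset of a record."""
    return f"{record['topic']}-{record['partition']}-{record['offset']}"


def process(record):
    """Placeholder business logic: return the decoded message value."""
    return base64.b64decode(record["value"]).decode("utf-8")


def handler(event, context):
    """Process each record once; skip records that were already handled."""
    handled = []
    for records in event["records"].values():
        for record in records:
            rid = record_id(record)
            if rid in _processed_ids:
                continue  # Duplicate delivery: do nothing.
            handled.append(process(record))
            _processed_ids.add(rid)
    return handled
```

With this structure, invoking the handler twice with the same batch performs the work only on the first invocation.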

For Kafka-based event sources, Lambda supports processing control parameters, such as batching windows and batch size. For more information, see Batching behavior.

For an example of how to use self-managed Kafka as an event source, see Using self-hosted Apache Kafka as an event source for AWS Lambda on the AWS Compute Blog.

Example event

Lambda sends the batch of messages in the event parameter when it invokes your Lambda function. The event payload contains an array of messages. Each array item contains details of the Kafka topic and Kafka partition identifier, together with a timestamp and a base64-encoded message.

{
  "eventSource": "SelfManagedKafka",
  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [
              104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101
            ]
          }
        ]
      }
    ]
  }
}
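As a rough illustration of how a function might read this payload, the following Python sketch walks the `records` map, base64-decodes each message value, and converts header byte arrays to text. The function names are hypothetical, and the sketch assumes that values and headers contain UTF-8 text, which might not hold for your producers.

```python
import base64


def decode_header(values):
    """Convert a list of byte values to text, assuming UTF-8 content."""
    # Mask each value to 0-255 in case bytes arrive as signed integers.
    return bytes(v & 0xFF for v in values).decode("utf-8")


def decode_records(event):
    """Return a flat list of records with decoded values and headers."""
    decoded = []
    for records in event["records"].values():
        for record in records:
            headers = {}
            for header in record.get("headers", []):
                for name, values in header.items():
                    headers[name] = decode_header(values)
            raw_value = record.get("value")
            decoded.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                # A missing or null value is passed through as None.
                "value": None if raw_value is None
                else base64.b64decode(raw_value).decode("utf-8"),
                "headers": headers,
            })
    return decoded
```

A handler could call `decode_records(event)` and then iterate over the returned list.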

Kafka cluster authentication

Lambda supports several methods to authenticate with your self-managed Apache Kafka cluster. Make sure that you configure the Kafka cluster to use one of these supported authentication methods. For more information about Kafka security, see the Security section of the Kafka documentation.

VPC access

If only Kafka users within your VPC access your Kafka brokers, you must configure the Kafka event source for Amazon Virtual Private Cloud (Amazon VPC) access.

SASL/SCRAM authentication

Lambda supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with Transport Layer Security (TLS) encryption (SASL_SSL). Lambda sends the encrypted credentials to authenticate with the cluster. Lambda doesn't support SASL/SCRAM with plaintext (SASL_PLAINTEXT). For more information about SASL/SCRAM authentication, see RFC 5802.

Lambda also supports SASL/PLAIN authentication. Because this mechanism uses clear text credentials, the connection to the server must use TLS encryption to ensure that the credentials are protected.

For SASL authentication, you store the sign-in credentials as a secret in AWS Secrets Manager. For more information about using Secrets Manager, see Tutorial: Create and retrieve a secret in the AWS Secrets Manager User Guide.

Important

To use Secrets Manager for authentication, secrets must be stored in the same AWS region as your Lambda function.

Mutual TLS authentication

Mutual TLS (mTLS) provides two-way authentication between the client and server. The client sends a certificate to the server for the server to verify the client, and the server sends a certificate to the client for the client to verify the server.

In self-managed Apache Kafka, Lambda acts as the client. You configure a client certificate (as a secret in Secrets Manager) to authenticate Lambda with your Kafka brokers. The client certificate must be signed by a CA in the server's trust store.

The Kafka cluster sends a server certificate to Lambda to authenticate the Kafka brokers with Lambda. The server certificate can be a public CA certificate or a private CA/self-signed certificate. The public CA certificate must be signed by a certificate authority (CA) that's in the Lambda trust store. For a private CA/self-signed certificate, you configure the server root CA certificate (as a secret in Secrets Manager). Lambda uses the root certificate to verify the Kafka brokers.

For more information about mTLS, see Introducing mutual TLS authentication for Amazon MSK as an event source.

Configuring the client certificate secret

The CLIENT_CERTIFICATE_TLS_AUTH secret requires a certificate field and a private key field. For an encrypted private key, the secret requires a private key password. Both the certificate and private key must be in PEM format.

Note

Lambda supports the PBES1 (but not PBES2) private key encryption algorithms.

The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:

-----BEGIN CERTIFICATE-----
<certificate contents>
-----END CERTIFICATE-----

Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.

The private key must be in PKCS #8 format, with the following structure:

-----BEGIN PRIVATE KEY-----
<private key contents>
-----END PRIVATE KEY-----

For an encrypted private key, use the following structure:

-----BEGIN ENCRYPTED PRIVATE KEY-----
<private key contents>
-----END ENCRYPTED PRIVATE KEY-----

The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, include the private key password in the secret.

{
  "privateKeyPassword": "testpassword",
  "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
  "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
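If you assemble the secret programmatically, a small helper can check the structure before you store it. The following Python sketch is one possible approach, not an official tool: the function name is hypothetical, and it validates only the PEM header lines described in this section, not the certificate contents or the key encryption algorithm.

```python
import json

CERT_BEGIN = "-----BEGIN CERTIFICATE-----"
KEY_BEGIN = "-----BEGIN PRIVATE KEY-----"
ENCRYPTED_KEY_BEGIN = "-----BEGIN ENCRYPTED PRIVATE KEY-----"


def build_client_cert_secret(cert_chain_pem, private_key_pem, key_password=None):
    """Return a JSON secret string with certificate and privateKey fields."""
    cert_chain_pem = cert_chain_pem.strip()
    private_key_pem = private_key_pem.strip()

    # The chain must be PEM and begin with the client certificate.
    if not cert_chain_pem.startswith(CERT_BEGIN):
        raise ValueError("certificate must be in PEM format")

    # The key must be PKCS #8 in PEM format, encrypted or unencrypted.
    encrypted = private_key_pem.startswith(ENCRYPTED_KEY_BEGIN)
    if not encrypted and not private_key_pem.startswith(KEY_BEGIN):
        raise ValueError("private key must be PKCS #8 in PEM format")
    if encrypted and not key_password:
        raise ValueError("an encrypted private key requires a password")

    secret = {"certificate": cert_chain_pem, "privateKey": private_key_pem}
    if encrypted:
        secret["privateKeyPassword"] = key_password
    # json.dumps escapes the PEM line breaks as \n inside the JSON strings.
    return json.dumps(secret)
```

The returned string could then be stored as the secret value in Secrets Manager.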

Configuring the server root CA certificate secret

You create this secret if your Kafka brokers use TLS encryption with certificates signed by a private CA. You can use TLS encryption for VPC, SASL/SCRAM, SASL/PLAIN, or mTLS authentication.

The server root CA certificate secret requires a field that contains the Kafka broker's root CA certificate in PEM format. The following example shows the structure of the secret.

{
  "certificate": "-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
}

Managing API access and permissions

In addition to accessing your self-managed Kafka cluster, your Lambda function needs permissions to perform various API actions. You add these permissions to the function's execution role. If your users need access to any API actions, add the required permissions to the identity policy for the AWS Identity and Access Management (IAM) user or role.

Required Lambda function permissions

To create and store logs in a log group in Amazon CloudWatch Logs, your Lambda function must have the following permissions in its execution role:

  • logs:CreateLogGroup

  • logs:CreateLogStream

  • logs:PutLogEvents

Optional Lambda function permissions

Your Lambda function might also need permissions to:

  • Describe your Secrets Manager secret.

  • Access your AWS Key Management Service (AWS KMS) customer managed key.

  • Access your Amazon VPC.

  • Send records of failed invocations to a destination.

Secrets Manager and AWS KMS permissions

Depending on the type of access control that you're configuring for your Kafka brokers, your Lambda function might need permission to access your Secrets Manager secret or to decrypt your AWS KMS customer managed key. To access these resources, your function's execution role must have the following permissions:

  • secretsmanager:GetSecretValue

  • kms:Decrypt

VPC permissions

If only users within a VPC can access your self-managed Apache Kafka cluster, your Lambda function must have permission to access your Amazon VPC resources. These resources include your VPC, subnets, security groups, and network interfaces. To access these resources, your function's execution role must have the following permissions:

  • ec2:CreateNetworkInterface

  • ec2:DescribeNetworkInterfaces

  • ec2:DescribeVpcs

  • ec2:DeleteNetworkInterface

  • ec2:DescribeSubnets

  • ec2:DescribeSecurityGroups

Adding permissions to your execution role

To access other AWS services that your self-managed Apache Kafka cluster uses, Lambda uses the permissions policies that you define in your Lambda function's execution role.

By default, Lambda is not permitted to perform the required or optional actions for a self-managed Apache Kafka cluster. You must define these actions in an IAM permissions policy, and then attach the policy to your execution role. This example shows how you might create a policy that allows Lambda to access your Amazon VPC resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeVpcs",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    }
  ]
}

For information about creating a JSON policy document in the IAM console, see Creating policies on the JSON tab in the IAM User Guide.

Granting users access with an IAM policy

By default, users and roles don't have permission to perform event source API operations. To grant access to users in your organization or account, you create or update an identity-based policy. For more information, see Controlling access to AWS resources using policies in the IAM User Guide.

Authentication and authorization errors

If any of the permissions required to consume data from the Kafka cluster are missing, Lambda displays one of the following error messages in the event source mapping under LastProcessingResult.

Cluster failed to authorize Lambda

For SASL/SCRAM or mTLS, this error indicates that the provided user doesn't have all of the following required Kafka access control list (ACL) permissions:

  • DescribeConfigs Cluster

  • Describe Group

  • Read Group

  • Describe Topic

  • Read Topic

When you create Kafka ACLs with the required kafka-cluster permissions, specify the topic and group as resources. The topic name must match the topic in the event source mapping. The group name must match the event source mapping's UUID.

After you add the required permissions to the execution role, it might take several minutes for the changes to take effect.
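To make the ACL list above concrete, the following Python sketch enumerates the required ACLs for a given topic and event source mapping UUID, and renders each one as an argument list for the `kafka-acls.sh` tool that ships with Apache Kafka. The function names are hypothetical, and the flags shown (`--add`, `--allow-principal`, `--operation`, `--cluster`, `--group`, `--topic`) should be verified against your Kafka version. Connection options such as the bootstrap server are omitted.

```python
def required_acls(topic, mapping_uuid):
    """List (operation, resource_type, resource_name) for each required ACL."""
    return [
        ("DescribeConfigs", "cluster", None),
        ("Describe", "group", mapping_uuid),
        ("Read", "group", mapping_uuid),
        ("Describe", "topic", topic),
        ("Read", "topic", topic),
    ]


def kafka_acls_args(principal, operation, resource_type, resource_name):
    """Render one ACL as arguments for kafka-acls.sh (connection flags omitted)."""
    args = ["--add", "--allow-principal", f"User:{principal}",
            "--operation", operation]
    if resource_type == "cluster":
        args.append("--cluster")
    else:
        # Produces --group <name> or --topic <name>.
        args += [f"--{resource_type}", resource_name]
    return args


def all_acl_commands(principal, topic, mapping_uuid):
    """Return one argument list per required ACL."""
    return [kafka_acls_args(principal, op, rtype, name)
            for op, rtype, name in required_acls(topic, mapping_uuid)]
```

Here `principal` is the Kafka user that Lambda authenticates as, and `mapping_uuid` is the UUID of the event source mapping, which the group name must match.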

SASL authentication failed

For SASL/SCRAM or SASL/PLAIN, this error indicates that the provided sign-in credentials aren't valid.

Server failed to authenticate Lambda

This error indicates that the Kafka broker failed to authenticate Lambda. This can occur for any of the following reasons:

  • You didn't provide a client certificate for mTLS authentication.

  • You provided a client certificate, but the Kafka brokers aren't configured to use mTLS authentication.

  • A client certificate isn't trusted by the Kafka brokers.

Lambda failed to authenticate server

This error indicates that Lambda failed to authenticate the Kafka broker. This can occur for any of the following reasons:

  • The Kafka brokers use self-signed certificates or a private CA, but you didn't provide the server root CA certificate.

  • The server root CA certificate doesn't match the root CA that signed the broker's certificate.

  • Hostname validation failed because the broker's certificate doesn't contain the broker's DNS name or IP address as a subject alternative name.

Provided certificate or private key is invalid

This error indicates that the Kafka consumer couldn't use the provided certificate or private key. Make sure that the certificate and key use PEM format, and that the private key encryption uses a PBES1 algorithm.

Network configuration

For Lambda to use your Kafka cluster as an event source, it needs access to the Amazon VPC your cluster resides in. We recommend that you deploy AWS PrivateLink VPC endpoints for Lambda to access your VPC. Deploy endpoints for Lambda and AWS Security Token Service (AWS STS). If the broker uses authentication, also deploy a VPC endpoint for Secrets Manager. If you configured an on-failure destination, also deploy a VPC endpoint for the destination service.

Alternatively, ensure that the VPC associated with your Kafka cluster includes one NAT gateway per public subnet. For more information, see Enable internet access for VPC-connected Lambda functions.

If you use VPC endpoints, you must also configure them to enable private DNS names.

When you create an event source mapping for a self-managed Apache Kafka cluster, Lambda checks whether Elastic Network Interfaces (ENIs) are already present for the subnets and security groups of your cluster’s VPC. If Lambda finds existing ENIs, it attempts to re-use them. Otherwise, Lambda creates new ENIs to connect to the event source and invoke your function.

Note

Lambda functions always run inside VPCs owned by the Lambda service. These VPCs are maintained automatically by the service and are not visible to customers. You can also connect your function to an Amazon VPC. In either case, your function’s VPC configuration doesn’t affect the event source mapping. Only the configuration of the event source’s VPC determines how Lambda connects to your event source.

For more information about configuring the network, see Setting up AWS Lambda with an Apache Kafka cluster within a VPC on the AWS Compute Blog.

VPC security group rules

Configure the security groups for the Amazon VPC containing your cluster with the following rules (at minimum):

  • Inbound rules – Allow all traffic on the Kafka broker port for the security groups specified for your event source. Kafka uses port 9092 by default.

  • Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the Kafka broker port for the security groups specified for your event source. Kafka uses port 9092 by default.

  • If you are using VPC endpoints instead of a NAT gateway, the security groups associated with the VPC endpoints must allow all inbound traffic on port 443 from the event source's security groups.

Working with VPC endpoints

When you use VPC endpoints, API calls to invoke your function are routed through these endpoints using the ENIs. The Lambda service principal needs to call sts:AssumeRole and lambda:InvokeFunction on any roles and functions that use those ENIs.

By default, VPC endpoints have IAM policies that are open. As a best practice, restrict these policies so that only specific principals can perform the needed actions using that endpoint. To ensure that your event source mapping can invoke your Lambda function, the VPC endpoint policy must allow the Lambda service principal to call sts:AssumeRole and lambda:InvokeFunction. Restricting your VPC endpoint policies to allow only API calls originating within your organization prevents the event source mapping from functioning properly.

The following example VPC endpoint policies show how to grant the required access to the Lambda service principal for the AWS STS and Lambda endpoints.

Example VPC endpoint policy - AWS STS endpoint
{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "lambda.amazonaws.com"
        ]
      },
      "Resource": "*"
    }
  ]
}
Example VPC endpoint policy - Lambda endpoint
{
  "Statement": [
    {
      "Action": "lambda:InvokeFunction",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "lambda.amazonaws.com"
        ]
      },
      "Resource": "*"
    }
  ]
}

If your Kafka broker uses authentication, you can also restrict the VPC endpoint policy for the Secrets Manager endpoint. To call the Secrets Manager API, Lambda uses your function role, not the Lambda service principal. The following example shows a Secrets Manager endpoint policy.

Example VPC endpoint policy - Secrets Manager endpoint
{
  "Statement": [
    {
      "Action": "secretsmanager:GetSecretValue",
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "customer_function_execution_role_arn"
        ]
      },
      "Resource": "customer_secret_arn"
    }
  ]
}

If you have an on-failure destination configured, Lambda also uses your function’s role to call s3:PutObject, sns:Publish, or sqs:SendMessage using the Lambda-managed ENIs.

Adding a Kafka cluster as an event source

To create an event source mapping, add your Kafka cluster as a Lambda function trigger using the Lambda console, an AWS SDK, or the AWS Command Line Interface (AWS CLI).

This section describes how to create an event source mapping using the Lambda console and the AWS CLI.

Prerequisites

  • A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.

  • An execution role with permission to access the AWS resources that your self-managed Kafka cluster uses.

Customizable consumer group ID

When setting up Kafka as an event source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Kafka consumer group that you want your Lambda function to join. You can use this feature to seamlessly migrate any ongoing Kafka record processing setups from other consumers to Lambda.

If you specify a consumer group ID and there are other active pollers within that consumer group, Kafka distributes messages across all consumers. In other words, Lambda doesn't receive all messages for the Kafka topic. If you want Lambda to handle all messages in the topic, turn off any other pollers in that consumer group.

Additionally, if you specify a consumer group ID, and Kafka finds a valid existing consumer group with the same ID, Lambda ignores the StartingPosition parameter for your event source mapping. Instead, Lambda begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Kafka cannot find an existing consumer group, then Lambda configures your event source with the specified StartingPosition.

The consumer group ID that you specify must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
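The interaction between the consumer group ID and StartingPosition can be summarized as a small decision function. The following Python sketch is only an illustrative model of the behavior described in this section, not Lambda's implementation. The function name, its parameters, and the "COMMITTED_OFFSET" label are hypothetical.

```python
def resolve_start(starting_position, committed_offset=None):
    """Model where reading begins for a partition.

    committed_offset is None when Kafka finds no existing consumer group
    with the specified ID; otherwise it is that group's committed offset.
    """
    if committed_offset is not None:
        # An existing group wins: StartingPosition is ignored.
        return ("COMMITTED_OFFSET", committed_offset)
    # No existing group: fall back to the configured StartingPosition.
    return (starting_position, None)
```

For example, `resolve_start("LATEST", committed_offset=42)` returns `("COMMITTED_OFFSET", 42)`, while `resolve_start("TRIM_HORIZON")` returns `("TRIM_HORIZON", None)`.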

Adding a self-managed Kafka cluster (console)

Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.

To add an Apache Kafka trigger to your Lambda function (console)
  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the Apache Kafka trigger type.

    2. For Bootstrap servers, enter the host and port pair address of a Kafka broker in your cluster, and then choose Add. Repeat for each Kafka broker in the cluster.

    3. For Topic name, enter the name of the Kafka topic used to store records in the cluster.

    4. (Optional) For Batch size, enter the maximum number of records to receive in a single batch.

    5. For Batch window, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.

    6. (Optional) For Consumer group ID, enter the ID of a Kafka consumer group to join.

    7. (Optional) For Starting position, choose Latest to start reading the stream from the latest record, Trim horizon to start at the earliest available record, or At timestamp to specify a timestamp to start reading from.

    8. (Optional) For VPC, choose the Amazon VPC for your Kafka cluster. Then, choose the VPC subnets and VPC security groups.

      This setting is required if only users within your VPC access your brokers.

    9. (Optional) For Authentication, choose Add, and then do the following:

      1. Choose the access or authentication protocol of the Kafka brokers in your cluster.

        • If your Kafka broker uses SASL/PLAIN authentication, choose BASIC_AUTH.

        • If your broker uses SASL/SCRAM authentication, choose one of the SASL_SCRAM protocols.

        • If you're configuring mTLS authentication, choose the CLIENT_CERTIFICATE_TLS_AUTH protocol.

      2. For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster.

    10. (Optional) For Encryption, choose the Secrets Manager secret containing the root CA certificate that your Kafka brokers use for TLS encryption, if your Kafka brokers use certificates signed by a private CA.

      This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.

    11. To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.

  5. To create the trigger, choose Add.

Adding a self-managed Kafka cluster (AWS CLI)

Use the following example AWS CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.

Using SASL/SCRAM

If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Using a VPC

If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic.

aws lambda create-event-source-mapping \
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
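The same mappings can be created with an AWS SDK. The following Python sketch assumes the AWS SDK for Python (Boto3) and its `create_event_source_mapping` operation on the Lambda client. The helper names `build_mapping_request` and `create_mapping` are hypothetical, and the request builder is kept separate from the API call so that you can inspect the parameters first.

```python
def build_mapping_request(function_arn, topic, bootstrap_servers,
                          secret_arn=None, subnet_ids=(), security_group_ids=()):
    """Build keyword arguments for create_event_source_mapping."""
    access = []
    if secret_arn:
        # SASL/SCRAM credentials stored in Secrets Manager.
        access.append({"Type": "SASL_SCRAM_512_AUTH", "URI": secret_arn})
    # VPC access uses prefixed subnet and security group IDs.
    access += [{"Type": "VPC_SUBNET", "URI": f"subnet:{s}"} for s in subnet_ids]
    access += [{"Type": "VPC_SECURITY_GROUP", "URI": f"security_group:{g}"}
               for g in security_group_ids]
    return {
        "FunctionName": function_arn,
        "Topics": [topic],
        "SourceAccessConfigurations": access,
        "SelfManagedEventSource": {
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": list(bootstrap_servers)}
        },
    }


def create_mapping(request):
    """Send the request to Lambda; requires Boto3 and AWS credentials."""
    import boto3  # Imported here so the builder works without the SDK.
    return boto3.client("lambda").create_event_source_mapping(**request)
```

For example, passing the two subnet IDs and the security group ID from the VPC command above to `build_mapping_request` produces the same three source access configuration entries.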

Viewing the status using the AWS CLI

The following example uses the get-event-source-mapping AWS CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

On-failure destinations

To retain records of failed event source mapping invocations, add a destination to your function's event source mapping. Each record sent to the destination is a JSON document with metadata about the failed invocation. You can configure any Amazon SNS topic, Amazon SQS queue, or S3 bucket as a destination. Your execution role must have permissions for the destination:

  • For SQS destinations: sqs:SendMessage

  • For SNS destinations: sns:Publish

  • For S3 bucket destinations: s3:PutObject and s3:ListBucket

Additionally, if you configured a KMS key on your destination, Lambda needs the following permissions depending on the destination type:

  • If you've enabled encryption with your own KMS key for an S3 destination, kms:GenerateDataKey is required. If the KMS key and S3 bucket destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:GenerateDataKey.

  • If you've enabled encryption with your own KMS key for an SQS destination, kms:Decrypt and kms:GenerateDataKey are required. If the KMS key and SQS queue destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey, and kms:ReEncrypt.

  • If you've enabled encryption with your own KMS key for an SNS destination, kms:Decrypt and kms:GenerateDataKey are required. If the KMS key and SNS topic destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey, and kms:ReEncrypt.

To configure an on-failure destination using the console, follow these steps:

  1. Open the Functions page of the Lambda console.

  2. Choose a function.

  3. Under Function overview, choose Add destination.

  4. For Source, choose Event source mapping invocation.

  5. For Event source mapping, choose an event source that's configured for this function.

  6. For Condition, select On failure. For event source mapping invocations, this is the only accepted condition.

  7. For Destination type, choose the destination type that Lambda sends invocation records to.

  8. For Destination, choose a resource.

  9. Choose Save.

You can also configure an on-failure destination using the AWS CLI. For example, the following create-event-source-mapping command adds an event source mapping with an SQS on-failure destination to MyFunction:

aws lambda create-event-source-mapping \
  --function-name "MyFunction" \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \
  --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

The following update-event-source-mapping command adds an S3 on-failure destination to the event source associated with the input uuid:

aws lambda update-event-source-mapping \
  --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
  --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

To remove a destination, supply an empty string as the argument to the destination-config parameter:

aws lambda update-event-source-mapping \
  --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
  --destination-config '{"OnFailure": {"Destination": ""}}'

SNS and SQS example invocation record

The following example shows what Lambda sends to an SNS topic or SQS queue destination for a failed Kafka event source invocation. Each of the keys under recordsInfo contains both the Kafka topic and partition, separated by a hyphen. For example, for the key "Topic-0", Topic is the Kafka topic, and 0 is the partition. For each topic and partition, you can use the offsets and timestamp data to find the original invocation records.

{
  "requestContext": {
    "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
    "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
    "approximateInvokeCount": 1
  },
  "responseContext": { // null if record is MaximumPayloadSizeExceeded
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2019-11-14T00:38:06.021Z",
  "KafkaBatchInfo": {
    "batchSize": 500,
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
    "bootstrapServers": "...",
    "payloadSize": 2039086, // In bytes
    "recordsInfo": {
      "Topic-0": {
        "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
        "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
        "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
        "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
      },
      "Topic-1": {
        "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
        "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
        "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
        "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
      }
    }
  }
}
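When you process these records, you typically need to split each recordsInfo key back into its topic and partition. The following Python sketch shows one way to do that, with hypothetical function names. It splits on the last hyphen, on the assumption that topic names can themselves contain hyphens, and it converts the offset strings to integers.

```python
def split_topic_partition(key):
    """Split a recordsInfo key such as "Topic-0" into (topic, partition)."""
    # Split on the last hyphen so that hyphenated topic names stay intact.
    topic, _, partition = key.rpartition("-")
    return topic, int(partition)


def failed_offset_ranges(invocation_record):
    """Map (topic, partition) to the (first, last) offsets of the failed batch."""
    records_info = invocation_record["KafkaBatchInfo"]["recordsInfo"]
    return {
        split_topic_partition(key): (
            int(info["firstRecordOffset"]),
            int(info["lastRecordOffset"]),
        )
        for key, info in records_info.items()
    }
```

The resulting dictionary can drive a replay job that re-reads each offset range from the corresponding topic partition.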

S3 destination example invocation record

For S3 destinations, Lambda sends the entire invocation record along with the metadata to the destination. The following example shows what Lambda sends to an S3 bucket destination for a failed Kafka event source invocation. In addition to all of the fields from the previous example for SQS and SNS destinations, the payload field contains the original invocation record as an escaped JSON string.

{
  "requestContext": {
    "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
    "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
    "approximateInvokeCount": 1
  },
  "responseContext": { // null if record is MaximumPayloadSizeExceeded
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2019-11-14T00:38:06.021Z",
  "KafkaBatchInfo": {
    "batchSize": 500,
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
    "bootstrapServers": "...",
    "payloadSize": 2039086, // In bytes
    "recordsInfo": {
      "Topic-0": {
        "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
        "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
        "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
        "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
      },
      "Topic-1": {
        "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
        "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
        "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
        "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
      }
    }
  },
  "payload": "<Whole Event>" // Only available in S3
}
Tip

We recommend enabling S3 versioning on your destination bucket.
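Because the payload field holds the original invocation record as an escaped JSON string, recovering the event takes two parsing steps. The following Python sketch assumes that you have already read the S3 object body into a string. The function name is hypothetical, and reading the object from the bucket is out of scope here.

```python
import json


def original_event_from_s3_record(object_body):
    """Return the original Kafka event stored in an S3 invocation record."""
    # First parse: the invocation record itself.
    record = json.loads(object_body)
    # Second parse: the payload field is a JSON document encoded as a string.
    return json.loads(record["payload"])
```

The returned dictionary has the same shape as the event that Lambda originally sent to your function.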

Using a Kafka cluster as an event source

When you add your Apache Kafka cluster as a trigger for your Lambda function, the cluster is used as an event source.

Lambda reads event data from the Kafka topics that you specify as Topics in a CreateEventSourceMapping request, based on the StartingPosition that you specify. After successful processing, Lambda commits the offsets of the processed records to your Kafka cluster.

If you specify the StartingPosition as LATEST, Lambda starts reading from the latest message in each partition belonging to the topic. Because there can be some delay after trigger configuration before Lambda starts reading the messages, Lambda doesn't read any messages produced during this window.

Lambda processes records from one or more Kafka topic partitions that you specify and sends a JSON payload to your function. When more records are available, Lambda continues processing records in batches, based on the BatchSize value that you specify in a CreateEventSourceMapping request, until your function catches up with the topic.

If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
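
Because Lambda retries the whole batch, records that succeeded before the failure are delivered again. The following Python sketch shows one way to skip records that were already handled, keyed by topic, partition, and offset. The function name process_batch_once is hypothetical, and the in-memory set stands in for a durable store that a real function would need, because in-memory state does not reliably survive between invocations.

```python
def process_batch_once(records, seen, handle):
    """Call handle(record) for each record not yet handled; return how many were handled."""
    handled = 0
    for record in records:
        key = (record["topic"], record["partition"], record["offset"])
        if key in seen:
            continue  # already handled in an earlier attempt
        handle(record)
        # Mark the record only after handle() returns without raising,
        # so a failed record is attempted again on the next retry.
        seen.add(key)
        handled += 1
    return handled
```

If handle raises partway through the batch, the exception propagates, and on retry only the failed record and the records after it are handled.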

Note

While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes. This constraint ensures that the event source mapping can properly handle function errors and retries.

Polling and stream starting positions

Be aware that stream polling during event source mapping creation and updates is eventually consistent.

  • During event source mapping creation, it may take several minutes to start polling events from the stream.

  • During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.

This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.
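
The following Python sketch turns the guidance above into a small helper that picks the starting position parameters for a CreateEventSourceMapping request. The function name and its arguments are hypothetical. The sketch defaults to TRIM_HORIZON so that LATEST is used only when the caller explicitly accepts that events might be missed.

```python
from datetime import datetime, timezone
from typing import Optional


def starting_position_params(replay_from: Optional[datetime] = None,
                             allow_missed_events: bool = False) -> dict:
    """Return the StartingPosition parameters for an event source mapping request."""
    if replay_from is not None:
        # AT_TIMESTAMP requires StartingPositionTimestamp.
        return {
            "StartingPosition": "AT_TIMESTAMP",
            "StartingPositionTimestamp": replay_from,
        }
    if allow_missed_events:
        # LATEST can miss events produced while polling starts or restarts.
        return {"StartingPosition": "LATEST"}
    return {"StartingPosition": "TRIM_HORIZON"}
```

For example, starting_position_params(datetime(2019, 11, 14, tzinfo=timezone.utc)) produces an AT_TIMESTAMP request fragment that you can merge into the other request parameters.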

Auto scaling of the Kafka event source

When you initially create an Apache Kafka event source, Lambda allocates one consumer to process all partitions in the Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, Lambda automatically scales the number of consumers up or down, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

In one-minute intervals, Lambda evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.

If your target Lambda function is overloaded, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.
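
The following Python sketch is a toy model of the scaling rules described above, not the algorithm that Lambda actually runs. The lag threshold, the step of one consumer per evaluation, and the floor of one consumer are all assumptions made for illustration. Only the ceiling of one consumer per partition and the direction of each adjustment come from this section.

```python
def next_consumer_count(current: int, partition_count: int, total_lag: int,
                        function_overloaded: bool, lag_threshold: int = 1000) -> int:
    """Return the consumer count after one evaluation interval (illustrative only)."""
    if function_overloaded:
        target = current - 1   # reduce load on the target function
    elif total_lag > lag_threshold:
        target = current + 1   # messages arrive faster than they are processed
    else:
        target = current
    # Never exceed one consumer per partition; assume at least one consumer remains.
    return max(1, min(target, partition_count))
```

The model makes the ordering constraint visible: a topic with three partitions never has more than three consumers, however large the lag grows.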

To monitor the throughput of your Kafka topic, you can view the Apache Kafka consumer metrics, such as consumer_lag and consumer_offset. To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.

Event source mapping errors

After you add your Apache Kafka cluster as an event source for your Lambda function, your Kafka consumer stops processing records if your function encounters an error. Consumers of a topic partition are those that subscribe to, read, and process your records. Your other Kafka consumers can continue processing records, provided they don't encounter the same error.

To determine the cause of a stopped consumer, check the StateTransitionReason field in the response of EventSourceMapping. The following list describes the event source errors that you can receive:

ESM_CONFIG_NOT_VALID

The event source mapping configuration isn't valid.

EVENT_SOURCE_AUTHN_ERROR

Lambda couldn't authenticate the event source.

EVENT_SOURCE_AUTHZ_ERROR

Lambda doesn't have the required permissions to access the event source.

FUNCTION_CONFIG_NOT_VALID

The function configuration isn't valid.
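
The following Python sketch maps the error codes above to their descriptions. It assumes that you have already retrieved the event source mapping as a dictionary (for example, from the GetEventSourceMapping API operation) and that the error code appears somewhere inside the StateTransitionReason string. The function name explain_state_transition is hypothetical.

```python
from typing import Optional

ESM_ERRORS = {
    "ESM_CONFIG_NOT_VALID": "The event source mapping configuration isn't valid.",
    "EVENT_SOURCE_AUTHN_ERROR": "Lambda couldn't authenticate the event source.",
    "EVENT_SOURCE_AUTHZ_ERROR": "Lambda doesn't have the required permissions to access the event source.",
    "FUNCTION_CONFIG_NOT_VALID": "The function configuration isn't valid.",
}


def explain_state_transition(mapping: dict) -> Optional[str]:
    """Return the description of a known error code, or None if none is present."""
    reason = mapping.get("StateTransitionReason") or ""
    for code, description in ESM_ERRORS.items():
        # Assumption: the code appears verbatim within the reason string.
        if code in reason:
            return description
    return None
```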

Note

If your Lambda event records exceed the allowed size limit of 6 MB, they can go unprocessed.

Amazon CloudWatch metrics

Lambda emits the OffsetLag metric while your function processes records. The value of this metric is the difference in offset between the last record written to the Kafka event source topic and the last record that your function's consumer group processed. You can use OffsetLag to estimate the latency between when a record is added and when your consumer group processes it.

An increasing trend in OffsetLag can indicate issues with pollers in your function's consumer group. For more information, see Working with Lambda function metrics.
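
The following Python sketch illustrates the arithmetic behind the metric and a simple check for an increasing trend. It is not how Lambda computes OffsetLag internally. The per-partition breakdown, the function names, and the definition of a trend as strictly increasing samples are assumptions made for illustration.

```python
def offset_lag(last_written: dict, last_processed: dict) -> dict:
    """Per-partition difference between the last written and last processed offsets."""
    # Only partitions present in both inputs are compared.
    shared = last_written.keys() & last_processed.keys()
    return {p: last_written[p] - last_processed[p] for p in shared}


def is_trending_up(samples: list) -> bool:
    """True if every sample in the window is larger than the one before it."""
    if len(samples) < 2:
        return False
    return all(later > earlier for earlier, later in zip(samples, samples[1:]))
```

In practice, you would read OffsetLag datapoints from CloudWatch and pass them to a check like is_trending_up, choosing a window long enough to ignore short bursts.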

Self-managed Apache Kafka configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Apache Kafka.

Event source parameters that apply to self-managed Apache Kafka
| Parameter | Required | Default | Notes |
|---|---|---|---|
| BatchSize | N | 100 | Maximum: 10,000 |
| Enabled | N | Enabled | |
| FunctionName | Y | | |
| FilterCriteria | N | | Lambda event filtering |
| MaximumBatchingWindowInSeconds | N | 500 ms | Batching behavior |
| SelfManagedEventSource | Y | | List of Kafka Brokers. Can set only on Create |
| SelfManagedKafkaEventSourceConfig | N | | Contains the ConsumerGroupId field which defaults to a unique value. Can set only on Create |
| SourceAccessConfigurations | N | No credentials | VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH |
| StartingPosition | Y | | AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create |
| StartingPositionTimestamp | N | | Required if StartingPosition is set to AT_TIMESTAMP |
| Topics | Y | | Topic name. Can set only on Create |
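
The following Python sketch checks a request dictionary against the constraints in the table above before you send it. The function names are hypothetical, and the lower bound of 1 for BatchSize is an assumption because the table states only the maximum. The sketch performs local validation only and does not call any AWS API.

```python
REQUIRED = ("FunctionName", "SelfManagedEventSource", "StartingPosition", "Topics")
CREATE_ONLY = {"SelfManagedEventSource", "SelfManagedKafkaEventSourceConfig",
               "StartingPosition", "Topics"}
MAX_BATCH_SIZE = 10_000


def validate_create_request(params: dict) -> list:
    """Return a list of problems found in a CreateEventSourceMapping request."""
    problems = [f"missing required parameter {name}"
                for name in REQUIRED if name not in params]
    batch_size = params.get("BatchSize", 100)  # default from the table
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        problems.append(f"BatchSize must be between 1 and {MAX_BATCH_SIZE}")
    if (params.get("StartingPosition") == "AT_TIMESTAMP"
            and "StartingPositionTimestamp" not in params):
        problems.append("StartingPositionTimestamp is required with AT_TIMESTAMP")
    return problems


def create_only_changes(update_params: dict) -> list:
    """Return the parameters in an update request that can be set only on Create."""
    return sorted(CREATE_ONLY & update_params.keys())
```

A request that passes validation might look like the following, where the function name, topic, and broker address are placeholders: {"FunctionName": "myfunction", "Topics": ["mytopic"], "StartingPosition": "TRIM_HORIZON", "SelfManagedEventSource": {"Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": ["broker-1.example.com:9092"]}}}.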