Using Lambda with Amazon MSK - AWS Lambda

Using Lambda with Amazon MSK

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that you can use to build and run applications that use Apache Kafka to process streaming data. Amazon MSK simplifies the setup, scaling, and management of clusters running Kafka. Amazon MSK also makes it easier to configure your application for multiple Availability Zones and for security with AWS Identity and Access Management (IAM). Additionally, Amazon MSK supports multiple open-source versions of Kafka.

Amazon MSK as an event source operates similarly to Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. Lambda internally polls the event source for new messages and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides each batch to your function as an event payload. You can configure the maximum batch size (the default is 100 messages).

For an example of how to configure Amazon MSK as an event source, see Using Amazon MSK as an event source for AWS Lambda on the AWS Compute Blog. Also, see Amazon MSK Lambda Integration in the Amazon MSK Labs for a complete tutorial.

Lambda reads the messages sequentially for each partition. After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire.
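The following Python sketch models the per-partition behavior described above. Messages are read in order and split into batches of at most the batch size, and the committed offset advances only after a whole batch succeeds. This is a toy simulation, not Lambda's internal poller. The `process_partition` function is hypothetical, and its `max_attempts` parameter is a hypothetical stand-in for message expiry.

```python
from typing import Callable, List, Sequence


def process_partition(
    messages: Sequence[str],
    batch_size: int,
    handler: Callable[[List[str]], None],
    max_attempts: int = 3,
) -> int:
    """Toy model of how one partition's messages are consumed in order.

    Returns the committed offset, that is, how many messages were processed
    successfully before processing finished or stalled.
    """
    committed = 0
    while committed < len(messages):
        # Next batch: up to batch_size messages after the committed offset.
        batch = list(messages[committed:committed + batch_size])
        for attempt in range(1, max_attempts + 1):
            try:
                handler(batch)
                break  # the whole batch succeeded
            except Exception:
                if attempt == max_attempts:
                    # Hypothetical stand-in for message expiry: give up and
                    # report how far the partition got.
                    return committed
        # Commit offsets only after the whole batch succeeded.
        committed += len(batch)
    return committed
```

Because a failed batch is delivered again in full, messages that your function handled before the error are processed a second time. Writing the handler so that reprocessing a message is harmless (idempotent) avoids duplicate side effects. While a batch is being retried, later messages in the same partition wait behind it.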

Lambda allows a function to run for up to 14 minutes before stopping it.

Lambda sends the batch of messages in the event parameter when it invokes your function. In the event payload, the records field maps each topic-partition key (for example, mytopic-0) to an array of messages. Each array item contains the Amazon MSK topic and partition identifier, the message offset, a timestamp and timestamp type, any message headers, and the base64-encoded message value.

{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}
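Here is a minimal Python handler sketch for a payload like the one above. It assumes that message values and header values are UTF-8 text. Kafka messages can carry arbitrary bytes, so adjust the decoding for binary data. `decode_msk_event` is a hypothetical helper name.

```python
import base64
import json
from typing import Any, Dict, List


def decode_msk_event(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten an Amazon MSK event into a list of decoded records."""
    decoded = []
    # "records" maps a "<topic>-<partition>" key to a list of records.
    for records in event.get("records", {}).values():
        for record in records:
            # The message value is base64-encoded.
            value = base64.b64decode(record["value"]).decode("utf-8")
            # Each header is a one-key dict whose value is a list of byte values.
            headers = {}
            for header in record.get("headers", []):
                for key, byte_values in header.items():
                    headers[key] = bytes(byte_values).decode("utf-8")
            decoded.append({
                "topic": record["topic"],
                "partition": int(record["partition"]),  # sent as a string
                "offset": record["offset"],
                "value": value,
                "headers": headers,
            })
    return decoded


def lambda_handler(event, context):
    # Log each decoded record as one JSON line.
    for record in decode_msk_event(event):
        print(json.dumps(record))
```

For the sample event above, `decode_msk_event` returns one record whose value is "Hello, this is a test." and whose headers are {"headerKey": "headerValue"}.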

Managing access and permissions

For Lambda to poll your Kafka topic and update other cluster resources, your Lambda function, as well as your IAM users and roles, must have the following permissions.

Required Lambda function permissions

Your Lambda function's execution role must have permission to read records from your Amazon MSK cluster on your behalf. You can either add the AWS managed policy AWSLambdaMSKExecutionRole to your execution role, or create a custom policy that grants equivalent permissions.

Adding a policy to your execution role

Follow these steps to add the AWS managed policy AWSLambdaMSKExecutionRole to your execution role using the IAM console.

To add an AWS managed policy

  1. Open the Policies page of the IAM console.

  2. In the search box, enter the policy name (AWSLambdaMSKExecutionRole).

  3. Select the policy from the list, and then choose Policy actions, Attach.

  4. On the Attach policy page, select your execution role from the list, and then choose Attach policy.
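To script these steps instead, you can use the following Python sketch, which performs the same attachment through the IAM AttachRolePolicy operation. It assumes that you pass in an IAM client with the boto3 `attach_role_policy` method (for example, `boto3.client("iam")`). It also assumes the managed policy ARN shown, so confirm the ARN on the policy's page in the IAM console. `attach_msk_execution_policy` is a hypothetical helper name.

```python
# Assumed ARN of the AWS managed policy; verify it in the IAM console.
MSK_EXECUTION_POLICY_ARN = (
    "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
)


def attach_msk_execution_policy(iam_client, role_name: str) -> None:
    """Attach AWSLambdaMSKExecutionRole to a Lambda execution role.

    iam_client is expected to be a boto3 IAM client; it is passed in so the
    function can be exercised without AWS credentials.
    """
    iam_client.attach_role_policy(
        RoleName=role_name,
        PolicyArn=MSK_EXECUTION_POLICY_ARN,
    )
```

For example, `attach_msk_execution_policy(boto3.client("iam"), "my-kafka-function-role")`, where the role name is a placeholder for your execution role.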

Granting users access with an IAM policy

By default, IAM users and roles do not have permission to perform Amazon MSK API operations. To grant access to users in your organization or account, you might need an identity-based policy. For more information, see Amazon MSK Identity-Based Policy Examples in the Amazon Managed Streaming for Apache Kafka Developer Guide.

Using SASL/SCRAM authentication

Amazon MSK supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with TLS encryption. You can control access to your Amazon MSK clusters by setting up user name and password authentication using an AWS Secrets Manager secret. For more information, see Username and password authentication with AWS Secrets Manager in the Amazon Managed Streaming for Apache Kafka Developer Guide.

Note that Amazon MSK does not support SASL/PLAIN authentication.

Network configuration

Lambda must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your Amazon MSK cluster. We recommend that you deploy AWS PrivateLink VPC endpoints for Lambda and AWS Security Token Service (AWS STS). If your cluster uses SASL/SCRAM authentication, also deploy a VPC endpoint for Secrets Manager.

Alternatively, ensure that the VPC associated with your Amazon MSK cluster includes one NAT gateway per public subnet. For more information, see Internet and service access for VPC-connected functions.

You must configure your Amazon VPC security groups with the following rules (at minimum):

  • Inbound rules – Allow all traffic on all ports for the security group specified as your event source.

  • Outbound rules – Allow all traffic on all ports for all destinations.
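The following Python sketch expresses these minimum rules in the IpPermissions format that the Amazon EC2 AuthorizeSecurityGroupIngress and AuthorizeSecurityGroupEgress operations accept. It assumes that the inbound rule is self-referencing, meaning it allows traffic from members of the same security group. `msk_security_group_rules` is a hypothetical helper, and the security group ID in the usage note is a placeholder.

```python
from typing import Any, Dict, List, Tuple

Rules = List[Dict[str, Any]]


def msk_security_group_rules(security_group_id: str) -> Tuple[Rules, Rules]:
    """Build (inbound, outbound) IpPermissions lists for the minimum rules.

    IpProtocol "-1" means all protocols on all ports.
    """
    inbound = [{
        "IpProtocol": "-1",
        # Allow all traffic from members of the same security group.
        "UserIdGroupPairs": [{"GroupId": security_group_id}],
    }]
    outbound = [{
        "IpProtocol": "-1",
        # Allow all traffic to every IPv4 destination.
        "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
    }]
    return inbound, outbound
```

With a boto3 EC2 client, you could apply these lists using `ec2.authorize_security_group_ingress(GroupId="sg-0123456789abcdef0", IpPermissions=inbound)` and the matching `authorize_security_group_egress` call. New security groups usually already include an allow-all outbound rule. In that case, adding the rule again fails with a duplicate-rule error, so you can skip the egress call.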

Note

Your Amazon VPC configuration is discoverable through the Amazon MSK API, so you don't need to configure it when you run the create-event-source-mapping command.

For more information about configuring the network, see Setting up AWS Lambda with an Apache Kafka cluster within a VPC on the AWS Compute Blog.

Adding Amazon MSK as an event source

To create an event source mapping, add Amazon MSK as a Lambda function trigger using the Lambda console, an AWS SDK, or the AWS Command Line Interface (AWS CLI).

This section describes how to create an event source mapping using the Lambda console and the AWS CLI.

Prerequisites

  • An Amazon MSK cluster and a Kafka topic. For more information, see Getting Started Using Amazon MSK in the Amazon Managed Streaming for Apache Kafka Developer Guide.

  • A Lambda execution role with permission to access the AWS resources that your Amazon MSK cluster uses.

Adding an Amazon MSK trigger (console)

Follow these steps to add your Amazon MSK cluster and a Kafka topic as a trigger for your Lambda function.

To add an Amazon MSK trigger to your Lambda function (console)

  1. Open the Functions page of the Lambda console.

  2. Choose the name of your Lambda function.

  3. Under Function overview, choose Add trigger.

  4. Under Trigger configuration, do the following:

    1. Choose the MSK trigger type.

    2. For MSK cluster, select your cluster.

    3. For Batch size, enter the maximum number of messages to receive in a single batch.

    4. For Topic name, enter the name of a Kafka topic.

    5. (Optional) For Starting position, choose Latest to start reading the stream from the latest record. Or, choose Trim horizon to start at the earliest available record.

    6. (Optional) For Secret key, choose the secret key for SASL/SCRAM authentication of the brokers in your Amazon MSK cluster.

    7. To create the trigger in a disabled state for testing (recommended), clear Enable trigger. Or, to enable the trigger immediately, select Enable trigger.

  5. To create the trigger, choose Add.

Adding an Amazon MSK trigger (AWS CLI)

Use the following example AWS CLI commands to create and view an Amazon MSK trigger for your Lambda function.

Creating a trigger using the AWS CLI

The following example uses the create-event-source-mapping AWS CLI command to map a Lambda function named my-kafka-function to a Kafka topic named AWSKafkaTopic. The topic's starting position is set to LATEST.

aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function

For more information, see the API reference documentation.
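You can also create the same mapping with an AWS SDK. The following Python sketch assumes that you pass in a boto3 Lambda client (`boto3.client("lambda")`). `create_msk_trigger` is a hypothetical wrapper. Its optional `secret_arn` argument shows how to supply a SASL/SCRAM secret through SourceAccessConfigurations with the SASL_SCRAM_512_AUTH type.

```python
from typing import Any, Dict, Optional


def create_msk_trigger(
    lambda_client,
    cluster_arn: str,
    topic: str,
    function_name: str,
    starting_position: str = "LATEST",
    batch_size: int = 100,
    secret_arn: Optional[str] = None,
) -> Dict[str, Any]:
    """Create an Amazon MSK event source mapping, like the CLI example."""
    params: Dict[str, Any] = {
        "EventSourceArn": cluster_arn,
        "Topics": [topic],  # the API takes a list of topic names
        "StartingPosition": starting_position,
        "FunctionName": function_name,
        "BatchSize": batch_size,
    }
    if secret_arn is not None:
        # SASL/SCRAM user name and password stored in AWS Secrets Manager.
        params["SourceAccessConfigurations"] = [
            {"Type": "SASL_SCRAM_512_AUTH", "URI": secret_arn}
        ]
    return lambda_client.create_event_source_mapping(**params)
```

The response includes the mapping's UUID, which you can pass to the get-event-source-mapping command.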

Viewing the status using the AWS CLI

The following example uses the get-event-source-mapping AWS CLI command to describe the status of the event source mapping that you created.

aws lambda get-event-source-mapping \
  --uuid 6d9bce8e-836b-442c-8070-74e77903c815
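Right after creation, the mapping's State is usually Creating. The following Python sketch polls `get_event_source_mapping` until the mapping reaches a target state. It assumes that you pass in a boto3 Lambda client. `wait_for_mapping_state` is a hypothetical helper, and its timeout and polling interval are arbitrary defaults. If you created the trigger in a disabled state, wait for Disabled instead of Enabled.

```python
import time
from typing import Any, Dict


def wait_for_mapping_state(
    lambda_client,
    uuid: str,
    target_state: str = "Enabled",
    timeout_seconds: float = 300.0,
    poll_seconds: float = 5.0,
) -> Dict[str, Any]:
    """Poll get_event_source_mapping until State equals target_state.

    Raises TimeoutError if the state is not reached before the deadline.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        mapping = lambda_client.get_event_source_mapping(UUID=uuid)
        if mapping["State"] == target_state:
            return mapping
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"mapping {uuid} is {mapping['State']}, not {target_state}"
            )
        time.sleep(poll_seconds)
```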

Auto scaling of the Amazon MSK event source

When you initially create an Amazon MSK event source, Lambda allocates one consumer to process all of the partitions in the Kafka topic. Lambda automatically scales the number of consumers up or down based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

Every 15 minutes, Lambda evaluates the consumer offset lag of all the partitions in the topic. A high lag means that a partition is receiving messages faster than Lambda can process them. If necessary, Lambda adds or removes consumers from the topic.
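To make the lag idea concrete, the following Python sketch computes per-partition offset lag (the latest offset in a partition minus the consumer's committed offset) and applies a toy scaling rule. The `high_lag` threshold, the add-one/remove-one rule, and the minimum of one consumer are hypothetical and are not Lambda's actual algorithm. Only the upper bound of one consumer per partition comes from the description above.

```python
from typing import Dict


def offset_lag(latest: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: latest offset minus the committed offset."""
    return {p: latest[p] - committed[p] for p in latest}


def next_consumer_count(current: int, lag: Dict[int, int], high_lag: int) -> int:
    """Toy scaling rule (hypothetical, not Lambda's algorithm).

    Adds a consumer when any partition's lag exceeds high_lag, removes one
    when every partition is caught up, and keeps the result between one
    consumer and one consumer per partition.
    """
    max_consumers = len(lag)  # one consumer per partition preserves ordering
    if any(v > high_lag for v in lag.values()):
        current += 1
    elif all(v == 0 for v in lag.values()):
        current -= 1
    return max(1, min(current, max_consumers))
```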

If your target Lambda function is overloaded, Lambda reduces the number of consumers. This action reduces the workload on the function by reducing the number of messages that consumers can retrieve and send to the function.

To monitor the throughput of your Kafka topic, you can view the Amazon MSK consumer-lag metrics. To help you find the metrics for this Lambda function, Lambda sets the value of the consumer group field in the logs to the event source UUID.

To check how many function invocations occur in parallel, you can also monitor the concurrency metrics for your function.

Amazon MSK configuration parameters

All Lambda event source types share the same CreateEventSourceMapping and UpdateEventSourceMapping API operations. However, only some of the parameters apply to Amazon MSK.

Event source parameters that apply to Amazon MSK

  • BatchSize – Required: N. Default: 100. Notes: Maximum: 10,000.

  • Enabled – Required: N. Default: Enabled.

  • EventSourceArn – Required: Y. Notes: Can set only on Create.

  • FunctionName – Required: Y.

  • SourceAccessConfigurations – Required: N. Default: No credentials. Notes: VPC information or SASL/SCRAM authentication credentials for your event source.

  • StartingPosition – Required: Y. Notes: TRIM_HORIZON or LATEST. Can set only on Create.

  • Topics – Required: Y. Notes: Kafka topic name. Can set only on Create.
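The following Python sketch checks a parameter dictionary against the MSK-specific rules listed above before you call CreateEventSourceMapping or UpdateEventSourceMapping. `check_msk_params` is a hypothetical helper, and it assumes a minimum BatchSize of 1. The service performs its own, more complete validation.

```python
from typing import Any, Dict, List

REQUIRED_ON_CREATE = {"EventSourceArn", "FunctionName", "StartingPosition", "Topics"}
CREATE_ONLY = {"EventSourceArn", "StartingPosition", "Topics"}
STARTING_POSITIONS = {"TRIM_HORIZON", "LATEST"}
MAX_BATCH_SIZE = 10_000


def check_msk_params(params: Dict[str, Any], is_update: bool = False) -> List[str]:
    """Return a list of problems found in MSK event source mapping parameters."""
    problems = []
    if is_update:
        # These parameters can be set only when the mapping is created.
        for name in sorted(CREATE_ONLY.intersection(params)):
            problems.append(f"{name} can be set only on create")
    else:
        for name in sorted(REQUIRED_ON_CREATE.difference(params)):
            problems.append(f"{name} is required")
    batch_size = params.get("BatchSize", 100)  # default batch size
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        problems.append(f"BatchSize must be between 1 and {MAX_BATCH_SIZE}")
    position = params.get("StartingPosition")
    if position is not None and position not in STARTING_POSITIONS:
        problems.append("StartingPosition must be TRIM_HORIZON or LATEST")
    return problems
```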