Tutorial: Using Lambda with Kinesis Data Streams
In this tutorial, you create a Lambda function to consume events from a Amazon Kinesis data stream.
-
Custom app writes records to the stream.
-
AWS Lambda polls the stream and, when it detects new records in the stream, invokes your Lambda
function.
-
AWS Lambda runs the Lambda function by assuming the execution role you specified at the time you created
the Lambda function.
Prerequisites
This tutorial assumes that you have some knowledge of basic Lambda operations and the Lambda console. If you
haven't already, follow the instructions in Create a Lambda function with the console to create your first Lambda function.
To complete the following steps, you need the AWS CLI version 2. Commands and the expected output are listed in separate blocks:
aws --version
You should see the following output:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
For long commands, an escape character (\
) is used to split a command over multiple lines.
On Linux and macOS, use your preferred shell and package manager.
In Windows, some Bash CLI commands that you commonly use with Lambda (such as zip
) are not supported by the operating system's built-in terminals.
To get a Windows-integrated version of Ubuntu and Bash, install the Windows Subsystem for Linux.
Example CLI commands in this guide use Linux formatting. Commands which include inline JSON documents must be reformatted if you are using the Windows CLI.
Create the execution role
Create the execution role that gives your function
permission to access AWS resources.
To create an execution role
-
Open the roles page in the IAM console.
-
Choose Create role.
-
Create a role with the following properties.
-
Trusted entity – AWS Lambda.
-
Permissions – AWSLambdaKinesisExecutionRole.
-
Role name – lambda-kinesis-role
.
The AWSLambdaKinesisExecutionRole policy has the permissions that the function needs to
read items from Kinesis and write logs to CloudWatch Logs.
Create the function
Create a Lambda function that processes your Kinesis messages. The function code logs the event ID
and event data of the Kinesis record to CloudWatch Logs.
This tutorial uses the Node.js 18.x runtime, but we've also provided example code in other runtime
languages. You can select the tab in the following box to see code for the runtime you're interested in.
The JavaScript code you'll use in this step is in the first example shown in the
JavaScript tab.
- .NET
-
- AWS SDK for .NET
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming a Kinesis event with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- SDK for Go V2
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming a Kinesis event with Lambda using Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK for Java 2.x
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming a Kinesis event with Lambda using Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- SDK for JavaScript (v3)
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming a Kinesis event with Lambda using JavaScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
Consuming a Kinesis event with Lambda using TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- SDK for PHP
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming an Kinesis event with Lambda using PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming a Kinesis event with Lambda using Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- SDK for Ruby
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming an Kinesis event with Lambda using Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- SDK for Rust
-
There's more on GitHub. Find the complete example and learn how to set up and run in the
Serverless examples
repository.
Consuming an Kinesis event with Lambda using Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
To create the function
-
Create a directory for the project, and then switch to that directory.
mkdir kinesis-tutorial
cd kinesis-tutorial
-
Copy the sample JavaScript code into a new file named index.js
.
-
Create a deployment package.
zip function.zip index.js
-
Create a Lambda function with the create-function
command.
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
Test the
Lambda function
Invoke your Lambda function manually using the invoke
AWS Lambda CLI command and a sample Kinesis
event.
To test the Lambda function
-
Copy the following JSON into a file and save it as input.txt
.
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
Use the invoke
command to send the event to the function.
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
The cli-binary-format option is required if you're using AWS CLI version 2. To make this the default setting, run aws configure set cli-binary-format raw-in-base64-out
. For more information, see AWS CLI supported global command line options in the AWS Command Line Interface User Guide for Version 2.
The response is saved to out.txt
.
Use the create-stream
command to create a stream.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
Run the following describe-stream
command to get the stream ARN.
aws kinesis describe-stream --stream-name lambda-stream
You should see the following output:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
You use the stream ARN in the next step to associate the stream with your Lambda function.
Run the following AWS CLI add-event-source
command.
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
Note the mapping ID for later use. You can get a list of event source mappings by running the
list-event-source-mappings
command.
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
In the response, you can verify the status value is enabled
. Event source mappings can be
disabled to pause polling temporarily without losing any records.
To test the event source mapping, add event records to your Kinesis stream. The --data
value is a
string that the CLI encodes to base64 prior to sending it to Kinesis. You can run the same command more than once to
add multiple records to the stream.
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda uses the execution role to read records from the stream. Then it invokes your Lambda function, passing in
batches of records. The function decodes data from each record and logs it, sending the output to CloudWatch Logs. View the
logs in the CloudWatch console.
Clean up your resources
You can now delete the resources that you created for this tutorial, unless you want to retain them. By deleting AWS resources that you're no longer using, you prevent unnecessary charges to your AWS account.
To delete the execution role
-
Open the Roles page of the IAM console.
-
Select the execution role that you created.
-
Choose Delete.
-
Enter the name of the role in the text input field and choose Delete.
To delete the Lambda function
-
Open the Functions page of the Lambda console.
-
Select the function that you created.
-
Choose Actions, Delete.
-
Type delete
in the text input field and choose Delete.
To delete the Kinesis stream
-
Sign in to the AWS Management Console and open the Kinesis console at
https://console.aws.amazon.com/kinesis.
-
Select the stream you created.
-
Choose Actions, Delete.
-
Enter delete
in the text input field.
-
Choose Delete.