Tutorial: Using AWS Lambda with Amazon DynamoDB streams - AWS Lambda

Tutorial: Using AWS Lambda with Amazon DynamoDB streams

In this tutorial, you create a Lambda function to consume events from an Amazon DynamoDB stream.

Prerequisites

This tutorial assumes that you have some knowledge of basic Lambda operations and the Lambda console. If you haven't already, follow the instructions in Create a Lambda function with the console to create your first Lambda function.

To complete the following steps, you need the AWS CLI version 2. Commands and the expected output are listed in separate blocks:

aws --version

You should see the following output:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

For long commands, an escape character (\) is used to split a command over multiple lines.

On Linux and macOS, use your preferred shell and package manager.

Note

In Windows, some Bash CLI commands that you commonly use with Lambda (such as zip) are not supported by the operating system's built-in terminals. To get a Windows-integrated version of Ubuntu and Bash, install the Windows Subsystem for Linux. Example CLI commands in this guide use Linux formatting. Commands which include inline JSON documents must be reformatted if you are using the Windows CLI.

Create the execution role

Create the execution role that gives your function permission to access AWS resources.

To create an execution role
  1. Open the roles page in the IAM console.

  2. Choose Create role.

  3. Create a role with the following properties.

    • Trusted entity – Lambda.

    • PermissionsAWSLambdaDynamoDBExecutionRole.

    • Role namelambda-dynamodb-role.

The AWSLambdaDynamoDBExecutionRole has the permissions that the function needs to read items from DynamoDB and write logs to CloudWatch Logs.

Create the function

Create a Lambda function that processes your DynamoDB events. The function code writes some of the incoming event data to CloudWatch Logs.

.NET
AWS SDK for .NET
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); foreach (var record in dynamoEvent.Records) { context.Logger.LogInformation($"Event ID: {record.EventID}"); context.Logger.LogInformation($"Event Name: {record.EventName}"); context.Logger.LogInformation(JsonSerializer.Serialize(record)); } context.Logger.LogInformation("Stream processing complete."); } }
Go
SDK for Go V2
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/events" "fmt" ) func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) { if len(event.Records) == 0 { return nil, fmt.Errorf("received empty event") } for _, record := range event.Records { LogDynamoDBRecord(record) } message := fmt.Sprintf("Records processed: %d", len(event.Records)) return &message, nil } func main() { lambda.Start(HandleRequest) } func LogDynamoDBRecord(record events.DynamoDBEventRecord){ fmt.Println(record.EventID) fmt.Println(record.EventName) fmt.Printf("%+v\n", record.Change) }
Java
SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Java.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; import com.google.gson.Gson; import com.google.gson.GsonBuilder; public class example implements RequestHandler<DynamodbEvent, Void> { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @Override public Void handleRequest(DynamodbEvent event, Context context) { System.out.println(GSON.toJson(event)); event.getRecords().forEach(this::logDynamoDBRecord); return null; } private void logDynamoDBRecord(DynamodbStreamRecord record) { System.out.println(record.getEventID()); System.out.println(record.getEventName()); System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb())); } }
JavaScript
SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); }; const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };

Consuming a DynamoDB event with Lambda using TypeScript.

export const handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); } const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };
PHP
SDK for PHP
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using PHP.

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\DynamoDb\DynamoDbHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends DynamoDbHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleDynamoDb(DynamoDbEvent $event, Context $context): void { $this->logger->info("Processing DynamoDb table items"); $records = $event->getRecords(); foreach ($records as $record) { $eventName = $record->getEventName(); $keys = $record->getKeys(); $old = $record->getOldImage(); $new = $record->getNewImage(); $this->logger->info("Event Name:".$eventName."\n"); $this->logger->info("Keys:". json_encode($keys)."\n"); $this->logger->info("Old Image:". json_encode($old)."\n"); $this->logger->info("New Image:". json_encode($new)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords items"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Python.

import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
Ruby
SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Ruby.

def lambda_handler(event:, context:) return 'received empty event' if event['Records'].empty? event['Records'].each do |record| log_dynamodb_record(record) end "Records processed: #{event['Records'].length}" end def log_dynamodb_record(record) puts record['eventID'] puts record['eventName'] puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}" end
Rust
SDK for Rust
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Rust.

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
To create the function
  1. Copy the sample code into a file named example.js.

  2. Create a deployment package.

    zip function.zip example.js
  3. Create a Lambda function with the create-function command.

    aws lambda create-function --function-name ProcessDynamoDBRecords \ --zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-dynamodb-role

Test the Lambda function

In this step, you invoke your Lambda function manually using the invoke AWS Lambda CLI command and the following sample DynamoDB event. Copy the following into a file named input.txt.

Example input.txt
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ] }

Run the following invoke command.

aws lambda invoke --function-name ProcessDynamoDBRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

The cli-binary-format option is required if you're using AWS CLI version 2. To make this the default setting, run aws configure set cli-binary-format raw-in-base64-out. For more information, see AWS CLI supported global command line options in the AWS Command Line Interface User Guide for Version 2.

The function returns the string message in the response body.

Verify the output in the outputfile.txt file.

Create a DynamoDB table with a stream enabled

Create an Amazon DynamoDB table with a stream enabled.

To create a DynamoDB table
  1. Open the DynamoDB console.

  2. Choose Create table.

  3. Create a table with the following settings.

    • Table namelambda-dynamodb-stream

    • Primary keyid (string)

  4. Choose Create.

To enable streams
  1. Open the DynamoDB console.

  2. Choose Tables.

  3. Choose the lambda-dynamodb-stream table.

  4. Under Exports and streams, choose DynamoDB stream details.

  5. Choose Turn on.

  6. For View type, choose Key attributes only.

  7. Choose Turn on stream.

Write down the stream ARN. You need this in the next step when you associate the stream with your Lambda function. For more information on enabling streams, see Capturing table activity with DynamoDB Streams.

Add an event source in AWS Lambda

Create an event source mapping in AWS Lambda. This event source mapping associates the DynamoDB stream with your Lambda function. After you create this event source mapping, AWS Lambda starts polling the stream.

Run the following AWS CLI create-event-source-mapping command. After the command runs, note down the UUID. You'll need this UUID to refer to the event source mapping in any commands, for example, when deleting the event source mapping.

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \ --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn

This creates a mapping between the specified DynamoDB stream and the Lambda function. You can associate a DynamoDB stream with multiple Lambda functions, and associate the same Lambda function with multiple streams. However, the Lambda functions will share the read throughput for the stream they share.

You can get the list of event source mappings by running the following command.

aws lambda list-event-source-mappings

The list returns all of the event source mappings you created, and for each mapping it shows the LastProcessingResult, among other things. This field is used to provide an informative message if there are any problems. Values such as No records processed (indicates that AWS Lambda has not started polling or that there are no records in the stream) and OK (indicates AWS Lambda successfully read records from the stream and invoked your Lambda function) indicate that there are no issues. If there are issues, you receive an error message.

If you have a lot of event source mappings, use the function name parameter to narrow down the results.

aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

Test the setup

Test the end-to-end experience. As you perform table updates, DynamoDB writes event records to the stream. As AWS Lambda polls the stream, it detects new records in the stream and invokes your Lambda function on your behalf by passing events to the function.

  1. In the DynamoDB console, add, update, and delete items to the table. DynamoDB writes records of these actions to the stream.

  2. AWS Lambda polls the stream and when it detects updates to the stream, it invokes your Lambda function by passing in the event data it finds in the stream.

  3. Your function runs and creates logs in Amazon CloudWatch. You can verify the logs reported in the Amazon CloudWatch console.

Clean up your resources

You can now delete the resources that you created for this tutorial, unless you want to retain them. By deleting AWS resources that you're no longer using, you prevent unnecessary charges to your AWS account.

To delete the Lambda function
  1. Open the Functions page of the Lambda console.

  2. Select the function that you created.

  3. Choose Actions, Delete.

  4. Type delete in the text input field and choose Delete.

To delete the execution role
  1. Open the Roles page of the IAM console.

  2. Select the execution role that you created.

  3. Choose Delete.

  4. Enter the name of the role in the text input field and choose Delete.

To delete the DynamoDB table
  1. Open the Tables page of the DynamoDB console.

  2. Select the table you created.

  3. Choose Delete.

  4. Enter delete in the text box.

  5. Choose Delete table.