Loading streaming data from Amazon DynamoDB - Amazon OpenSearch Service

Loading streaming data from Amazon DynamoDB

You can use AWS Lambda to send data to your OpenSearch Service domain from Amazon DynamoDB. New data that arrives in the database table triggers an event notification to Lambda, which then runs your custom code to perform the indexing.

Prerequisites

Before proceeding, you must have the following resources.

Prerequisite Description
DynamoDB table

The table contains your source data. For more information, see Basic Operations on DynamoDB Tables in the Amazon DynamoDB Developer Guide.

The table must reside in the same Region as your OpenSearch Service domain and have a stream set to New image. To learn more, see Enabling a Stream.

OpenSearch Service domain

The destination for data after your Lambda function processes it. For more information, see Creating OpenSearch Service domains.

IAM role

This role must have basic OpenSearch Service, DynamoDB, and Lambda execution permissions, such as the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

The role must have the following trust relationship:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To learn more, see Creating IAM roles in the IAM User Guide.

Create the Lambda function

Follow the instructions in Create the Lambda deployment package, but create a directory named ddb-to-opensearch and use the following code for sample.py:

import boto3
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-east-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        # Get the primary key for use as the OpenSearch ID
        id = record['dynamodb']['Keys']['id']['S']

        if record['eventName'] == 'REMOVE':
            r = requests.delete(url + id, auth=awsauth)
        else:
            document = record['dynamodb']['NewImage']
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return str(count) + ' records processed.'

Edit the variables for region and host.

Install pip if you haven't already, then use the following commands to install your dependencies:

cd ddb-to-opensearch
pip install --target ./package requests
pip install --target ./package requests_aws4auth

Then follow the instructions in Create the Lambda function, but specify the IAM role from Prerequisites and the following settings for the trigger:

  • Table: your DynamoDB table

  • Batch size: 100

  • Starting position: Trim horizon

To learn more, see Process New Items with DynamoDB Streams and Lambda in the Amazon DynamoDB Developer Guide.
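If you prefer to create the trigger programmatically rather than in the console, the settings above map onto the parameters of the Lambda event source mapping API. The following is a minimal sketch, not the procedure this guide prescribes: the helper names build_trigger_settings and create_trigger are hypothetical, and the stream ARN and function name are values you supply. Note that the event source is the ARN of the table's stream, not of the table itself.

```python
def build_trigger_settings(stream_arn, function_name):
    """Return keyword arguments that mirror the trigger settings listed above."""
    return {
        "EventSourceArn": stream_arn,        # ARN of the table's stream (assumed input)
        "FunctionName": function_name,       # name of your Lambda function (assumed input)
        "BatchSize": 100,                    # Batch size: 100
        "StartingPosition": "TRIM_HORIZON",  # Starting position: Trim horizon
    }


def create_trigger(stream_arn, function_name, region):
    """Create the event source mapping. Requires boto3 and AWS credentials."""
    # Imported here so build_trigger_settings stays usable without boto3 installed.
    import boto3

    client = boto3.client("lambda", region_name=region)
    return client.create_event_source_mapping(
        **build_trigger_settings(stream_arn, function_name)
    )
```

Calling create_trigger makes a real API request, so run it only against an account and Region where the table, stream, and function already exist.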

At this point, you have a complete set of resources: a DynamoDB table for your source data, a DynamoDB stream of changes to the table, a function that runs after your source data changes and indexes those changes, and an OpenSearch Service domain for searching and visualization.
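To make the function's behavior concrete, the following sketch reproduces the decision that the sample handler makes for each stream record, without sending any requests. It assumes the table's partition key is a string attribute named id, as the sample code does. The plan_actions helper and the sample event are illustrative only, and https://domain-name is a placeholder for your domain endpoint.

```python
def plan_actions(event, base_url):
    """Return the (method, url, body) tuples the sample handler would send."""
    actions = []
    for record in event["Records"]:
        # The primary key becomes the OpenSearch document ID.
        doc_id = record["dynamodb"]["Keys"]["id"]["S"]
        if record["eventName"] == "REMOVE":
            # Deleted items carry no NewImage, so the document is deleted.
            actions.append(("DELETE", base_url + doc_id, None))
        else:
            # INSERT and MODIFY both overwrite the document with the new image.
            actions.append(("PUT", base_url + doc_id, record["dynamodb"]["NewImage"]))
    return actions


# A hand-written event with the same shape as a DynamoDB stream batch.
sample_event = {
    "Records": [
        {
            "eventName": "INSERT",
            "dynamodb": {
                "Keys": {"id": {"S": "00001"}},
                "NewImage": {"id": {"S": "00001"}, "title": {"S": "The Postman"}},
            },
        },
        {
            "eventName": "REMOVE",
            "dynamodb": {"Keys": {"id": {"S": "00002"}}},
        },
    ]
}

for method, url, _body in plan_actions(sample_event, "https://domain-name/lambda-index/_doc/"):
    print(method, url)
```

Because the document ID is derived from the primary key, repeated updates to the same item replace one document rather than creating new ones.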

Test the Lambda function

After you create the function, you can test it by adding a new item to the DynamoDB table using the AWS CLI. Replace the table name and Region in the following command with your own:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Then use the OpenSearch Service console or OpenSearch Dashboards to verify that lambda-index contains a document. You can also use the following request:

GET https://domain-name/lambda-index/_doc/00001

{
  "_index": "lambda-index",
  "_type": "_doc",
  "_id": "00001",
  "_version": 1,
  "found": true,
  "_source": {
    "director": {
      "S": "Kevin Costner"
    },
    "id": {
      "S": "00001"
    },
    "title": {
      "S": "The Postman"
    }
  }
}
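The _source in this response keeps the DynamoDB type descriptors (for example, "S" for string) because the sample function indexes NewImage unchanged. If you would rather index plain field values, one option is to convert the image before sending it. The following is a minimal sketch under that assumption, with hypothetical helper names unmarshal and flatten_image. It handles only the string, number, Boolean, null, list, map, and string-set or number-set types and rejects everything else, including binary.

```python
def unmarshal(value):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    type_tag, inner = next(iter(value.items()))
    if type_tag == "S":
        return inner
    if type_tag == "N":
        # DynamoDB sends numbers as strings; keep integers as int.
        return float(inner) if any(c in inner for c in ".eE") else int(inner)
    if type_tag == "BOOL":
        return inner
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(item) for item in inner]
    if type_tag == "M":
        return {key: unmarshal(item) for key, item in inner.items()}
    if type_tag == "SS":
        return list(inner)
    if type_tag == "NS":
        return [unmarshal({"N": item}) for item in inner]
    raise ValueError("Unsupported DynamoDB type: " + type_tag)


def flatten_image(image):
    """Convert a whole NewImage map into a plain dictionary."""
    return {key: unmarshal(value) for key, value in image.items()}


new_image = {
    "director": {"S": "Kevin Costner"},
    "id": {"S": "00001"},
    "title": {"S": "The Postman"},
}
print(flatten_image(new_image))
```

In the handler, this would mean passing flatten_image(record['dynamodb']['NewImage']) as the document instead of the raw image. boto3 also ships a TypeDeserializer class in boto3.dynamodb.types that performs a similar conversion; it returns numbers as Decimal values, which need extra handling before JSON serialization.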