
Example of streaming data from Amazon S3

This solution provides a proxy endpoint that allows you to stream data from different sources into the Amazon OpenSearch Service domain.

This section provides an example of how to stream data from Amazon Simple Storage Service (Amazon S3). When new data is uploaded to an Amazon S3 bucket, an event notification invokes an AWS Lambda function, which runs your custom code to perform the indexing.

The following Python sample code indexes files that contain one JSON document per line.

Create an S3 bucket

Use the following AWS CLI command to create an Amazon S3 bucket. Replace <aws-region> and <bucket-name> with the applicable values.

aws --region <aws-region> s3api create-bucket --bucket <bucket-name> --create-bucket-configuration LocationConstraint=<aws-region>
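If you prefer to script bucket creation, the following is a minimal boto3 sketch of the same call. The Region and bucket name are placeholders, and note that for us-east-1 the CreateBucketConfiguration argument must be omitted.

import boto3

# Placeholders: replace with your Region and a globally unique bucket name.
region = '<aws-region>'
bucket = '<bucket-name>'

s3 = boto3.client('s3', region_name=region)
s3.create_bucket(
    Bucket=bucket,
    CreateBucketConfiguration={'LocationConstraint': region},  # omit for us-east-1
)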

Create a directory

Create a directory named s3-to-es, then create a file in the directory named example.py with the following code, which handles an Amazon S3 event and streams the uploaded file to the Amazon OpenSearch Service domain through the proxy endpoint. Make sure that you enter the applicable values for the endpoint, region, and index variables.

import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

endpoint = ''  # the proxy endpoint, including https://
region = ''    # e.g. us-west-1
index = ''     # index name
service = 'execute-api'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)
url = endpoint + '/' + index + '/_doc'

headers = {"Content-Type": "application/json"}

s3 = boto3.client('s3')

# Lambda execution starts here
def handler(event, context):
    for record in event['Records']:
        # Get the bucket name and key for the new file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Get, read, and split the file into lines
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj['Body'].read()
        lines = body.splitlines()

        for line in lines:
            document = json.loads(line)
            r = requests.post(url, auth=awsauth, json=document, headers=headers)
            print(r)
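Before packaging, you can smoke-test the handler locally by invoking it with a hand-built S3 event. This is a minimal sketch, assuming the endpoint, region, and index variables above are filled in, valid AWS credentials are available locally, and the placeholder bucket and key point to an object that already exists.

# Hypothetical test event; the bucket and key placeholders must point
# to a real object in your bucket.
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "<bucket-name>"},
                "object": {"key": "news/sample.json"},
            }
        }
    ]
}
handler(test_event, None)  # the handler does not use the context argument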

Install dependencies

Install the requests and requests_aws4auth libraries into the current directory by running the following commands:

pip install requests -t .
pip install requests_aws4auth -t .

Package the application code and dependencies

Run the following command:

zip -r lambda.zip *

Create a Lambda function with Python 3.7 as the runtime

  1. Create a file named trust-lambda-policy.json that contains the following JSON document.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
  2. Use the following commands to create a Lambda execution role and attach the required permissions policies.

    aws iam create-role --role-name s3-to-es-test-role --assume-role-policy-document file://trust-lambda-policy.json
    aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name s3-to-es-test-role
    aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAPIGatewayInvokeFullAccess --role-name s3-to-es-test-role
    aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name s3-to-es-test-role
  3. Use the following command to create the Lambda function. Make sure that you replace <lambda-role-arn> with the ARN of the role that you created in the previous step.

    aws lambda create-function --function-name s3-to-es-test-lambda --runtime python3.7 --role <lambda-role-arn> --handler example.handler --zip-file fileb://lambda.zip --timeout 300
  4. Use the following command to allow Amazon S3 to invoke the Lambda function. Make sure that you replace <bucket-name> and <your-account-id> with your information. A boto3 equivalent of this call is sketched after this list.

    aws lambda add-permission --function-name s3-to-es-test-lambda --statement-id allows3 --action "lambda:InvokeFunction" --principal s3.amazonaws.com --source-arn "arn:aws:s3:::<bucket-name>" --source-account <your-account-id>
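If you are scripting the setup instead of using the AWS CLI, a boto3 sketch of the same add-permission call looks like the following; the Region, bucket name, and account ID are placeholders.

import boto3

# Placeholders: replace with your Region, bucket name, and account ID.
lambda_client = boto3.client('lambda', region_name='<aws-region>')
lambda_client.add_permission(
    FunctionName='s3-to-es-test-lambda',
    StatementId='allows3',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::<bucket-name>',
    SourceAccount='<your-account-id>',
)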

Put the S3 bucket notification configuration

Create a file named bucket_lambda_notification_config.json with the following content. Make sure that you replace <lambda-function-arn> with your Lambda function ARN. The prefix filter rule ensures that only objects uploaded under the news prefix invoke the function.

{ "LambdaFunctionConfigurations": [ { "Id": "s3-to-proxy-es", "LambdaFunctionArn": "<lambda-function-arn>", "Events": ["s3:ObjectCreated:*"], "Filter": { "Key": { "FilterRules": [ { "Name": "prefix", "Value": "news" } ] } } } ] }

Run the following command to attach the notification configuration to the bucket.

aws s3api put-bucket-notification-configuration --bucket <your-bucket-name> --notification-configuration file://bucket_lambda_notification_config.json
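Equivalently, a short boto3 sketch can load the same configuration file and apply it; the bucket name is a placeholder.

import boto3
import json

s3 = boto3.client('s3')
with open('bucket_lambda_notification_config.json') as f:
    config = json.load(f)

# Placeholder: replace with your bucket name.
s3.put_bucket_notification_configuration(
    Bucket='<your-bucket-name>',
    NotificationConfiguration=config,
)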

Create a sample.json file

Copy the following example content into a file named sample.json. Each line is a separate JSON document that the Lambda function indexes individually.

{"content": "Bob enjoys working as a software engineer in Amazon.com since 2000"} {"content": "Alice does not like the weather in Seattle"} {"content": "Alice like shopping at Amazon.com"}

Run the following command to put the sample.json file into the Amazon S3 bucket under the news prefix, which matches the notification filter.

aws s3 cp sample.json s3://<your-bucket-name>/news/sample.json
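After the upload, the Lambda function should index the three documents. As a rough check, the following sketch runs a query-string search through the proxy endpoint; it assumes the same endpoint, region, and index values that you entered in example.py.

import boto3
import requests
from requests_aws4auth import AWS4Auth

endpoint = ''  # the proxy endpoint, including https://
region = ''    # e.g. us-west-1
index = ''     # index name
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'execute-api', session_token=credentials.token)

# Search for documents that mention Alice; two of the three sample
# documents should match.
r = requests.get(endpoint + '/' + index + '/_search',
                 auth=awsauth, params={'q': 'content:Alice'})
print(r.json())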