Customization guide

Implementing a new operator in Media Insights on AWS

Operators are Lambda functions that derive new media objects from input media and/or generate metadata by analyzing input media. They run as part of a Media Insights on AWS workflow. Workflows are AWS Step Functions that define the order in which operators run.

Operators can be synchronous (sync) or asynchronous (async). Sync operators start an analysis (or transformation) job and get its result in a single Lambda function. Async operators use separate Lambda functions to start jobs and get their results. Typically, async operators run for several minutes.
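
For orientation, here is a minimal sketch of the async split, using Amazon Transcribe as a stand-in for any long-running job. The event fields shown are illustrative; the status-reporting and metadata-passing conventions specific to this solution are covered in the helper-library sections below:

import boto3

transcribe = boto3.client("transcribe")

def start_lambda_handler(event, context):
    # Start the long-running job. Amazon Transcribe stands in here for any
    # long-running analysis service.
    audio = event["Input"]["Media"]["Audio"]
    job_name = "transcribe-" + event["AssetId"]
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": "s3://{}/{}".format(audio["S3Bucket"], audio["S3Key"])},
        MediaFormat="mp3",  # assumption: the input audio is an MP3 file
        LanguageCode="en-US",
    )
    # Pass job_name downstream so the monitor function can find the job
    # (see "Write data to downstream operators" below).

def monitor_lambda_handler(event, context):
    # The workflow invokes this function repeatedly until the job completes.
    job = transcribe.get_transcription_job(
        TranscriptionJobName=event["Input"]["MetaData"]["TranscribeJobName"]
    )
    status = job["TranscriptionJob"]["TranscriptionJobStatus"]  # IN_PROGRESS, COMPLETED, or FAILED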

Operator inputs can include a list of media, metadata, and user-defined workflow and/or operator configurations.

Operator outputs include the run status and the Amazon S3 locations of any newly derived media and metadata objects. These outputs are passed to operators in downstream workflow stages.
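
Concretely, the output object an operator returns looks roughly like the following. This is an illustrative sketch with placeholder values; the helper library described in Step 1 constructs this object for you:

{
    "Name": "MyOperator",
    "Status": "Complete",
    "Media": {
        "Text": {
            "S3Bucket": "my-dataplane-bucket",
            "S3Key": "private/assets/<asset_id>/workflows/<workflow_id>/result.txt"
        }
    },
    "MetaData": {
        "MyData1": "my value"
    }
}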

Step 1: Write operator Lambda functions

Time to complete: >1 hour

Operators reside under source/operators. Create a new folder there for your new operator. Copy source/operators/rekognition/generic_data_lookup.py to that new directory and change it to do what you require.

The Media Insights on AWS helper library must be used inside an operator to interact with the control plane and data plane. This library lives under lib/MediaInsightsEngineLambdaHelper/.

Using the Media Insights on AWS helper library

Instantiate the helper:

from MediaInsightsEngineLambdaHelper import OutputHelper

output_object = OutputHelper("<OPERATOR-NAME>")

Get asset and workflow IDs

To make it easier to find results and know the provenance of data, operators should save the files that they generate to a directory path that is unique to the workflow run ID and asset ID (for example, s3:// + dataplane_bucket + /private/assets/ + asset_id + /workflows/ + workflow_id + /). Obtain the workflow and asset IDs from the Lambda function’s entry point event object:

# Lambda function entrypoint:
def lambda_handler(event, context):
    workflow_id = str(event["WorkflowExecutionId"])
    asset_id = event['AssetId']
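
Combining the two, here is a sketch of writing a result file under that path convention. The metadata.json file name and the DATAPLANE_BUCKET environment variable are illustrative assumptions:

import json
import os

import boto3

s3 = boto3.client("s3")

def lambda_handler(event, context):
    workflow_id = str(event["WorkflowExecutionId"])
    asset_id = event["AssetId"]
    # Save results under a path unique to this asset and workflow run.
    key = "private/assets/{}/workflows/{}/metadata.json".format(asset_id, workflow_id)
    s3.put_object(
        Bucket=os.environ["DATAPLANE_BUCKET"],  # assumption: bucket name passed as an environment variable
        Key=key,
        Body=json.dumps({"example": "result"}),
    )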

Get input media objects

Media objects are passed using their location in Amazon S3. Use the boto3 Amazon S3 client to access them from Amazon S3 using the locations specified in the Lambda function’s entry point event object:

def lambda_handler(event, context):
    if "Video" in event["Input"]["Media"]:
        s3bucket = event["Input"]["Media"]["Video"]["S3Bucket"]
        s3key = event["Input"]["Media"]["Video"]["S3Key"]
    elif "Image" in event["Input"]["Media"]:
        s3bucket = event["Input"]["Media"]["Image"]["S3Bucket"]
        s3key = event["Input"]["Media"]["Image"]["S3Key"]

Get operator configuration input

Operator configurations can be accessed from the Configuration attribute in the Lambda function’s entry point event object. For example, here's how the face search operator gets the user-specified face collection ID:

collection_id = event["Configuration"]["CollectionId"]
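
The same pattern works for configurations your own operator defines. For optional settings, you can fall back to a default; Threshold here is a hypothetical key:

# "Threshold" is a hypothetical operator-specific configuration key.
threshold = float(event["Configuration"].get("Threshold", "0.5"))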

Write data to downstream operators

Metadata derived by an operator can be passed as input to the next stage in a workflow by adding it to the operator's output_object. Do this with the add_workflow_metadata function in the OutputHelper, as shown below:

Important

Attribute values must be non-empty strings.

from MediaInsightsEngineLambdaHelper import OutputHelper

output_object = OutputHelper("<OPERATOR-NAME>")

def lambda_handler(event, context):
    ...
    # Passing MyData objects to downstream operators
    output_object.add_workflow_metadata(MyData1=my_data_1)
    output_object.add_workflow_metadata(MyData2=my_data_2)
    # Multiple key-value pairs can also be specified in a single call, like this:
    output_object.add_workflow_metadata(MyData3=my_data_3, MyData4=my_data_4)
    ...
    return output_object.return_output_object()

Read data from upstream operators

Metadata that was output by upstream operators can be accessed from the Lambda function’s entry point event object:

my_data_1 = event["Input"]["MetaData"]["MyData1"]

Store media metadata to the data plane

Use store_asset_metadata() to store results. For paged results, call that function for each page:

from MediaInsightsEngineLambdaHelper import DataPlane

dataplane = DataPlane()
metadata_upload = dataplane.store_asset_metadata(asset_id, operator_name, workflow_id, response)
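
For example, paged results can be stored with a loop like the following sketch. It assumes store_asset_metadata() accepts paginate and end keyword arguments, as the solution's built-in operators use them; confirm against the helper source under lib/MediaInsightsEngineLambdaHelper/:

from MediaInsightsEngineLambdaHelper import DataPlane

dataplane = DataPlane()

# pages: a list of result dictionaries produced by your analysis job.
for i, page in enumerate(pages):
    dataplane.store_asset_metadata(
        asset_id, operator_name, workflow_id, page,
        paginate=True,
        end=(i == len(pages) - 1),  # mark the final page
    )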

Store media objects to the data plane

Operators can derive new media objects. For example, the Amazon Transcribe operator derives a new text object from an input audio object. Save new media objects with add_media_object() as shown in the code:

from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper

operator_object = MediaInsightsOperationHelper(event)
operator_object.add_media_object(my_media_type, bucket, key)

The my_media_type variable should be "Video", "Audio", or "Text".
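
Putting it together, here is a minimal sketch of deriving a text object, uploading it, and registering it. The output key and the choice to reuse the input object's bucket are illustrative:

import boto3

from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper

def lambda_handler(event, context):
    operator_object = MediaInsightsOperationHelper(event)
    bucket = event["Input"]["Media"]["Audio"]["S3Bucket"]
    key = "private/assets/{}/workflows/{}/transcript.txt".format(
        event["AssetId"], event["WorkflowExecutionId"])
    # Upload the derived object first, then register its location.
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=b"derived text")
    operator_object.add_media_object("Text", bucket, key)
    return operator_object.return_output_object()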

Retrieve media objects from the data plane

import boto3

from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper

s3 = boto3.client("s3")

operator_object = MediaInsightsOperationHelper(event)
bucket = operator_object.input["Media"][my_media_type]["S3Bucket"]
key = operator_object.input["Media"][my_media_type]["S3Key"]
s3_response = s3.get_object(Bucket=bucket, Key=key)

Again, the my_media_type variable should be "Video", "Audio", or "Text".
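
From there, reading the object's contents follows the usual boto3 response pattern:

# Read the retrieved media object's bytes from the S3 response.
media_bytes = s3_response["Body"].read()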

Step 2: Add your operator to the Media Insights on AWS operator library

Time to complete: 30 minutes

This step involves editing the AWS CDK source for deploying the Media Insights on AWS operator library, located at source/cdk/lib/operator-library.ts. You must add new entries for your operator under each of the following resource types:

  • Lambda functions

  • IAM roles

  • Operator registrations in the control plane

  • Stack outputs that export operator names

Create the IAM role resource

Create an IAM role resource to give your Lambda function the appropriate permissions. Media Insights on AWS operators need the AWSLambdaBasicExecutionRole managed policy plus policies for any other AWS resources and services accessed by the Lambda function.

Create the Lambda function resource

Create a Lambda function resource for your operator Lambda function using the createLambdaFunction helper function.

The parameters of createLambdaFunction are listed below.

id: Logical ID of the CloudFormation template resource.

codeArchive: Name of the zipped Lambda function source package generated by build-s3-dist.sh.

timeout: Number of seconds after which the Lambda function times out.

props.handler: Name of the method in the Lambda function that processes events.

props.role: IAM role created in the previous step.

props.tracing: Tracing settings for the Lambda function.

props.memorySize: Amount of memory to allocate to the Lambda function.

props.environment: Environment variables for the Lambda function.

Usage example:

const customOperator = createLambdaFunction(
    this,
    'customOperator',
    "custom_operator.zip",
    300,
    {
        handler: "custom_operator.lambda_handler",
        role: customOperatorLambdaRole,
        tracing: lambda.Tracing.PASS_THROUGH,
        environment: {
            OPERATOR_NAME: "CustomOperator",
            DataplaneEndpoint,
            DataLookupRole: genericDataLookupLambdaRole.roleArn,
            botoConfig
        }
    }
);

Create the Media Insights on AWS operator resource using your Lambda function(s) and export the operator as output

The createCustomResource helper function can be used to create both the custom resource and the output for the new custom operator.

The parameters of createCustomResource are listed below.

id: Logical ID of the CloudFormation template resource.

props.Name: Name of the custom resource.

props.Description: Description of the custom resource.

props.Async: Whether your operator is async (true) or sync (false).

props.Configuration: The MediaType and Enabled fields, plus any other configuration your operator needs.

props.StartLambdaArn: ARN of the Lambda function that starts your operator.

props.MonitorLambdaArn: If your operator is async, ARN of the monitoring Lambda function.

props.OutputName: OutputName of the operator. Defaults to id if not specified.

props.ExportName: ExportName of the operator. Defaults to props.Name if not specified.

Usage example:

createCustomResource(
    this,
    'customOperatorOperation',
    {
        Name: "customOperator",
        Description: "Operation name of CustomOperator",
        Async: false,
        Configuration: {
            MediaType: "Video",
            Enabled: false
        },
        StartLambdaArn: customOperator.functionArn,
        OutputName: "CustomOperator",
        ExportName: "CustomOperator"
    }
);

Step 3: Update the build script to deploy your operator to AWS Lambda

Time to complete: 5 minutes

Update the Make lambda package section in build-s3-dist.sh to zip your operator’s Lambda function(s) into the regional distribution directory:

# Add operator code to a zip package for AWS Lambda
zip my_operator.zip my_operator.py
# Copy that zip to the regional distribution directory.
cp "./dist/my_operator.zip" "$regional_dist_dir/"

Step 4: Deploy your custom build

Run the build script to generate CloudFormation templates, then deploy them as described in the README.md file.

Step 5: Test your new workflow and operator

To test workflows and operators, submit requests to the workflow API endpoint using IAM authorization. Tools like Postman (described in the README.md file) and awscurl simplify IAM authorization. The following examples assume your AWS access key and secret key are set up correctly in awscurl:

Sample command to list all available workflows:

awscurl "$WORKFLOW_API_ENDPOINT"/workflow | cut -f 2 -d "'" | jq '.[].Name'

Sample command to list all stages in a workflow:

WORKFLOW_NAME="CasImageWorkflow"
awscurl "$WORKFLOW_API_ENDPOINT"/workflow/"$WORKFLOW_NAME" | cut -f 2 -d "'" | jq -c '.Stages | .[].Name'

Sample command to get the workflow configuration for a stage:

WORKFLOW_NAME="CasImageWorkflow"
STAGE_NAME="RekognitionStage"
awscurl "$WORKFLOW_API_ENDPOINT"/workflow/"$WORKFLOW_NAME" | cut -f 2 -d "'" | jq -c '.Stages."'"$STAGE_NAME"'".Configuration'

Sample command to run a workflow with the default configuration:

WORKFLOW_NAME="CasImageWorkflow"
aws s3 cp test_image.jpg s3://"$DATAPLANE_BUCKET"/
awscurl -X POST --data '{"Name":"'$WORKFLOW_NAME'", "Input":{"Media":{"Image":{"S3Bucket":"'$DATAPLANE_BUCKET'","S3Key":"test_image.jpg"}}}}' $WORKFLOW_API_ENDPOINT/workflow/execution

Sample command to run a workflow with a non-default configuration:

WORKFLOW_NAME="CasImageWorkflow"
CONFIGURATION='{"RekognitionStage":{"faceDetectionImage":{"MediaType":"Image","Enabled":false},"celebrityRecognitionImage":{"MediaType":"Image","Enabled":false},"faceSearchImage":{"MediaType":"Image","Enabled":false},"contentModerationImage":{"MediaType":"Image","Enabled":false},"labelDetectionImage":{"MediaType":"Image","Enabled":false}}}'
aws s3 cp test_image.jpg s3://"$DATAPLANE_BUCKET"/
awscurl -X POST --data '{"Name":"'$WORKFLOW_NAME'", "Configuration":'$CONFIGURATION', "Input":{"Media":{"Image":{"S3Bucket":"'$DATAPLANE_BUCKET'","S3Key":"test_image.jpg"}}}}' $WORKFLOW_API_ENDPOINT/workflow/execution

Monitor your test

You can monitor workflows with the following logs:

  • Your operator Lambda function. To find this log, search the Lambda functions for your operator name.

  • The data plane API Lambda function. To find this log, search Lambda functions for MediaInsightsDataplaneApiStack.

Validate metadata in the data plane

When your operator finishes successfully, the data saved by the Dataplane.store_asset_metadata() function is available in the DataPlaneTable in Amazon DynamoDB.
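
If you prefer to check programmatically rather than through the console, a sketch like the following can work. The deployed table's physical name is stack-specific, and the AssetId key name is an assumption; verify both in the DynamoDB console before relying on this:

import boto3

# Assumptions: the table's physical name (look it up in your stack's
# resources) and an "AssetId" partition key.
table = boto3.resource("dynamodb").Table("<DATAPLANE_TABLE_NAME>")
item = table.get_item(Key={"AssetId": "<ASSET_ID>"})
print(item.get("Item"))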

Implementing a new data stream consumer

The data plane stores each item as an object in Amazon S3 and stores its Amazon S3 object identifier in DynamoDB. However, many application scenarios involve data access patterns that require capabilities beyond those provided by DynamoDB and Amazon S3. For example, you might need Amazon OpenSearch Service to support interactive analytics, Amazon SNS to provide real-time messaging and notifications, or Amazon QuickSight to support analytical reporting over big datasets.

The data plane provides a change-data-capture (CDC) stream from DynamoDB to communicate media analysis data to stream consumers, where ETL tasks can transform and load raw data to the downstream data stores that support end-user applications. This CDC stream is exposed as an Amazon Kinesis data stream, and its ARN is provided as an output called AnalyticsStreamArn in the base Media Insights on AWS CloudFormation stack, as shown below:

(Figure: CloudFormation stack output showing AnalyticsStreamArn with the Kinesis data stream ARN value.)

For more information about how to implement Kinesis data stream consumers in Media Insights on AWS, refer to the Media Insights on AWS demo application, which includes a data stream consumer that feeds Amazon OpenSearch Service.
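
As a starting point, a Lambda function subscribed to that stream might look like the following minimal sketch. The payload fields are deployment-specific, so inspect real records from your stack to confirm the schema before building on it:

import base64
import json

def lambda_handler(event, context):
    # Kinesis delivers record payloads base64-encoded in the Lambda event.
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Transform and load into your downstream store here, for example
        # by indexing the document into Amazon OpenSearch Service.
        print(payload)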