Customization guide
Implementing a new operator in Media Insights on AWS
Operators are Lambda functions that derive new media objects from input media and/or generate metadata by analyzing input media. They run as part of a Media Insights on AWS workflow. Workflows are AWS Step Functions state machines that run operators in a sequence of stages.
Operators can be synchronous (sync) or asynchronous (async). Sync operators start an analysis (or transformation) job and get its result in a single Lambda function. Async operators use separate Lambda functions to start jobs and get their results. Typically, async operators run for several minutes.
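The structural difference between sync and async operators can be sketched as follows. This is an illustrative sketch only: the handler names, the plain-dict outputs, and the stand-in job values are hypothetical, and real operators use the Media Insights on AWS helper library described below instead of raw dicts.

```python
def sync_handler(event, context):
    # A sync operator starts a job and returns its result in one function.
    result = {"Labels": ["dog"]}  # stand-in for a completed analysis job
    return {"Status": "Complete", "MetaData": result}

def start_handler(event, context):
    # An async operator's "start" function only kicks off the job...
    job_id = "job-1234"  # stand-in for a job ID returned by an AWS service
    return {"Status": "Executing", "MetaData": {"JobId": job_id}}

def check_handler(event, context):
    # ...and a separate "monitor" function polls until the job finishes.
    job_id = event["MetaData"]["JobId"]
    done = True  # stand-in for a job-status lookup against the service
    status = "Complete" if done else "Executing"
    return {"Status": status, "MetaData": {"JobId": job_id}}
```

The monitor function is invoked repeatedly by the workflow until it reports a terminal status, which is why async operators suit jobs that run for several minutes.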
Operator inputs can include a list of media, metadata, and user-defined workflow and/or operator configurations.
Operator outputs include the run status and the Amazon S3 locations of the newly derived media and metadata objects. These outputs are passed to other operators in downstream workflow stages.
Step 1: Write operator Lambda functions
Time to complete: >1 hour
Operators reside under source/operators. Create a new folder there for your new operator. Copy source/operators/rekognition/generic_data_lookup.py to that new directory and change it to do what you require.
The Media Insights on AWS helper library must be used inside
an operator to interact with the control plane and data plane.
This library lives under lib/MediaInsightsEngineLambdaHelper/.
Using the Media Insights on AWS helper library
Instantiate the helper:
from MediaInsightsEngineLambdaHelper import OutputHelper
output_object = OutputHelper("<OPERATOR-NAME>")
Get asset and workflow IDs
To make it easier to find results and know the provenance of data, operators should save the files that they generate to a directory path that is unique to the workflow run ID and asset ID (for example, s3:// + dataplane_bucket + /private/assets/ + asset_id + /workflows/ + workflow_id + /). Obtain the workflow and asset IDs from the Lambda function's entry point event object:
# Lambda function entry point:
def lambda_handler(event, context):
    workflow_id = str(event["WorkflowExecutionId"])
    asset_id = event['AssetId']
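The unique output path described above can be assembled from those two IDs. The build_output_key helper below is an illustrative sketch, not part of the helper library, and the labels.json file name is a hypothetical example:

```python
def build_output_key(asset_id, workflow_id, filename):
    # Unique per-asset, per-workflow-run prefix in the data plane bucket.
    return f"private/assets/{asset_id}/workflows/{workflow_id}/{filename}"

def lambda_handler(event, context):
    workflow_id = str(event["WorkflowExecutionId"])
    asset_id = event["AssetId"]
    key = build_output_key(asset_id, workflow_id, "labels.json")
    # Results could then be uploaded with boto3, e.g.:
    # s3.put_object(Bucket=dataplane_bucket, Key=key, Body=...)
    return key
```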
Get input media objects
Media objects are passed using their location in Amazon S3. Use the boto3 Amazon S3 client to access them from Amazon S3 using the locations specified in the Lambda function’s entry point event object:
def lambda_handler(event, context):
    if "Video" in event["Input"]["Media"]:
        s3bucket = event["Input"]["Media"]["Video"]["S3Bucket"]
        s3key = event["Input"]["Media"]["Video"]["S3Key"]
    elif "Image" in event["Input"]["Media"]:
        s3bucket = event["Input"]["Media"]["Image"]["S3Bucket"]
        s3key = event["Input"]["Media"]["Image"]["S3Key"]
Get operator configuration input
Operator configurations can be accessed from the Configuration attribute in the Lambda function’s entry point event object. For example, here's how the face search operator gets the user-specified face collection ID:
collection_id = event["Configuration"]["CollectionId"]
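A direct dictionary lookup raises an unhelpful KeyError if the user omits a setting. A defensive read, sketched below, can fail with a clearer message; get_required_config is an illustrative helper, not part of the helper library:

```python
def get_required_config(event, key):
    # Raise a clear error when a required operator setting is missing.
    config = event.get("Configuration", {})
    if key not in config:
        raise KeyError(f"missing required operator configuration: {key}")
    return config[key]

# e.g. collection_id = get_required_config(event, "CollectionId")
```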
Write data to downstream operators
Metadata derived by an operator can be passed as an input to the next stage in a workflow by adding it to the operator's output_object. Do this with the add_workflow_metadata function in the OutputHelper, as shown below:
Important
Attribute values must be strings, and they must not be empty strings.
from MediaInsightsEngineLambdaHelper import OutputHelper
output_object = OutputHelper("<OPERATOR-NAME>")

def lambda_handler(event, context):
    ...
    # Pass MyData objects to downstream operators:
    output_object.add_workflow_metadata(MyData1=my_data_1)
    output_object.add_workflow_metadata(MyData2=my_data_2)
    # Multiple key-value pairs can also be specified in a single call:
    output_object.add_workflow_metadata(MyData3=my_data_3, MyData4=my_data_4)
    ...
    return output_object.return_output_object()
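Because attribute values must be non-empty strings, it can help to normalize values before passing them along. The to_metadata_value helper below is an illustrative sketch, not part of the helper library:

```python
def to_metadata_value(value):
    # Workflow metadata attribute values must be non-empty strings.
    text = str(value)
    if not text:
        raise ValueError("workflow metadata values must be non-empty strings")
    return text

# e.g. output_object.add_workflow_metadata(JobId=to_metadata_value(job_id))
```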
Read data from upstream operators
Metadata that was output by upstream operators can be accessed from the Lambda function’s entry point event object:
my_data_1 = event["Input"]["MetaData"]["MyData1"]
Store media metadata to the data plane
Use store_asset_metadata() to store results. For paged results, call that function for each page:
from MediaInsightsEngineLambdaHelper import DataPlane

dataplane = DataPlane()
metadata_upload = dataplane.store_asset_metadata(asset_id, operator_name, workflow_id, response)
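For paged results, one way to make the per-page calls is to split a large result list into chunks first. The chunk_results helper and the page size of 100 below are illustrative assumptions, not part of the helper library, and the real store_asset_metadata call may take additional paging arguments:

```python
def chunk_results(items, page_size):
    # Split a large result list into pages small enough to store individually.
    return [items[i:i + page_size] for i in range(0, len(items), page_size)]

# Hypothetical usage with the DataPlane helper:
# dataplane = DataPlane()
# for page in chunk_results(response["Labels"], 100):
#     dataplane.store_asset_metadata(asset_id, operator_name, workflow_id,
#                                    {"Labels": page})
```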
Store media objects to the data plane
Operators can derive new media objects. For example, the Amazon Transcribe operator derives a new text object from an input audio object. Save new media objects with add_media_object() as shown in the code:
from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper

operator_object = MediaInsightsOperationHelper(event)
operator_object.add_media_object(my_media_type, bucket, key)

The my_media_type variable should be "Video", "Audio", or "Text".
Retrieve media objects from the data plane
import boto3
from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper

s3 = boto3.client("s3")
operator_object = MediaInsightsOperationHelper(event)
bucket = operator_object.input["Media"][my_media_type]["S3Bucket"]
key = operator_object.input["Media"][my_media_type]["S3Key"]
s3_response = s3.get_object(Bucket=bucket, Key=key)

Again, the my_media_type variable should be "Video", "Audio", or "Text".
Step 2: Add your operator to the Media Insights on AWS operator library
Time to complete: 30 minutes
This step involves editing the AWS CDK source for deploying the Media Insights on AWS operator library, located at source/cdk/lib/operator-library.ts. Update this file to:

- Create Lambda functions
- Create IAM roles
- Register your Lambda functions as operators in the control plane
- Export operator names as outputs
Create the IAM role resource
Create an IAM role resource to give your Lambda function the appropriate permissions. Media Insights on AWS operators need AWSLambdaBasicExecutionRole plus policies for any other AWS resources and services accessed by the Lambda function.
Create Lambda function resource
Create a Lambda function resource for your operator Lambda function using the createLambdaFunction helper function.
The parameters of the Lambda function are listed in the following table.
Parameter | Description |
---|---|
(logical ID) | Logical ID of the CloudFormation template resource. |
(zip file name) | Name of the compressed Lambda function source archive generated in build-s3-dist.sh. |
(timeout) | Number of seconds after which the Lambda function times out. |
handler | Name of the method in the Lambda function that processes events. |
role | The IAM role resource created in the previous step. |
tracing | Tracing settings for the Lambda function. |
(memory size) | Size of memory to allocate to the Lambda function. |
environment | Environment variables for the Lambda function. |
Usage example:
const customOperator = createLambdaFunction(
  this,
  'customOperator',
  "custom_operator.zip",
  300,
  {
    handler: "custom_operator.lambda_handler",
    role: customOperatorLambdaRole,
    tracing: lambda.Tracing.PASS_THROUGH,
    environment: {
      OPERATOR_NAME: "CustomOperator",
      DataplaneEndpoint,
      DataLookupRole: genericDataLookupLambdaRole.roleArn,
      botoConfig
    }
  }
);
Create the Media Insights on AWS operator resource using your Lambda function(s) and export the operator as output
Use the createCustomResource helper function to create the operator resource and export the operator name. Its parameters are listed in the following table.
Parameter | Description |
---|---|
(logical ID) | Logical ID of the CloudFormation template resource. |
Name | Specify the name of the custom resource. |
Description | Specify the description of the custom resource. |
Async | Specify whether your operator is async (true) or sync (false). |
Configuration | Specify the MediaType and Enabled settings for the operator. |
StartLambdaArn | Specify the ARN of the Lambda function to start your operator. |
(monitor Lambda ARN) | If your operator is async, specify the ARN of the Lambda function that monitors its job status. |
OutputName | Specify the output name for the operator. |
ExportName | Specify the export name for the operator. |
Usage example:
createCustomResource(
  this,
  'customOperatorOperation',
  {
    Name: "customOperator",
    Description: "Operation name of CustomOperator",
    Async: false,
    Configuration: {
      MediaType: "Video",
      Enabled: false
    },
    StartLambdaArn: startGenericDataLookup.functionArn,
    OutputName: "CustomOperator",
    ExportName: "CustomOperator"
  }
);
Step 3: Update the build script to deploy your operator to AWS Lambda
Time to complete: 5 minutes
Update the Make lambda package section in build-s3-dist.sh:
# Add operator code to a zip package for AWS Lambda
zip my_operator.zip my_operator.py
# Copy that zip to the regional distribution directory.
cp "./dist/my_operator.zip" "$regional_dist_dir/"
Step 4: Deploy your custom build
Run the build script to generate CloudFormation templates, then deploy them as described in the README.md.
Step 5: Test your new workflow and operator
To test workflows and operators, submit requests to the workflow API endpoint using IAM authorization. Tools like Postman and awscurl make it easy to send IAM-authorized requests:
Sample command to list all available workflows:
awscurl "$WORKFLOW_API_ENDPOINT"/workflow | cut -f 2 -d "'" | jq '.[].Name'
Sample command to list all stages in a workflow:
WORKFLOW_NAME="CasImageWorkflow"
awscurl "$WORKFLOW_API_ENDPOINT"/workflow/"$WORKFLOW_NAME" | cut -f 2 -d "'" | jq -c '.Stages | .[].Name'
Sample command to get the workflow configuration for a stage:
WORKFLOW_NAME="CasImageWorkflow"
STAGE_NAME="RekognitionStage"
awscurl "$WORKFLOW_API_ENDPOINT"/workflow/"$WORKFLOW_NAME" | cut -f 2 -d "'" | jq -c --arg stage "$STAGE_NAME" '.Stages[$stage].Configuration'
Sample command to run a workflow with the default configuration:
WORKFLOW_NAME="CasImageWorkflow"
aws s3 cp test_image.jpg s3://"$DATAPLANE_BUCKET"/
awscurl -X POST --data '{"Name":"'$WORKFLOW_NAME'", "Input":{"Media":{"Image":{"S3Bucket":"'$DATAPLANE_BUCKET'","S3Key":"test_image.jpg"}}}}' $WORKFLOW_API_ENDPOINT/workflow/execution
Sample command to run a workflow with a non-default configuration:
WORKFLOW_NAME="CasImageWorkflow"
CONFIGURATION='{"RekognitionStage":{"faceDetectionImage":{"MediaType":"Image","Enabled":false},"celebrityRecognitionImage":{"MediaType":"Image","Enabled":false},"faceSearchImage":{"MediaType":"Image","Enabled":false},"contentModerationImage":{"MediaType":"Image","Enabled":false},"labelDetectionImage":{"MediaType":"Image","Enabled":false}}}'
aws s3 cp test_image.jpg s3://"$DATAPLANE_BUCKET"/
awscurl -X POST --data '{"Name":"'$WORKFLOW_NAME'", "Configuration":'$CONFIGURATION', "Input":{"Media":{"Image":{"S3Bucket":"'$DATAPLANE_BUCKET'","S3Key":"test_image.jpg"}}}}' $WORKFLOW_API_ENDPOINT/workflow/execution
Monitor your test
You can monitor workflows with the following logs:

- Your operator Lambda function. To find this log, search the Lambda functions for your operator name.
- The data plane API Lambda function. To find this log, search the Lambda functions for MediaInsightsDataplaneApiStack.
Validate metadata in the data plane
When your operator finishes successfully, you can find the data saved by the Dataplane.store_asset_metadata() function in the DataPlaneTable in Amazon DynamoDB.
Implementing a new data stream consumer
The data plane stores each item as an object in Amazon S3 and stores its Amazon S3 object identifier in DynamoDB. However, many application scenarios involve data access patterns that require capabilities beyond those provided by DynamoDB and Amazon S3. For example, you might need Amazon OpenSearch Service to support interactive analytics, Amazon SNS to provide real-time messaging and notifications, or Amazon QuickSight to support business intelligence dashboards.

The data plane provides a change-data-capture (CDC) stream from DynamoDB to communicate media analysis data to stream consumers, where ETL tasks can transform and load raw data into the downstream data stores that support end-user applications. This CDC stream is provided as a Kinesis data stream. Its ARN is provided as an output called AnalyticsStreamArn in the base Media Insights on AWS CloudFormation stack.
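One common consumer shape is a Lambda function subscribed to the Kinesis data stream through an event source mapping. The sketch below follows the standard Kinesis-to-Lambda event format (base64-encoded record data); the assumption that each payload is a JSON document describing a data plane change is illustrative, as the exact payload schema depends on what the data plane emits:

```python
import base64
import json

def lambda_handler(event, context):
    # Each Kinesis record carries a base64-encoded payload describing a
    # data plane change (e.g., new metadata stored for an asset).
    items = []
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        items.append(json.loads(payload))
    # An ETL consumer would transform `items` and load them into a
    # downstream store such as Amazon OpenSearch Service here.
    return {"ProcessedRecords": len(items)}
```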
For more information about how to implement Kinesis data stream consumers in Media Insights on AWS, refer to the Media Insights on AWS demo application.