AWS IoT Analytics
AWS IoT Analytics User Guide

Pipeline Activities

The simplest functional pipeline connects a channel to a data store, which makes it a pipeline with two activities: a channel activity and a datastore activity. You can achieve more powerful message processing by adding more activities to your pipeline.

AWS IoT Analytics provides the RunPipelineActivity command, which lets you simulate the results of running a pipeline activity on a message payload that you provide. This can help when you are developing and debugging your pipeline activities. The RunPipelineActivity Example section demonstrates how to use it.

Channel Activity

The first activity in a pipeline must be the channel activity, which determines the source of the messages to be processed.

{ "channel": { "name": "MyChannelActivity", "channelName": "mychannel", "next": "MyLambdaActivity" } }

Datastore Activity

The datastore activity, which specifies where to store the processed data, is the last activity.

{ "datastore": { "name": "MyDatastoreActivity", "datastoreName": "mydatastore" } }
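The following Python sketch makes the ordering rules concrete. It builds the activity list for the simplest pipeline (a channel activity followed by a datastore activity) and checks the rules stated above:

- the channel activity comes first;
- the datastore activity comes last;
- each activity's `next` names the activity after it.

The helper names (`activity_body`, `validate_pipeline`) are hypothetical, and the code makes no AWS calls. The list has the shape expected by the `pipelineActivities` parameter of boto3's `create_pipeline`, so you could pass it there once it validates.

```python
from typing import Any, Dict, List

# Each activity is a single-key dict such as {"channel": {...}}
Activity = Dict[str, Dict[str, Any]]


def activity_body(activity: Activity) -> Dict[str, Any]:
    # Unwrap the single value, e.g. the dict under "channel"
    (body,) = activity.values()
    return body


def validate_pipeline(activities: List[Activity]) -> None:
    """Raise ValueError if ordering or 'next' chaining is wrong."""
    if not activities:
        raise ValueError("a pipeline needs at least one activity")
    if "channel" not in activities[0]:
        raise ValueError("the first activity must be a channel activity")
    if "datastore" not in activities[-1]:
        raise ValueError("the last activity must be a datastore activity")
    # Every activity except the last must point at its successor
    for current, following in zip(activities, activities[1:]):
        expected = activity_body(following)["name"]
        if activity_body(current).get("next") != expected:
            raise ValueError("activity {!r} should set 'next' to {!r}".format(
                activity_body(current)["name"], expected))


# The simplest functional pipeline: channel activity, then datastore activity
simplest = [
    {"channel": {"name": "MyChannelActivity", "channelName": "mychannel",
                 "next": "MyDatastoreActivity"}},
    {"datastore": {"name": "MyDatastoreActivity",
                   "datastoreName": "mydatastore"}},
]
validate_pipeline(simplest)  # passes silently
```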

Lambda Activity

A lambda activity lets you perform more complex processing on messages. Examples include enriching a message with data returned by external APIs, or filtering messages based on logic that uses data stored in DynamoDB. More generally, you can use this activity for any kind of message-based processing, including deciding which messages are stored in the data store.

The AWS Lambda function used in this activity must receive and return an array of JSON objects. In the following example, the Lambda function modifies, and then returns, its event parameter.

The batchSize determines how many messages your Lambda function receives on each invocation. When you set it, keep in mind that a Lambda function is stopped when it reaches its configured timeout, which can be at most 15 minutes. The function must therefore be able to process every message in the batch within that timeout.

{ "lambda": { "name": "MyLambdaActivity", "lambdaName": "mylambda", "batchSize": 10, "next": "MyDatastoreActivity" } }
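To choose a batchSize, you can divide the function's timeout by the time one message takes, keeping a safety margin. The helper below is a hypothetical back-of-the-envelope calculation. It assumes processing time grows roughly linearly with the number of messages. The figures in the usage lines are illustrative, so measure your own per-message time.

```python
import math


def max_batch_size(timeout_seconds: float, seconds_per_message: float,
                   headroom: float = 0.8) -> int:
    """Largest batch whose estimated processing time fits in the timeout.

    Assumes linear cost per message and uses only `headroom` (a fraction)
    of the timeout, keeping the rest as a safety margin. A result of 0
    means even a single message is unlikely to finish in time.
    """
    if seconds_per_message <= 0:
        raise ValueError("seconds_per_message must be positive")
    budget = timeout_seconds * headroom
    return math.floor(budget / seconds_per_message)


# Hypothetical figures: a 300-second timeout, about 2 seconds per message
limit = max_batch_size(timeout_seconds=300, seconds_per_message=2.0)
print(limit)        # 120
print(10 <= limit)  # True, so a batchSize of 10 fits comfortably
```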

You must add a function policy to allow AWS IoT Analytics to invoke your Lambda function. Use the following CLI command:

aws lambda add-permission --function-name <lambda-function-name> --statement-id <your-statement> --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
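If you manage resources from Python rather than the CLI, you can grant the same permission with the `add_permission` method of the boto3 Lambda client. The sketch below makes two assumptions:

- the function name and statement ID are placeholders you supply;
- the client is passed in, so the helper can be tried against a stand-in without AWS credentials.

The helper names are hypothetical.

```python
from typing import Any, Dict


def add_permission_args(function_name: str, statement_id: str) -> Dict[str, str]:
    # Mirrors the CLI flags --function-name, --statement-id,
    # --principal and --action shown above
    return {
        "FunctionName": function_name,
        "StatementId": statement_id,
        "Principal": "iotanalytics.amazonaws.com",
        "Action": "lambda:InvokeFunction",
    }


def allow_iot_analytics(lambda_client: Any, function_name: str,
                        statement_id: str) -> Dict[str, Any]:
    # lambda_client is expected to be boto3.client("lambda")
    return lambda_client.add_permission(
        **add_permission_args(function_name, statement_id))


# Usage (requires AWS credentials):
#   import boto3
#   allow_iot_analytics(boto3.client("lambda"), "mylambda", "iotanalytics-invoke")
```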

Here is an example of a Lambda function used in AWS IoT Analytics. Given a device that publishes a message with a payload similar to:

{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }

and the following pipeline definition:

{
  "pipeline": {
    "activities": [
      {
        "channel": {
          "channelName": "foobar_channel",
          "name": "foobar_channel_activity",
          "next": "lambda_foobar_activity"
        }
      },
      {
        "lambda": {
          "lambdaName": "MyAnalyticsLambdaFunction",
          "batchSize": 5,
          "name": "lambda_foobar_activity",
          "next": "foobar_store_activity"
        }
      },
      {
        "datastore": {
          "datastoreName": "foobar_datastore",
          "name": "foobar_store_activity"
        }
      }
    ],
    "name": "foobar_pipeline",
    "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline"
  }
}

The following Lambda Python function (MyAnalyticsLambdaFunction) adds a Google Maps URL and the temperature in Fahrenheit to each message:

import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def c_to_f(c):
    # Convert degrees Celsius to degrees Fahrenheit
    return 9.0/5.0 * c + 32

def lambda_handler(event, context):
    # event is the batch of messages (a list of dicts) sent by the pipeline
    logger.info("event before processing: {}".format(event))
    for e in event:
        # Reset for every message so that a message without a location
        # does not inherit the URL computed for the previous one
        maps_url = 'N/A'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat, lon)
        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])
        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url
    logger.info("event after processing: {}".format(event))
    # Return the modified array so it continues along the pipeline
    return event
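The function above enriches every message. A lambda activity can also filter messages. Because the function returns the array of messages that continue along the pipeline, this sketch relies on the fact that a message left out of that array is dropped.

The following handler is a minimal sketch with these assumptions:

- each message carries a numeric `temperature` attribute;
- the valid range (`MIN_TEMPERATURE`, `MAX_TEMPERATURE`) is a hypothetical example, not an AWS IoT Analytics setting.

```python
from typing import Any, Dict, List

# Hypothetical plausible range for the sensor, in degrees Celsius
MIN_TEMPERATURE = -40
MAX_TEMPERATURE = 85


def lambda_handler(event: List[Dict[str, Any]], context: Any) -> List[Dict[str, Any]]:
    # event is the batch of messages; return only those that should be kept
    kept = []
    for message in event:
        temperature = message.get("temperature")
        # Drop messages with a missing, non-numeric, or out-of-range reading
        if isinstance(temperature, (int, float)) and \
                MIN_TEMPERATURE <= temperature <= MAX_TEMPERATURE:
            kept.append(message)
    return kept
```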

AddAttributes Activity

An addAttributes activity adds attributes based on existing attributes in the message. This lets you alter the shape of the message before it is stored, for example, to normalize data coming from different generations of device firmware.

This is best explained by example. Consider the input message:

{ "device": { "id": "device-123", "coord": [ 47.6, -122.3 ] } }

and an addAttributes activity that looks like this:

{ "addAttributes": { "name": "MyAddAttributesActivity", "attributes": { "device.id": "id", "device.coord[0]": "lat", "device.coord[1]": "lon" }, "next": "MyRemoveAttributesActivity" } }

This activity copies the device ID to the root level and extracts the values in the coord array, promoting them to top-level attributes called lat and lon. As a result of this activity, the input message is transformed to the following:

{ "device": { "id": "device-123", "coord": [ 47.6, -122.3 ] }, "id": "device-123", "lat": 47.6, "lon": -122.3 }

The original device attribute is still present. If you want to remove it, you can use the removeAttributes activity.
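To reason about what an addAttributes mapping will produce, you can emulate it locally. The sketch below is a hypothetical emulation, not the service's implementation. It supports only the two path forms used in the example above:

- dotted keys such as `device.id`;
- one trailing array index such as `device.coord[0]`.

It copies each resolved value to a new root-level attribute and leaves the original attributes in place.

```python
import copy
import re
from typing import Any, Dict

# One path segment: a key, optionally followed by a single [index]
_SEGMENT = re.compile(r"^([^\[\]]+)(?:\[(\d+)\])?$")


def resolve(message: Dict[str, Any], path: str) -> Any:
    """Follow a path such as 'device.coord[0]' into a message."""
    value: Any = message
    for segment in path.split("."):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError("unsupported path segment: " + segment)
        key, index = match.groups()
        value = value[key]
        if index is not None:
            value = value[int(index)]
    return value


def add_attributes(message: Dict[str, Any],
                   attributes: Dict[str, str]) -> Dict[str, Any]:
    # Work on a copy so the input message is left untouched
    result = copy.deepcopy(message)
    for source_path, new_name in attributes.items():
        # Copy the value to the root; the source attribute stays in place
        result[new_name] = copy.deepcopy(resolve(message, source_path))
    return result


message = {"device": {"id": "device-123", "coord": [47.6, -122.3]}}
print(add_attributes(message, {"device.id": "id",
                               "device.coord[0]": "lat",
                               "device.coord[1]": "lon"}))
```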

RemoveAttributes Activity

A removeAttributes activity removes attributes from a message. For example, given the message that was the result of the addAttributes activity:

{ "device": { "id": "device-123", "coord": [ 47.6, -122.3 ] }, "id": "device-123", "lat": 47.6, "lon": -122.3 }

To normalize that message so that it includes only the required data at the root level, use the following removeAttributes activity:

{ "removeAttributes": { "name": "MyRemoveAttributesActivity", "attributes": [ "device" ], "next": "MyDatastoreActivity" } }

This results in the following message flowing along the pipeline:

{ "id": "device-123", "lat": 47.6, "lon": -122.3 }
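For root-level names like `device`, the effect of removeAttributes can be sketched locally as building a copy of the message without the listed keys. This hypothetical helper handles only root-level attribute names and does not attempt nested paths. It also silently ignores names that are not present, which is a choice made for this sketch.

```python
from typing import Any, Dict, Iterable


def remove_attributes(message: Dict[str, Any],
                      attributes: Iterable[str]) -> Dict[str, Any]:
    # Build a new message without the listed root-level attributes;
    # names that are not present are ignored
    to_remove = set(attributes)
    return {key: value for key, value in message.items() if key not in to_remove}
```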

SelectAttributes Activity

The selectAttributes activity creates a new message using only the specified attributes from the original message. Every other attribute is dropped. selectAttributes creates new attributes under the root of the message only. So given this message:

{ "device": { "id": "device-123", "coord": [ 47.6152543, -122.3354883 ], "temp": 50, "hum": 40 }, "light": 90 }

and this activity:

{ "selectAttributes": { "name": "MySelectAttributesActivity", "attributes": [ "device.temp", "device.hum", "light" ], "next": "MyDatastoreActivity" } }

The result is the following message flowing through the pipeline:

{ "temp": 50, "hum": 40, "light": 90 }

Again, selectAttributes can only create root-level attributes.
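A rough local emulation makes the root-level rule concrete. Each selected path, however deeply nested, ends up at the root of the new message under its last path segment. This is a hypothetical sketch with two limits:

- it supports dotted paths only, with no array indexes;
- it does not claim to mirror how the service handles two paths that share a last segment.

```python
from typing import Any, Dict, List


def select_attributes(message: Dict[str, Any],
                      attributes: List[str]) -> Dict[str, Any]:
    # Start from an empty message, so every unselected attribute is dropped
    result: Dict[str, Any] = {}
    for path in attributes:
        value: Any = message
        for key in path.split("."):
            value = value[key]
        # The value lands at the root, named after the last path segment
        result[path.split(".")[-1]] = value
    return result
```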

Filter Activity

A filter activity filters messages based on their attributes. The expression used in this activity resembles an SQL WHERE clause and must return a Boolean value; only messages that satisfy the condition are passed to the next activity:

{ "filter": { "name": "MyFilterActivity", "filter": "temp > 40 AND hum < 20", "next": "MyDatastoreActivity" } }
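To illustrate the semantics, the filter expression above corresponds to the Python predicate below. The service evaluates its own SQL-like syntax, not Python, so treat this as a sketch. In particular, treating a message that lacks `temp` or `hum` as non-matching is an assumption made here.

```python
from typing import Any, Dict, List


def passes_filter(message: Dict[str, Any]) -> bool:
    # Python stand-in for the expression "temp > 40 AND hum < 20".
    # Assumption for this sketch: a message missing either attribute
    # does not match.
    temp = message.get("temp")
    hum = message.get("hum")
    if temp is None or hum is None:
        return False
    return temp > 40 and hum < 20


def apply_filter(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Only messages for which the expression is true are passed on
    return [m for m in messages if passes_filter(m)]
```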

DeviceRegistryEnrich Activity

The deviceRegistryEnrich activity allows you to add data from the AWS IoT device registry to your message payload. For example, given the following message:

{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" } }

and a deviceRegistryEnrich activity that looks like this:

{ "deviceRegistryEnrich": { "name": "MyDeviceRegistryEnrichActivity", "attribute": "metadata", "thingName": "device.thingName", "roleArn": "arn:aws:iam::<your-account-number>:role/MyEnrichRole", "next": "MyDatastoreActivity" } }

The output message now looks like this:

{
  "temp": 50,
  "hum": 40,
  "device": { "thingName": "my-thing" },
  "metadata": {
    "defaultClientId": "my-thing",
    "thingTypeName": "my-thing",
    "thingArn": "arn:aws:iot:us-east-1:<your-account-number>:thing/my-thing",
    "version": 1,
    "thingName": "my-thing",
    "attributes": {},
    "thingId": "aaabbbccc-dddeeef-gghh-jjkk-llmmnnoopp"
  }
}
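Conceptually, the activity looks up the thing named at the thingName path and stores its registry description under the attribute you name. The sketch below approximates that with the `describe_thing` call of the boto3 IoT client, which corresponds to the `iot:DescribeThing` action the role must allow. It is an illustrative emulation, not the service's implementation. The helper names are hypothetical, and it drops the `ResponseMetadata` key that boto3 adds to every response.

```python
import copy
from typing import Any, Dict


def get_path(message: Dict[str, Any], path: str) -> Any:
    # Follow a dotted path such as "device.thingName"
    value: Any = message
    for key in path.split("."):
        value = value[key]
    return value


def registry_enrich(message: Dict[str, Any], iot_client: Any,
                    thing_name_path: str, attribute: str) -> Dict[str, Any]:
    """Emulate deviceRegistryEnrich; iot_client is boto3.client("iot")."""
    thing_name = get_path(message, thing_name_path)
    response = iot_client.describe_thing(thingName=thing_name)
    # boto3 adds HTTP metadata to every response; it is not thing data
    description = {k: v for k, v in response.items() if k != "ResponseMetadata"}
    enriched = copy.deepcopy(message)
    enriched[attribute] = description
    return enriched
```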

In the roleArn field of the activity definition, you must specify a role that has the appropriate permissions attached. The role must have a permissions policy that looks like the following:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iot:DescribeThing" ], "Resource": [ "arn:aws:iot:<region>:<account-id>:thing/<thing-name>" ] } ] }

and a trust policy that looks like:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "iotanalytics.amazonaws.com" }, "Action": [ "sts:AssumeRole" ] } ] }
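One way to create such a role from Python is to call `create_role` and then `put_role_policy` on the boto3 IAM client, passing the two documents above as JSON strings. This sketch assumes:

- the role name, inline policy name, and thing ARN are placeholders you supply;
- the IAM client is passed in, so the helper can be tried against a stand-in;
- the helper names are hypothetical.

Passing `iot:GetThingShadow` as the action gives the equivalent permissions policy for the deviceShadowEnrich activity.

```python
import json
from typing import Any, Dict


def trust_policy() -> Dict[str, Any]:
    # Lets the AWS IoT Analytics service assume the role
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "iotanalytics.amazonaws.com"},
            "Action": ["sts:AssumeRole"],
        }],
    }


def permissions_policy(action: str, thing_arn: str) -> Dict[str, Any]:
    # e.g. action="iot:DescribeThing" for deviceRegistryEnrich
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [action],
            "Resource": [thing_arn],
        }],
    }


def create_enrich_role(iam_client: Any, role_name: str, policy_name: str,
                       action: str, thing_arn: str) -> str:
    """Create the role, attach an inline policy, and return the role ARN.

    iam_client is expected to be boto3.client("iam").
    """
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy()))
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(permissions_policy(action, thing_arn)))
    return role["Role"]["Arn"]
```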

DeviceShadowEnrich Activity

A deviceShadowEnrich activity adds information from the AWS IoT Device Shadows service to a message. For example, given the message:

{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" } }

and the following deviceShadowEnrich activity:

{ "deviceShadowEnrich": { "name": "MyDeviceShadowEnrichActivity", "attribute": "shadow", "thingName": "device.thingName", "roleArn": "arn:aws:iam::<your-account-number>:role/MyEnrichRole", "next": "MyDatastoreActivity" } }

the result is a message that looks like this:

{
  "temp": 50,
  "hum": 40,
  "device": { "thingName": "my-thing" },
  "shadow": {
    "state": {
      "desired": { "attributeX": valueX, ... },
      "reported": { "attributeX": valueX, ... },
      "delta": { "attributeX": valueX, ... }
    },
    "metadata": {
      "desired": { "attribute1": { "timestamp": timestamp }, ... },
      "reported": { "attribute1": { "timestamp": timestamp }, ... }
    },
    "timestamp": timestamp,
    "clientToken": "token",
    "version": version
  }
}
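You can emulate this locally with the `get_thing_shadow` call of the boto3 `iot-data` client, which corresponds to the `iot:GetThingShadow` action the role must allow. That call returns the shadow document as a streaming `payload` of JSON bytes. The helper name is hypothetical, and the sketch only approximates what the activity does.

```python
import copy
import json
from typing import Any, Dict


def shadow_enrich(message: Dict[str, Any], iot_data_client: Any,
                  thing_name_path: str, attribute: str) -> Dict[str, Any]:
    """Emulate deviceShadowEnrich; iot_data_client is boto3.client("iot-data")."""
    # Follow a dotted path such as "device.thingName"
    thing_name: Any = message
    for key in thing_name_path.split("."):
        thing_name = thing_name[key]
    response = iot_data_client.get_thing_shadow(thingName=thing_name)
    # The shadow document arrives as a stream of JSON bytes
    shadow = json.loads(response["payload"].read())
    enriched = copy.deepcopy(message)
    enriched[attribute] = shadow
    return enriched
```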

In the roleArn field of the activity definition, you must specify a role that has the appropriate permissions attached. The role must have a permissions policy that looks like the following:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iot:GetThingShadow" ], "Resource": [ "arn:aws:iot:<region>:<account-id>:thing/<thing-name>" ] } ] }

and a trust policy that looks like:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "iotanalytics.amazonaws.com" }, "Action": [ "sts:AssumeRole" ] } ] }

Math Activity

A math activity computes an arithmetic expression using the message's attributes and stores the result in the attribute you specify. The expression must return a number. For example, given the following input message:

{ "tempF": 50 }

after processing by the following math activity:

{ "math": { "name": "MyMathActivity", "math": "((tempF - 32) * 5.0) / 9.0", "attribute": "tempC", "next": "MyDatastoreActivity" } }

the resulting message looks like:

{ "tempF" : 50, "tempC": 10 }
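A math activity behaves like evaluating an expression over the message's attributes and storing the numeric result under a new attribute. The hypothetical helper below uses a Python function in place of the service's expression language, so it sketches the semantics only. The usage applies the Fahrenheit-to-Celsius expression from the RunPipelineActivity example that follows.

```python
import copy
from numbers import Number
from typing import Any, Callable, Dict


def apply_math(message: Dict[str, Any], attribute: str,
               expression: Callable[[Dict[str, Any]], Any]) -> Dict[str, Any]:
    # Evaluate the expression against the message's attributes
    result = expression(message)
    if not isinstance(result, Number):
        raise TypeError("a math expression must return a number")
    enriched = copy.deepcopy(message)
    enriched[attribute] = result
    return enriched


def to_celsius(message: Dict[str, Any]) -> float:
    # Python stand-in for "((temp - 32) * 5.0) / 9.0"
    return ((message["temp"] - 32) * 5.0) / 9.0


print(apply_math({"humidity": 52, "temp": 68}, "tempC", to_celsius))
# {'humidity': 52, 'temp': 68, 'tempC': 20.0}
```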

RunPipelineActivity Example

Here is an example of how to use the "RunPipelineActivity" command to test a pipeline activity. For this example, we test a math activity:

  1. Create a file "maths.json" which contains the definition of the pipeline activity you want to test:

    { "math": { "name": "MyMathActivity", "math": "((temp - 32) * 5.0) / 9.0", "attribute": "tempC" } }
  2. Create a file "payloads.json" which contains the example payloads that are used to test the pipeline activity:

    [ "{\"humidity\": 52, \"temp\": 68 }", "{\"humidity\": 52, \"temp\": 32 }" ]
  3. Call "RunPipelineActivity" from the command line:

    aws iotanalytics run-pipeline-activity --pipeline-activity file://maths.json --payloads file://payloads.json
  4. This produces the following results:

    { "logResult": "", "payloads": [ "eyJodW1pZGl0eSI6NTIsInRlbXAiOjY4LCJ0ZW1wQyI6MjB9", "eyJodW1pZGl0eSI6NTIsInRlbXAiOjMyLCJ0ZW1wQyI6MH0=" ] }
  5. The "payloads" listed in the results are Base64-encoded strings. When these strings are decoded, you get the following results:

    {"humidity":52,"temp":68,"tempC":20}
    {"humidity":52,"temp":32,"tempC":0}
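You can script the decoding in step 5 with the Python standard library alone. This sketch assumes you redirected the CLI output to a file; the file name `results.json` and both helper names are hypothetical.

```python
import base64
import json
from typing import Any, Dict, List


def decode_payloads(cli_output: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Each entry in "payloads" is a Base64-encoded JSON message
    return [json.loads(base64.b64decode(p)) for p in cli_output["payloads"]]


def decode_results_file(path: str) -> List[Dict[str, Any]]:
    # path holds the JSON printed by the CLI, for example after running
    #   aws iotanalytics run-pipeline-activity ... > results.json
    with open(path) as f:
        return decode_payloads(json.load(f))
```

For example, `for m in decode_results_file("results.json"): print(m)` prints one dictionary per decoded message.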