Pipeline Activities
The simplest functional pipeline connects a channel to a data store, which makes it a pipeline with two activities: a channel activity and a datastore activity. You can achieve more powerful message processing by adding more activities to your pipeline.
AWS IoT Analytics provides the RunPipelineActivity command, which simulates the results of running a pipeline activity on a message payload that you provide. You might find this helpful when you develop and debug your pipeline activities. The RunPipelineActivity Example section demonstrates how to use it.
Channel Activity
The first activity in a pipeline must be the channel activity, which determines the source of the messages to be processed.
{ "channel": { "name": "MyChannelActivity", "channelName": "mychannel", "next": "MyLambdaActivity" } }
Datastore Activity
The datastore activity, which specifies where to store the processed data, is the last activity.
{ "datastore": { "name": "MyDatastoreActivity", "datastoreName": "mydatastore" } }
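Because every activity names its successor in "next", you can check the channel-first, datastore-last ordering locally before you create a pipeline. The following is a minimal sketch: check_pipeline is a hypothetical helper, not part of any AWS SDK, and it assumes each activity is a one-key dict in the same shape as the JSON definitions in this section. Requiring exactly one channel and requiring every activity to be reachable are assumptions of the sketch.

```python
def check_pipeline(activities):
    """Follow "next" links from the channel activity and check the ordering rules.

    Hypothetical helper. Each element of activities is a one-key dict such as
    {"channel": {...}}. Returns the activity types in the order they run.
    """
    # Index every activity by its "name" so "next" links can be followed.
    by_name = {}
    for activity in activities:
        kind, body = next(iter(activity.items()))
        by_name[body["name"]] = (kind, body)

    # Assumption of this sketch: exactly one channel activity starts the chain.
    channels = [name for name, (kind, _) in by_name.items() if kind == "channel"]
    if len(channels) != 1:
        raise ValueError("a pipeline needs exactly one channel activity")

    order = []
    visited = set()
    name = channels[0]
    while name is not None:
        if name not in by_name:
            raise ValueError("unknown activity in 'next': " + name)
        if name in visited:
            raise ValueError("cycle detected at " + name)
        visited.add(name)
        kind, body = by_name[name]
        order.append(kind)
        name = body.get("next")  # the last activity has no "next"

    if order[-1] != "datastore":
        raise ValueError("the last activity must be a datastore activity")
    if len(order) != len(by_name):
        raise ValueError("some activities are not reachable from the channel")
    return order
```

For the two-activity pipeline described at the start of this section, the sketch returns ["channel", "datastore"].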
Lambda Activity
A lambda activity can be used to perform more complex processing on the message. Examples include enriching the message with data from the output of external APIs, or filtering the message based on logic from DynamoDB. More generally, you can use this activity to perform any sort of message-based processing, including filtering which messages are stored in the data store.
The AWS Lambda function used in this activity must receive and return an array of JSON objects.
In the following example, the Lambda function modifies, and then returns, its event parameter.
The batchSize determines how many messages your Lambda function receives on each invocation. When you set it, keep in mind that each invocation must finish within the function's configured timeout, and AWS Lambda caps that timeout at 15 minutes. The Lambda function must therefore be able to process every message in a batch before it times out.
{ "lambda": { "name": "MyLambdaActivity", "lambdaName": "mylambda", "batchSize": 10, "next": "MyDatastoreActivity" } }
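As a rough sizing aid, assuming every message takes about the same time to process, a full batch needs roughly batchSize times the per-message processing time. The sketch below uses two hypothetical helpers (not an AWS API): split_into_batches groups a list of messages into batches of at most batch_size items, and batch_fits_timeout compares the estimate against a timeout you supply, keeping a safety margin whose 20% default is an arbitrary choice.

```python
def split_into_batches(messages, batch_size):
    """Group messages into consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [messages[i:i + batch_size] for i in range(0, len(messages), batch_size)]


def batch_fits_timeout(batch_size, seconds_per_message, timeout_seconds, headroom=0.8):
    """Estimate whether a full batch finishes within a fraction of the timeout.

    Assumes every message takes about seconds_per_message to process;
    headroom=0.8 keeps a 20% safety margin.
    """
    return batch_size * seconds_per_message <= timeout_seconds * headroom


# With batchSize 10, a 60-second timeout, and about 2 s per message,
# a full batch needs about 20 s, which fits comfortably.
print(batch_fits_timeout(10, 2.0, 60))
```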
You must add a function policy to allow AWS IoT Analytics to invoke your Lambda function. Use the following CLI command:
aws lambda add-permission --function-name <lambda-function-name> --statement-id <your-statement> --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
Here is an example of a Lambda function used in AWS IoT Analytics. Given a device that publishes a message with a payload similar to:
{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }
and the following pipeline definition:
{ "pipeline": { "activities": [ { "channel": { "channelName": "foobar_channel", "name": "foobar_channel_activity", "next": "lambda_foobar_activity" } }, { "lambda": { "lambdaName": "MyAnalyticsLambdaFunction", "batchSize": 5, "name": "lambda_foobar_activity", "next": "foobar_store_activity" } }, { "datastore": { "datastoreName": "foobar_datastore", "name": "foobar_store_activity" } } ], "name": "foobar_pipeline", "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline" } }
The following Python Lambda function (MyAnalyticsLambdaFunction) adds a Google Maps URL and the temperature in Fahrenheit to each message:
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def c_to_f(c):
    # Convert a Celsius temperature to Fahrenheit
    return 9.0/5.0 * c + 32


def lambda_handler(event, context):
    # event is the batch (a list of JSON objects) sent by the pipeline
    logger.info("event before processing: {}".format(event))
    for e in event:
        #e['foo'] = 'addedByLambda'
        # Reset per message so one message's URL never leaks into the next
        maps_url = 'N/A'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat, lon)
        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])
        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url
    logger.info("event after processing: {}".format(event))
    # Return the modified batch so the pipeline passes it to the next activity
    return event
AddAttributes Activity
An addAttributes activity adds attributes based on existing attributes in the message. This lets you alter the shape of the message before it is stored, for example, to normalize data coming from different generations of device firmware.
This is best explained by example. Consider the input message:
{ "device": { "id": "device-123", "coord": [ 47.6152543, -122.3354883 ] } }
and an addAttributes activity that looks like this:
{ "addAttributes": { "name": "MyAddAttributesActivity", "attributes": { "device.id": "id", "device.coord[0]": "lat", "device.coord[1]": "lon" }, "next": "MyRemoveAttributesActivity" } }
This activity copies the device ID to the root level and extracts the values in the coord array, promoting them to top-level attributes called lat and lon. As a result of this activity, the input message is transformed to the following (coordinate values are shortened for readability):
{ "device": { "id": "device-123", "coord": [ 47.6, -122.3 ] }, "id": "device-123", "lat": 47.6, "lon": -122.3 }
The original device attribute is still present. If you want to remove it, you can use the removeAttributes activity.
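To make the path notation concrete, the following minimal sketch reproduces the addAttributes example locally. It assumes paths use only "." for nested keys and "[n]" for list indexes, as in the example; the service's own path grammar may accept more. parse_path, get_path, and add_attributes are hypothetical helpers, not an AWS API.

```python
import copy
import re

# Matches either a key segment ("device", "coord") or a list index ("[0]").
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def parse_path(path):
    """Turn "device.coord[0]" into ["device", "coord", 0]."""
    return [int(index) if index else key for key, index in _TOKEN.findall(path)]


def get_path(message, path):
    """Walk nested dicts and lists along a parsed path."""
    value = message
    for step in parse_path(path):
        value = value[step]
    return value


def add_attributes(message, attributes):
    """Copy each source path to a new root-level attribute; the input is left unchanged."""
    result = copy.deepcopy(message)
    for source_path, new_name in attributes.items():
        result[new_name] = get_path(message, source_path)
    return result


message = {"device": {"id": "device-123", "coord": [47.6, -122.3]}}
print(add_attributes(message, {"device.id": "id", "device.coord[0]": "lat", "device.coord[1]": "lon"}))
```

The original device attribute survives in the result, which matches the behavior described above.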
RemoveAttributes Activity
A removeAttributes activity removes attributes from a message. For example, given the message that was the result of the addAttributes activity:
{ "device": { "id": "device-123", "coord": [ 47.6, -122.3 ] }, "id": "device-123", "lat": 47.6, "lon": -122.3 }
To normalize that message so that it includes only the required data at the root level, use the following removeAttributes activity:
{ "removeAttributes": { "name": "MyRemoveAttributesActivity", "attributes": [ "device" ], "next": "MyDatastoreActivity" } }
This results in the following message flowing along the pipeline:
{ "id": "device-123", "lat": 47.6, "lon": -122.3 }
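A local equivalent of this step is short. The sketch below is a hypothetical remove_attributes helper (not an AWS API) that handles root-level attribute names only, as in the example, and silently ignores names that are not present; that last behavior is an assumption of the sketch.

```python
import copy


def remove_attributes(message, attributes):
    """Return a copy of message without the listed root-level attributes."""
    result = copy.deepcopy(message)
    for name in attributes:
        result.pop(name, None)  # missing names are ignored in this sketch
    return result


message = {"device": {"id": "device-123", "coord": [47.6, -122.3]}, "id": "device-123", "lat": 47.6, "lon": -122.3}
print(remove_attributes(message, ["device"]))
```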
SelectAttributes Activity
The selectAttributes activity creates a new message using only the specified attributes from the original message; every other attribute is dropped. selectAttributes creates new attributes under the root of the message only. For example, given this message:
{ "device": { "id": "device-123", "coord": [ 47.6152543, -122.3354883 ], "temp": 50, "hum": 40 }, "light": 90 }
and this activity:
{ "selectAttributes": { "name": "MySelectAttributesActivity", "attributes": [ "device.temp", "device.hum", "light" ], "next": "MyDatastoreActivity" } }
The result is the following message flowing through the pipeline:
{ "temp": 50, "hum": 40, "light": 90 }
Again, selectAttributes can only create root-level attributes.
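The following minimal sketch mimics the selectAttributes example. It assumes dotted paths into nested objects and, following the example output, places each selected value at the root under the last segment of its path ("device.temp" becomes "temp"). select_attributes is a hypothetical helper, not an AWS API.

```python
def select_attributes(message, attributes):
    """Build a new root-level message from the listed dotted paths only."""
    result = {}
    for path in attributes:
        keys = path.split(".")
        value = message
        for key in keys:
            value = value[key]
        # Only root-level attributes are created: keep the last path segment.
        result[keys[-1]] = value
    return result


message = {"device": {"id": "device-123", "coord": [47.6152543, -122.3354883], "temp": 50, "hum": 40}, "light": 90}
print(select_attributes(message, ["device.temp", "device.hum", "light"]))
```

Note that two paths ending in the same segment would overwrite each other in this sketch.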
Filter Activity
A filter activity filters messages based on their attributes. The expression used in this activity looks like an SQL WHERE clause and must return a Boolean value; only messages for which it evaluates to true continue along the pipeline:
{ "filter": { "name": "MyFilterActivity", "filter": "temp > 40 AND hum < 20", "next": "MyDatastoreActivity" } }
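To reason about which messages a filter keeps, you can express the same condition in Python. This sketch does not parse the SQL-like syntax; the predicate is a hand-written stand-in for "temp > 40 AND hum < 20", and treating a missing attribute as "no match" is an assumption of the sketch. filter_messages and temp_above_40_and_hum_below_20 are hypothetical names.

```python
def filter_messages(messages, predicate):
    """Keep only the messages for which predicate(message) is True."""
    return [m for m in messages if predicate(m)]


def temp_above_40_and_hum_below_20(message):
    # Stand-in for the filter expression "temp > 40 AND hum < 20".
    temp, hum = message.get("temp"), message.get("hum")
    return temp is not None and hum is not None and temp > 40 and hum < 20


messages = [{"temp": 50, "hum": 10}, {"temp": 50, "hum": 40}, {"temp": 30, "hum": 10}]
print(filter_messages(messages, temp_above_40_and_hum_below_20))
```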
DeviceRegistryEnrich Activity
The deviceRegistryEnrich activity allows you to add data from the AWS IoT device registry to your message payload. For example, given the following message:
{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" } }
and a deviceRegistryEnrich activity that looks like this:
{ "deviceRegistryEnrich": { "name": "MyDeviceRegistryEnrichActivity", "attribute": "metadata", "thingName": "device.thingName", "roleArn": "arn:aws:iam::<your-account-number>:role/MyEnrichRole", "next": "MyDatastoreActivity" } }
The output message now looks like this:
{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" }, "metadata": { "defaultClientId": "my-thing", "thingTypeName": "my-thing", "thingArn": "arn:aws:iot:us-east-1:<your-account-number>:thing/my-thing", "version": 1, "thingName": "my-thing", "attributes": {}, "thingId": "aaabbbccc-dddeeef-gghh-jjkk-llmmnnoopp" } }
In the roleArn field of the activity definition, you must specify a role that has the appropriate permissions attached. The role must have a permissions policy that looks like:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iot:DescribeThing" ], "Resource": [ "arn:aws:iot:<region>:<account-id>:thing/<thing-name>" ] } ] }
and a trust policy that looks like:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "iotanalytics.amazonaws.com" }, "Action": [ "sts:AssumeRole" ] } ] }
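When you debug an enrichment, it can help to see the registry data yourself. The following is a minimal local sketch of what this activity adds, not how the service implements it: registry_enrich is a hypothetical helper that takes any describe_thing callable accepting thingName=... In a real script you could pass boto3.client("iot").describe_thing, which runs with your own credentials rather than the activity's roleArn. Dropping the ResponseMetadata key that boto3 adds is a choice of this sketch.

```python
def registry_enrich(message, thing_name_path, attribute, describe_thing):
    """Add the registry description of the message's thing under attribute."""
    # Resolve the dotted path (for example "device.thingName") to the thing name.
    thing_name = message
    for key in thing_name_path.split("."):
        thing_name = thing_name[key]
    description = describe_thing(thingName=thing_name)
    # boto3 responses carry ResponseMetadata, which is not thing data.
    description = {k: v for k, v in description.items() if k != "ResponseMetadata"}
    enriched = dict(message)
    enriched[attribute] = description
    return enriched


def fake_describe_thing(thingName):
    # Stand-in for boto3.client("iot").describe_thing, for local testing only.
    return {"thingName": thingName, "version": 1, "attributes": {}, "ResponseMetadata": {}}


message = {"temp": 50, "hum": 40, "device": {"thingName": "my-thing"}}
print(registry_enrich(message, "device.thingName", "metadata", fake_describe_thing))
```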
DeviceShadowEnrich Activity
A deviceShadowEnrich activity adds information from the AWS IoT Device Shadow service to a message. For example, given the message:
{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" } }
and the following deviceShadowEnrich activity:
{ "deviceShadowEnrich": { "name": "MyDeviceShadowEnrichActivity", "attribute": "shadow", "thingName": "device.thingName", "roleArn": "arn:aws:iam::<your-account-number>:role/MyEnrichRole", "next": "MyDatastoreActivity" } }
the result is a message that looks like this:
{ "temp": 50, "hum": 40, "device": { "thingName": "my-thing" }, "shadow": { "state": { "desired": { "attributeX": valueX, ... }, "reported": { "attributeX": valueX, ... }, "delta": { "attributeX": valueX, ... } }, "metadata": { "desired": { "attribute1": { "timestamp": timestamp }, ... }, "reported": { "attribute1": { "timestamp": timestamp }, ... } }, "timestamp": timestamp, "clientToken": "token", "version": version } }
In the roleArn field of the activity definition, you must specify a role that has the appropriate permissions attached. The role must have a permissions policy that looks like:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iot:GetThingShadow" ], "Resource": [ "arn:aws:iot:<region>:<account-id>:thing/<thing-name>" ] } ] }
and a trust policy that looks like:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "iotanalytics.amazonaws.com" }, "Action": [ "sts:AssumeRole" ] } ] }
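Similarly, you can fetch a thing's shadow yourself to check what this activity would attach. The following minimal sketch is illustrative only: shadow_enrich is a hypothetical helper that takes any get_thing_shadow callable accepting thingName=... and returning a dict whose "payload" has a read() method. In a real script you could pass boto3.client("iot-data").get_thing_shadow, which runs with your own credentials rather than the activity's roleArn. The fake function and the shadow document it returns are made up for local testing.

```python
import io
import json


def shadow_enrich(message, thing_name_path, attribute, get_thing_shadow):
    """Add the thing's shadow document to the message under attribute."""
    thing_name = message
    for key in thing_name_path.split("."):
        thing_name = thing_name[key]
    response = get_thing_shadow(thingName=thing_name)
    # The shadow document arrives as a JSON byte stream in "payload".
    shadow = json.loads(response["payload"].read())
    enriched = dict(message)
    enriched[attribute] = shadow
    return enriched


def fake_get_thing_shadow(thingName):
    # Stand-in for boto3.client("iot-data").get_thing_shadow, for local testing only.
    body = json.dumps({"state": {"reported": {"temp": 50}}, "version": 3}).encode()
    return {"payload": io.BytesIO(body)}


message = {"temp": 50, "hum": 40, "device": {"thingName": "my-thing"}}
print(shadow_enrich(message, "device.thingName", "shadow", fake_get_thing_shadow))
```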
Math Activity
A math activity computes an arithmetic expression using the message's attributes. The expression must return a number. For example, given the following input message:
{ "tempF": 50 }
after processing by the following math activity:
{ "math": { "name": "MyMathActivity", "math": "(tempF - 32) / 2", "attribute": "tempC", "next": "MyDatastoreActivity" } }
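the resulting message looks like the following. Note that (tempF - 32) / 2 is only a rough approximation of the Fahrenheit-to-Celsius conversion; the exact expression, (tempF - 32) * 5 / 9, gives 10 for this input:
{ "tempF": 50, "tempC": 9 }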
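The arithmetic above is easy to check locally. In this minimal sketch, math_activity is a hypothetical helper and the expression is a Python callable standing in for the activity's expression string (the sketch does not parse that syntax). It enforces the rule that the expression must return a number and compares the approximate and exact conversions.

```python
def math_activity(message, attribute, expression):
    """Store expression(message) in attribute, requiring a numeric result."""
    value = expression(message)
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError("a math expression must return a number")
    result = dict(message)
    result[attribute] = value
    return result


approx = math_activity({"tempF": 50}, "tempC", lambda m: (m["tempF"] - 32) / 2)
exact = math_activity({"tempF": 50}, "tempC", lambda m: ((m["tempF"] - 32) * 5.0) / 9.0)
print(approx["tempC"], exact["tempC"])  # 9.0 10.0
```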
RunPipelineActivity Example
Here is an example of how to use the RunPipelineActivity command to test a pipeline activity. In this example, we test a math activity:
- Create a file named maths.json that contains the definition of the pipeline activity you want to test:
{ "math": { "name": "MyMathActivity", "math": "((temp - 32) * 5.0) / 9.0", "attribute": "tempC" } }
- Create a file named payloads.json that contains the example payloads used to test the pipeline activity:
[ "{\"humidity\": 52, \"temp\": 68 }", "{\"humidity\": 52, \"temp\": 32 }" ]
- Call RunPipelineActivity from the command line:
aws iotanalytics run-pipeline-activity --pipeline-activity file://maths.json --payloads file://payloads.json
- This produces the following results:
{ "logResult": "", "payloads": [ "eyJodW1pZGl0eSI6NTIsInRlbXAiOjY4LCJ0ZW1wQyI6MjB9", "eyJodW1pZGl0eSI6NTIsInRlbXAiOjMyLCJ0ZW1wQyI6MH0=" ] }
- The payloads listed in the results are Base64-encoded strings. When you decode these strings, you get the following results:
{"humidity":52,"temp":68,"tempC":20}
{"humidity":52,"temp":32,"tempC":0}
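You can decode the payloads with any Base64 tool. As one option, this short Python sketch decodes the CLI result shown above with the standard base64 and json modules; decode_payloads is a hypothetical helper name, and the cli_output dict is copied from the example results.

```python
import base64
import json


def decode_payloads(response):
    """Decode the Base64-encoded "payloads" of a run-pipeline-activity result."""
    return [json.loads(base64.b64decode(p)) for p in response["payloads"]]


cli_output = {
    "logResult": "",
    "payloads": [
        "eyJodW1pZGl0eSI6NTIsInRlbXAiOjY4LCJ0ZW1wQyI6MjB9",
        "eyJodW1pZGl0eSI6NTIsInRlbXAiOjMyLCJ0ZW1wQyI6MH0=",
    ],
}

for payload in decode_payloads(cli_output):
    print(payload)
```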