AWS IoT Analytics
AWS IoT Analytics User Guide

Automating Your Workflow

AWS IoT Analytics enables you to perform continuous analysis by automating the execution of containers hosting custom analytical code or Jupyter Notebooks. It also allows you to import AWS IoT Analytics or third-party custom code containers so that you don't have to recreate existing analytical tools. You can take input data from a data store and feed it into an automated workflow, using the following capabilities:

Create data set contents on a recurring schedule.

You can schedule the automatic creation of data set contents by specifying a trigger when you call CreateDataset (triggers:schedule:expression). Data which has arrived in a specified data store is used to create the data set contents, with the fields you want selected by a SQL query (actions:queryAction:sqlQuery).

You can also define a non-overlapping, contiguous time interval that ensures the new data set contents contain only the data that has arrived since the last time the data set contents were created. Use the actions:queryAction:filters:deltaTime fields (timeExpression and offsetSeconds) to specify the delta time interval. As before, you also specify the trigger that creates the data set contents when the time interval has elapsed.
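As an illustration of this pattern, here is a hedged boto3 sketch that creates a data set on a 15-minute schedule with a matching delta window. The data store name, data set name, and timestamp expression are placeholders, not values from this guide.

import boto3

iotanalytics = boto3.client("iotanalytics")

# Hypothetical scheduled SQL data set: every 15 minutes, select the delta of
# messages that arrived in "my_datastore" since the previous run.
iotanalytics.create_dataset(
    datasetName="my_scheduled_dataset",
    actions=[{
        "actionName": "sqlaction",
        "queryAction": {
            "sqlQuery": "SELECT * FROM my_datastore",
            "filters": [{
                "deltaTime": {
                    "offsetSeconds": -60,  # shift the window to allow for in-flight messages
                    "timeExpression": "from_unixtime(timestamp)",
                }
            }],
        },
    }],
    triggers=[{"schedule": {"expression": "rate(15 minutes)"}}],
)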

Create data set contents upon completion of another data set.

You can trigger creation of new data set contents upon completion of another data set contents creation (triggers:dataset:name).

Automatically run your analysis applications.

You can containerize your own custom data analysis applications and trigger them to run upon the creation of another data set's contents (actions:containerAction). In this way, you can feed your application with data from data set contents created on a recurring schedule, and you can automatically take action on the results of your analysis from within your application.
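A rough boto3 sketch of such a container data set follows. The image URI, role ARN, variable names, and data set names are placeholders; the triggering data set is assumed to be the scheduled SQL data set from the previous sketch.

import boto3

iotanalytics = boto3.client("iotanalytics")

# Hypothetical container data set that runs whenever "my_scheduled_dataset"
# produces new contents.
iotanalytics.create_dataset(
    datasetName="my_container_dataset",
    actions=[{
        "actionName": "containeraction",
        "containerAction": {
            "image": "123456789012.dkr.ecr.us-west-2.amazonaws.com/my-analysis:latest",
            "executionRoleArn": "arn:aws:iam::123456789012:role/container-execution-role",
            "resourceConfiguration": {"computeType": "ACU_1", "volumeSizeInGB": 2},
            "variables": [
                # Input: the latest contents of the triggering data set.
                {"name": "source_dataset",
                 "datasetContentVersionValue": {"datasetName": "my_scheduled_dataset"}},
                # Output: where the application should write its results.
                {"name": "result_uri",
                 "outputFileUriValue": {"fileName": "results.csv"}},
            ],
        },
    }],
    triggers=[{"dataset": {"name": "my_scheduled_dataset"}}],
)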

Use Cases

Automate product quality measurement to lower OpEx

You have a system with a smart valve that measures pressure, humidity and temperature. The system collates events on a periodic basis and also upon certain events such as valve open and close. With IoT Analytics, you can automate an analysis that would aggregate the non-overlapping data within these periodic windows and create KPI reports on end-product quality. After processing each product batch, you can measure the overall product quality and lower your operational expenditure through maximized run volume.

Automate the analysis of a fleet of devices

You run analytics (an algorithm, data science, or ML for KPIs) every 15 minutes on data generated by hundreds of devices, with each analytics cycle generating and storing state for the next run. For each of your analyses, you want to use only the data received within a specified time window. With IoT Analytics you can orchestrate your analyses, create the KPIs and report for each run, and then store the data for future analytics.

Automate anomaly detection

IoT Analytics allows you to automate an anomaly detection workflow that you would otherwise have to run manually every 15 minutes on new data that has arrived in a data store. You can also automate a dashboard that shows device usage and top users within a specified period of time.

Predict industrial process outcomes

You have industrial production lines. Using the data sent to IoT Analytics, including available process measurements, you can operationalize analytical workflows to predict process outcomes. Data for the model can be arranged in an M x N matrix in which each row contains data from the time points at which laboratory samples are taken. IoT Analytics helps you operationalize your analytical workflow by creating delta windows and using your data science tools to create KPIs and save the state of the measurement devices.

How to Proceed

Note

This section includes information about how to build your own Docker container. There is a security risk if you re-use Docker containers built by third parties: these containers can execute arbitrary code with your user permissions. Make sure you trust the author of any third-party container before using it.

Here are the steps you would take to set up periodic data analysis on data that has arrived since the last analysis was performed (a minimal code sketch covering steps 3 through 7 appears after this procedure):

  1. Create a Docker container that contains your data application plus any required libraries or other dependencies.

    The IotAnalytics Jupyter extension provides a containerization API to assist in the containerization process. You can also run images of your own creation in which you create or assemble your application toolset to perform the desired data analysis or computation. IoT Analytics allows you to define the source of the input data to the containerized application and the destination for the output data of the Docker container by means of variables. (Custom Docker Container Input/Output Variables contains more information about using variables with a custom container.)

  2. Upload the container to an Amazon ECR registry.

  3. Create a data store to receive and store messages (data) from devices (iotanalytics:CreateDatastore)

  4. Create a channel where the messages are sent (iotanalytics:CreateChannel).

  5. Create a pipeline to connect the channel to the data store (iotanalytics:CreatePipeline).

  6. Create an IAM role that grants permission to send message data to an AWS IoT Analytics channel (iam:CreateRole).

  7. Create an IoT rule that uses a SQL query to connect a channel to the source of the message data (iot:CreateTopicRule field topicRulePayload:actions:iotAnalytics). When a device sends a message with the appropriate topic via MQTT, it is routed to your channel. Or, you can use iotanalytics:BatchPutMessage to send messages directly into a channel from a device capable of using the AWS SDK or CLI.

  8. Create a SQL data set whose creation is triggered by a time schedule (iotanalytics:CreateDataset, field trigger:schedule) denoting the desired interval of N minutes/hours for data collection.

    You specify a query action with the appropriate SQL query to select the data fields you want (field actions:queryAction:sqlQuery).

    You also specify a pre-filter to be applied to the message data to help limit the messages to those which have arrived since the last execution of the action. (Field actions:queryAction:filters:deltaTime:timeExpression gives an expression by which the time of a message may be determined, while field actions:queryAction:filters:deltaTime:offsetSeconds specifies possible latency in the arrival of a message.)

    The pre-filter, along with the trigger schedule, determines your "delta window". Each new SQL data set is created using messages received since the last time the SQL data set was created. The first time the SQL data set is created, the "last time" is estimated from the schedule and the pre-filter.

  9. Create another data set that is triggered by the creation of the first (CreateDataset field trigger:dataset). For this data set, you specify a "container action" (field actions:containerAction) that points to, and gives information needed to run, the Docker container you created in the first step. Here you also specify:

    • The ARN of the Docker container stored in your account (image).

    • The ARN of the role which gives permission to the system to access needed resources in order to run the container action (executionRoleArn).

    • The configuration of the resource that executes the container action (resourceConfiguration).

    • The type of the compute resource used to execute the container action (computeType with possible values: ACU_1 [vCPU=4, memory=16GiB] or ACU_2 [vCPU=8, memory=32GiB]).

    • The size (in GB) of the persistent storage available to the resource instance used to execute the container action (volumeSizeInGB).

    • The values of variables used within the context of the execution of the application (basically, parameters passed to the application) (variables).

      These variables are replaced at the time a container is executed. This allows you to run the same container with different variables (parameters) which are supplied at the time the dataset content is created.

      The IotAnalytics Jupyter extension simplifies this process by automatically recognizing the variables in a notebook and making them available as part of the containerization process. You can choose the recognized variables or add custom variables of your own. Before it runs a container, the system replaces each of these variables with the value current at the time of execution.

    • One of the variables is the name of the data set whose latest contents are used as input to the application (this is the name of the data set you created in the previous step) (datasetContentVersionValue:datasetName).

    • Another variable specifies an output file URI where the output data set contents are stored (usually the URI of a file in an S3 bucket) (outputFileUriValue:fileName).

With the SQL query and delta window to generate the data set, and the container with your application, IoT Analytics creates a scheduled production data set that runs at the interval you specify on data from the delta window, producing your desired output and sending notifications.
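The following is a minimal boto3 sketch of steps 3 through 7 above. All names (my_datastore, my_channel, my_pipeline, the MQTT topic, and the role ARN) are placeholders, and the IAM role from step 6 is assumed to exist already; treat this as an illustration rather than a complete setup.

import boto3

iotanalytics = boto3.client("iotanalytics")
iot = boto3.client("iot")

# Steps 3-5: data store, channel, and a pipeline connecting them.
iotanalytics.create_datastore(datastoreName="my_datastore")
iotanalytics.create_channel(channelName="my_channel")
iotanalytics.create_pipeline(
    pipelineName="my_pipeline",
    pipelineActivities=[
        {"channel": {"name": "from_channel", "channelName": "my_channel",
                     "next": "to_datastore"}},
        {"datastore": {"name": "to_datastore", "datastoreName": "my_datastore"}},
    ],
)

# Step 7: route MQTT messages on a topic into the channel
# (the role ARN must grant iotanalytics:BatchPutMessage).
iot.create_topic_rule(
    ruleName="my_channel_rule",
    topicRulePayload={
        "sql": "SELECT * FROM 'my/topic'",
        "actions": [{"iotAnalytics": {
            "channelName": "my_channel",
            "roleArn": "arn:aws:iam::123456789012:role/my-iot-analytics-role"}}],
    },
)

# Alternatively, send messages directly into the channel with BatchPutMessage.
iotanalytics.batch_put_message(
    channelName="my_channel",
    messages=[{"messageId": "msg-1", "payload": b'{"temperature": 21.7}'}],
)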

You can pause your production data set application and resume it whenever you choose. When you resume it, IoT Analytics by default catches up on all the data that has arrived since the last execution but has not yet been analyzed. You can also configure how the production data set job resumes instead of using the default catch-up option: you can catch up in multiple fixed-length windows (as defined by your delta window length) by performing a series of consecutive runs, or you can resume by capturing only the newly arrived data that fits within the specified size of your delta window. The latter option skips any data beyond the length of your delta window.

Please note the following limitations when creating/defining a data set which is triggered by the creation of another data set:

  • Only container data sets can be triggered by SQL data sets.

  • A SQL data set can trigger at most 10 container data sets.

The following errors may be returned when creating a container data set which is triggered by a SQL data set:

  • "Triggering dataset can only be added on a container dataset"

  • "There can only be one triggering dataset"

    This error occurs if you attempt to define a container data set which is triggered by two different SQL data sets.

  • "The triggering data set <dataset-name> cannot be triggered by a container dataset"

    This error occurs if you attempt to define a container data set which is triggered by another container data set.

  • "<N> datasets are already dependent on <dataset-name> dataset"

    This error occurs if you attempt to define another container data set which is triggered by a SQL data set which already triggers 10 container data sets.

  • "Exactly one trigger type should be provided"

    This error occurs if you attempt to define a data set which is triggered by both a schedule trigger and a data set trigger.

Custom Docker Container Input/Output Variables

This section demonstrates how the program which is run by your custom Docker image may read input variables and upload its output.

Params File

The input variables and the destinations to which you want to upload output are stored in a JSON file located at /opt/ml/input/data/iotanalytics/params on the instance that executes your Docker image. Here is an example of the contents of that file:

{
    "Context": {
        "OutputUris": {
            "html": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.html",
            "ipynb": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.ipynb"
        }
    },
    "Variables": {
        "source_dataset_name": "mydataset",
        "source_dataset_version_id": "xxxx",
        "example_var": "hello world!",
        "custom_output": "s3://aws-iot-analytics/dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.txt"
    }
}

In addition to the name and version ID of your dataset, the Variables section contains the variables specified in the iotanalytics:CreateDataset invocation. In this example, the variable example_var was given the value "hello world!", and a custom output URI was provided in the custom_output variable. The OutputUris field contains default locations to which the container can upload its output; in this example, default output URIs were provided for both ipynb and html output.

Input Variables

The program launched by your Docker image can read variables from the params file. Here is an example program which opens the params file, parses it, and prints the value of the example_var variable:

import json

with open("/opt/ml/input/data/iotanalytics/params") as param_file:
    params = json.loads(param_file.read())

example_var = params["Variables"]["example_var"]
print(example_var)

Uploading Output

The program launched by your Docker image can also store its output in an Amazon S3 location. The output must be uploaded with a "bucket-owner-full-control" access control list (ACL), which grants the AWS IoT Analytics service control over the uploaded output. In this example we extend the previous one to upload the contents of example_var to the S3 location defined by custom_output in the params file:

import boto3
import json
from urllib.parse import urlparse

ACCESS_CONTROL_LIST = "bucket-owner-full-control"

with open("/opt/ml/input/data/iotanalytics/params") as param_file:
    params = json.loads(param_file.read())

example_var = params["Variables"]["example_var"]
outputUri = params["Variables"]["custom_output"]

# break the S3 path into a bucket and key
bucket = urlparse(outputUri).netloc
key = urlparse(outputUri).path.lstrip("/")

s3_client = boto3.client("s3")
s3_client.put_object(Bucket=bucket, Key=key, Body=example_var, ACL=ACCESS_CONTROL_LIST)

Permissions

You must create two roles. One role grants permission to launch an Amazon SageMaker instance in order to containerize a notebook. The other role is needed to execute a container.

You can create the first role automatically or manually. If you create your new Amazon SageMaker instance with the AWS IoT Analytics console, you are given the option to automatically create a new role which grants all privileges necessary to execute SageMaker instances and containerize notebooks. Or, you may create a role with these privileges manually. To do this, create a role with the "AmazonSageMakerFullAccess" policy attached and add the following policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:BatchDeleteImage",
                "ecr:BatchGetImage",
                "ecr:CompleteLayerUpload",
                "ecr:CreateRepository",
                "ecr:DescribeRepositories",
                "ecr:GetAuthorizationToken",
                "ecr:InitiateLayerUpload",
                "ecr:PutImage",
                "ecr:UploadLayerPart"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::iotanalytics-notebook-containers/*"
        }
    ]
}

You must manually create the second role which grants permission to execute a container. (You must do this even if you used the AWS IoT Analytics console to create the first role automatically.) Create a role with the following policy and trust policy attached:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::aws-*-dataset-*/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iotanalytics:*"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "ecr:BatchCheckLayerAvailability",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:DescribeLogStreams",
                "logs:GetLogEvents",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:ListAllMyBuckets"
            ],
            "Resource": "*"
        }
    ]
}

Trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": ["sagemaker.amazonaws.com", "iotanalytics.amazonaws.com"]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
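If you prefer to create the container execution role programmatically, the following boto3 sketch shows one way to do it. The role and policy names are arbitrary, and the two local JSON files are assumed to contain the permissions policy and trust policy shown above.

import boto3

iam = boto3.client("iam")

ROLE_NAME = "iot-analytics-container-execution-role"  # hypothetical name

# The permissions policy and trust policy shown above, saved to local files.
with open("trust-policy.json") as f:
    trust_policy = f.read()
with open("execution-policy.json") as f:
    execution_policy = f.read()

# Create the role with the trust policy, then attach the permissions policy inline.
iam.create_role(RoleName=ROLE_NAME, AssumeRolePolicyDocument=trust_policy)
iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="iot-analytics-container-execution-policy",
    PolicyDocument=execution_policy,
)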

The CreateDataset API

Creates a data set. A data set stores data retrieved from a data store by applying a "queryAction" (a SQL query) or a "containerAction" (executing a containerized application). This operation creates the skeleton of a data set. The data set can be populated manually by calling "CreateDatasetContent" or automatically according to a "trigger" you specify.

CLI Synopsis:

aws iotanalytics create-dataset --dataset-name <value> --actions <value> [--triggers <value>] [--content-delivery-rules <value>] [--retention-period <value>] [--tags <value>] [--cli-input-json <value>] [--generate-cli-skeleton]

cli-input-json format:

{
    "datasetName": "string",
    "actions": [
        {
            "actionName": "string",
            "queryAction": {
                "sqlQuery": "string",
                "filters": [
                    {
                        "deltaTime": {
                            "offsetSeconds": "integer",
                            "timeExpression": "string"
                        }
                    }
                ]
            },
            "containerAction": {
                "image": "string",
                "executionRoleArn": "string",
                "resourceConfiguration": {
                    "computeType": "string",
                    "volumeSizeInGB": "integer"
                },
                "variables": [
                    {
                        "name": "string",
                        "stringValue": "string",
                        "doubleValue": "double",
                        "datasetContentVersionValue": {
                            "datasetName": "string"
                        },
                        "outputFileUriValue": {
                            "fileName": "string"
                        }
                    }
                ]
            }
        }
    ],
    "triggers": [
        {
            "schedule": {
                "expression": "string"
            },
            "dataset": {
                "name": "string"
            }
        }
    ],
    "contentDeliveryRules": [
        {
            "entryName": "string",
            "destination": {
                "iotEventsDestinationConfiguration": {
                    "inputName": "string",
                    "roleArn": "string"
                }
            }
        }
    ],
    "retentionPeriod": {
        "unlimited": "boolean",
        "numberOfDays": "integer"
    },
    "tags": [
        {
            "key": "string",
            "value": "string"
        }
    ]
}

fields:

  • datasetName

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z0-9_]+$)

    The name of the data set.

  • actions

    type: list member: DatasetAction

    A list of actions that create the data set contents.

  • actionName

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z0-9_]+$)

    The name of the data set action by which data set contents are automatically created.

  • queryAction

    type: SqlQueryDatasetAction

    An "SqlQueryDatasetAction" object that uses an SQL query to automatically create data set contents.

  • sqlQuery

    type: string

    A SQL query string.

  • filters

    type: list member: QueryFilter

    Pre-filters applied to message data.

  • deltaTime

    type: DeltaTime

    Used to limit data to that which has arrived since the last execution of the action.

  • offsetSeconds

    type: integer java class: java.lang.Integer

    The number of seconds of estimated "in flight" lag time of message data. When you create data set contents using message data from a specified time frame, some message data may still be "in flight" when processing begins, and so will not arrive in time to be processed. Use this field to make allowances for the "in flight" time of your message data, so that data not processed from a previous time frame will be included with the next time frame. Without this, missed message data would be excluded from processing during the next time frame as well, because its timestamp places it within the previous time frame.

  • timeExpression

    type: string

    An expression by which the time of the message data may be determined. This may be the name of a timestamp field, or a SQL expression which is used to derive the time the message data was generated.

  • containerAction

    type: ContainerDatasetAction

    Information which allows the system to run a containerized application in order to create the data set contents. The application must be in a Docker container along with any needed support libraries.

  • image

    type: string; (length- max:255)

    The ARN of the Docker container stored in your account. The Docker container contains an application and needed support libraries and is used to generate data set contents.

  • executionRoleArn

    type: string; (length- max:2048 min:20)

    The ARN of the role which gives permission to the system to access needed resources in order to run the "containerAction". This includes, at minimum, permission to retrieve the data set contents which are the input to the containerized application.

  • resourceConfiguration

    type: ResourceConfiguration

    Configuration of the resource which executes the "containerAction".

  • computeType

    type: string

    The type of the compute resource used to execute the "containerAction". Possible values are: ACU_1 (vCPU=4, memory=16GiB) or ACU_2 (vCPU=8, memory=32GiB). enum: ACU_1 | ACU_2

  • volumeSizeInGB

    type: integer range- max:50 min:1

    The size (in GB) of the persistent storage available to the resource instance used to execute the "containerAction" (min: 1, max: 50).

  • variables

    type: list member: Variable

    The values of variables used within the context of the execution of the containerized application (basically, parameters passed to the application). Each variable must have a name and a value given by one of "stringValue", "datasetContentVersionValue", or "outputFileUriValue".

  • name

    type: string; (length- max:256 min:1)

    The name of the variable.

  • stringValue

    type: string; (length- max:1024 min:0)

    The value of the variable as a string.

  • datasetContentVersionValue

    type: DatasetContentVersionValue

    The value of the variable as a structure that specifies a data set content version.

  • datasetName

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z0-9_]+$)

    The name of the data set whose latest contents are used as input to the notebook or application.

  • outputFileUriValue

    type: OutputFileUriValue

    The value of the variable as a structure that specifies an output file URI.

  • fileName

    type: string; (pattern: [\w\.-]{1,255})

    The URI of the location where data set contents are stored, usually the URI of a file in an S3 bucket.

  • triggers

    type: list member: DatasetTrigger

    A list of triggers. A trigger causes data set contents to be populated at a specified time interval or when another data set's contents are created. The list of triggers can be empty or contain up to five DataSetTrigger objects.

  • schedule

    type: Schedule

    The "Schedule" when the trigger is initiated.

  • expression

    type: string

    The expression that defines when to trigger an update. For more information, see Schedule Expressions for Rules in the Amazon CloudWatch documentation.

  • dataset

    type: TriggeringDataset

    The data set whose content creation triggers the creation of this data set's contents.

  • name

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z0-9_]+$)

    The name of the data set whose content generation triggers the new data set content generation.

  • contentDeliveryRules

    type: list member: DatasetContentDeliveryRule

    When data set contents are created they are delivered to destinations specified here.

  • entryName

    type: string

    The name of the data set content delivery rules entry.

  • destination

    type: DatasetContentDeliveryDestination

    The destination to which data set contents are delivered.

  • iotEventsDestinationConfiguration

    type: IotEventsDestinationConfiguration

    Configuration information for delivery of data set contents to AWS IoT Events.

  • inputName

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z][a-zA-Z0-9_]*$)

    The name of the AWS IoT Events input to which data set contents are delivered.

  • roleArn

    type: string; (length- max:2048 min:20)

    The ARN of the role which grants AWS IoT Analytics permission to deliver data set contents to an AWS IoT Events input.

  • retentionPeriod

    type: RetentionPeriod

    [Optional] How long, in days, message data is kept for the data set. If not given or set to null, the latest version of the dataset content plus the latest succeeded version (if they are different) are retained for at most 90 days.

  • unlimited

    type: boolean

    If true, message data is kept indefinitely.

  • numberOfDays

    type: integer java class: java.lang.Integer range- min:1

    The number of days that message data is kept. The "unlimited" parameter must be false.

  • tags

    type: list member: Tag

    Metadata which can be used to manage the data set.

  • key

    type: string; (length- max:256 min:1)

    The tag's key.

  • value

    type: string; (length- max:256 min:1)

    The tag's value.

Output:

{
    "datasetName": "string",
    "datasetArn": "string",
    "retentionPeriod": {
        "unlimited": "boolean",
        "numberOfDays": "integer"
    }
}

fields:

  • datasetName

    type: string; (length- max:128 min:1); (pattern: ^[a-zA-Z0-9_]+$)

    The name of the data set.

  • datasetArn

    type: string

    The ARN of the data set.

  • retentionPeriod

    type: RetentionPeriod

    How long, in days, message data is kept for the data set.

  • unlimited

    type: boolean

    If true, message data is kept indefinitely.

  • numberOfDays

    type: integer java class: java.lang.Integer range- min:1

    The number of days that message data is kept. The "unlimited" parameter must be false.

Errors:

  • InvalidRequestException

    The request was not valid.

    HTTP response code: 400

  • ResourceAlreadyExistsException

    A resource with the same name already exists.

    HTTP response code: 409

  • InternalFailureException

    There was an internal failure.

    HTTP response code: 500

  • ServiceUnavailableException

    The service is temporarily unavailable.

    HTTP response code: 503

  • ThrottlingException

    The request was denied due to request throttling.

    HTTP response code: 429

  • LimitExceededException

    The command caused an internal limit to be exceeded.

    HTTP response code: 410

Examples Using CreateDataset

Example 1 -- Creating a SQL data set (java):

CreateDatasetRequest request = new CreateDatasetRequest();
request.setDatasetName(dataSetName);

// Create Action
DatasetAction action = new DatasetAction();
action.setActionName("SQLAction1");
action.setQueryAction(new SqlQueryDatasetAction().withSqlQuery("select * from DataStoreName"));

// Add Action to Actions List
List<DatasetAction> actions = new ArrayList<DatasetAction>();
actions.add(action);

// Create Trigger
DatasetTrigger trigger = new DatasetTrigger();
trigger.setSchedule(new Schedule().withExpression("cron(0 12 * * ? *)"));

// Add Trigger to Triggers List
List<DatasetTrigger> triggers = new ArrayList<DatasetTrigger>();
triggers.add(trigger);

// Add Triggers and Actions to CreateDatasetRequest object
request.setActions(actions);
request.setTriggers(triggers);

// Add RetentionPeriod to CreateDatasetRequest object
request.setRetentionPeriod(new RetentionPeriod().withNumberOfDays(10));

final CreateDatasetResult result = iot.createDataset(request);

Output on success:

{DatasetName: <datasetName>, DatasetArn: <datasetArn>, RetentionPeriod: {unlimited: true} or {numberOfDays: 10, unlimited: false}}

Example 2 -- Creating a SQL Dataset with a delta window (java):

CreateDatasetRequest request = new CreateDatasetRequest();
request.setDatasetName(dataSetName);
DatasetAction action = new DatasetAction();

// Create Filter for DeltaTime
QueryFilter deltaTimeFilter = new QueryFilter();
deltaTimeFilter.withDeltaTime(
    new DeltaTime()
        .withOffsetSeconds(-1 * EstimatedDataDelayInSeconds)
        .withTimeExpression("from_unixtime(timestamp)"));

// Create Action
action.setActionName("SQLActionWithDeltaTime");
action.setQueryAction(new SqlQueryDatasetAction()
    .withSqlQuery("SELECT * from DataStoreName")
    .withFilters(deltaTimeFilter));

// Add Action to Actions List
List<DatasetAction> actions = new ArrayList<DatasetAction>();
actions.add(action);

// Create Trigger
DatasetTrigger trigger = new DatasetTrigger();
trigger.setSchedule(new Schedule().withExpression("cron(0 12 * * ? *)"));

// Add Trigger to Triggers List
List<DatasetTrigger> triggers = new ArrayList<DatasetTrigger>();
triggers.add(trigger);

// Add Triggers and Actions to CreateDatasetRequest object
request.setActions(actions);
request.setTriggers(triggers);

// Add RetentionPeriod to CreateDatasetRequest object
request.setRetentionPeriod(new RetentionPeriod().withNumberOfDays(10));

final CreateDatasetResult result = iot.createDataset(request);

Output on success:

{DatasetName: <datasetName>, DatasetArn: <datasetArn>, RetentionPeriod: {unlimited: true} or {numberOfDays: 10, unlimited: false}}

Example 3 -- Creating a container data set with its own schedule trigger (java):

CreateDatasetRequest request = new CreateDatasetRequest();
request.setDatasetName(dataSetName);
DatasetAction action = new DatasetAction();

// Create Action
action.setActionName("ContainerActionDataset");
action.setContainerAction(new ContainerDatasetAction()
    .withImage(ImageURI)
    .withExecutionRoleArn(ExecutionRoleArn)
    .withResourceConfiguration(
        new ResourceConfiguration()
            .withComputeType(ComputeType.ACU_1)
            .withVolumeSizeInGB(1))
    .withVariables(new Variable()
        .withName("VariableName")
        .withStringValue("VariableValue")));

// Add Action to Actions List
List<DatasetAction> actions = new ArrayList<DatasetAction>();
actions.add(action);

// Create Trigger
DatasetTrigger trigger = new DatasetTrigger();
trigger.setSchedule(new Schedule().withExpression("cron(0 12 * * ? *)"));

// Add Trigger to Triggers List
List<DatasetTrigger> triggers = new ArrayList<DatasetTrigger>();
triggers.add(trigger);

// Add Triggers and Actions to CreateDatasetRequest object
request.setActions(actions);
request.setTriggers(triggers);

// Add RetentionPeriod to CreateDatasetRequest object
request.setRetentionPeriod(new RetentionPeriod().withNumberOfDays(10));

final CreateDatasetResult result = iot.createDataset(request);

Output on success:

{DatasetName: <datasetName>, DatasetArn: <datasetArn>, RetentionPeriod: {unlimited: true} or {numberOfDays: 10, unlimited: false}}

Example 4 -- Creating a container data set with a SQL data set as a trigger (java):

CreateDatasetRequest request = new CreateDatasetRequest();
request.setDatasetName(dataSetName);
DatasetAction action = new DatasetAction();

// Create Action
action.setActionName("ContainerActionDataset");
action.setContainerAction(new ContainerDatasetAction()
    .withImage(ImageURI)
    .withExecutionRoleArn(ExecutionRoleArn)
    .withResourceConfiguration(
        new ResourceConfiguration()
            .withComputeType(ComputeType.ACU_1)
            .withVolumeSizeInGB(1))
    .withVariables(new Variable()
        .withName("VariableName")
        .withStringValue("VariableValue")));

// Add Action to Actions List
List<DatasetAction> actions = new ArrayList<DatasetAction>();
actions.add(action);

// Create Trigger
DatasetTrigger trigger = new DatasetTrigger()
    .withDataset(new TriggeringDataset()
        .withName(TriggeringSQLDataSetName));

// Add Trigger to Triggers List
List<DatasetTrigger> triggers = new ArrayList<DatasetTrigger>();
triggers.add(trigger);

// Add Triggers and Actions to CreateDatasetRequest object
request.setActions(actions);
request.setTriggers(triggers);

final CreateDatasetResult result = iot.createDataset(request);

Output on success:

{DatasetName: <datasetName>, DatasetArn: <datasetArn>}

Example 5 -- Creating a SQL dataset (CLI):

aws iotanalytics --endpoint <EndPoint> --region <Region> create-dataset --dataset-name="<dataSetName>" --actions="[{\"actionName\":\"<ActionName>\", \"queryAction\":{\"sqlQuery\":\"<SQLQuery>\"}}]" --retention-period numberOfDays=10

Output on success:

{ "datasetName": "<datasetName>", "datasetArn": "<datasetArn>", "retentionPeriod": {unlimited: true} or {numberOfDays: 10, unlimited: false} }

Containerizing A Notebook

Note

This section includes information about how to build a Docker container using a Jupyter notebook. There is a security risk if you re-use notebooks built by third parties: included containers can execute arbitrary code with your user permissions. In addition, the HTML generated by the notebook can be displayed in the IoT Analytics console, providing a potential attack vector on the computer displaying the HTML. Make sure you trust the author of any third-party notebook before using it.

One option to perform advanced analytical functions is to use a Jupyter Notebook. Jupyter Notebooks provide powerful data science tools that can perform machine learning and a range of statistical analyses. For more information, see Notebook Templates. You can package your Jupyter Notebooks and libraries into a container that periodically runs on a new batch of data as it is received by AWS IoT Analytics during a delta time window you define. You can schedule an analysis job that uses the container and the new, segmented data captured within the specified time window, then stores the job’s output for future scheduled analytics.

If you created your SageMaker instance using the AWS IoT Analytics console after August 23, 2018, the containerization extension was installed for you automatically. Otherwise, follow the steps listed in this section to enable notebook containerization on your SageMaker instance. In what follows, you modify your SageMaker Execution Role to allow you to upload the container image to Amazon ECR, and you install the containerization extension.

Enable Containerization Of Notebook Instances Not Created Via IoT Analytics Console

We recommend that you create a new SageMaker instance via the IoT Analytics console instead of following these steps. New instances automatically support containerization.

If you restart your SageMaker instance after enabling containerization as shown here, you won't have to re-add the IAM roles and policies, but you must re-install the extension, as shown in the final step.

  1. To grant your notebook instance access to Amazon ECR, select your SageMaker instance on the Amazon SageMaker page.

  2. Under IAM role ARN, choose the SageMaker Execution Role.

  3. Choose Attach Policy, then define and attach the policy shown in Permissions.

    If the "AmazonSageMakerFullAccess" policy is not already attached, attach it as well.

You also must download the containerization code from S3 and install it on your notebook instance. The first step is to access the SageMaker instance's terminal.

  1. Inside Jupyter, choose New.

  2. In the dropdown menu that appears, select Terminal.

  3. Inside the terminal, enter the following commands to download the code, unzip it, and install it. Note that these commands kill any processes being run by your notebooks on this SageMaker instance.

    cd /tmp
    aws s3 cp s3://iotanalytics-notebook-containers/iota_notebook_containers.zip /tmp
    unzip iota_notebook_containers.zip
    cd iota_notebook_containers
    chmod u+x install.sh
    ./install.sh

    Wait for a minute or two for the extension to be validated and installed.

Update Your Notebook Containerization Extension

If you created your SageMaker instance via the IoT Analytics console after August 23, 2018, the containerization extension was installed automatically. You can update the extension by restarting your instance from the SageMaker console. If you installed the extension manually, you can update it by re-running the terminal commands listed in Enable Containerization Of Notebook Instances Not Created Via IoT Analytics Console.

Create A Containerized Image

In this section we show the steps necessary to containerize a notebook. To begin, go to your Jupyter Notebook to create a notebook with a containerized kernel.

  1. In your Jupyter Notebook, choose New, then choose the kernel type you want from the dropdown list. (The kernel type should start with "Containerized" and end with whatever kernel you would have otherwise selected. For example, if you just want a plain python3 environment like "conda_python3", choose "Containerized conda_python3".)

  2. After you have completed work on your notebook and you want to containerize it, choose the containerize button.

  3. Enter a name for the containerized notebook. You may also enter an optional description.

  4. Specify the Input Variables (parameters) that your notebook should be invoked with. You can select the input variables that are automatically detected from your notebook or define custom variables. (Input variables are only detected if you have previously executed your notebook.) For each input variable, choose a type. You can also enter an optional description of the input variable.

  5. Choose the Amazon ECR repository where the image created from the notebook should be uploaded.

  6. You are presented with an overview summarizing your input. Note that after you have started the containerization process you cannot cancel it. The process may last for over an hour.

    Choose containerize to begin the containerization process.

  7. The next page shows the progress.

  8. If you accidentally close your browser, you can monitor the status of the containerization process from the Notebooks section of the IoT Analytics console.

  9. After the process is complete, the containerized image is stored in Amazon ECR, ready for use.

Using Your Own Custom Container for Analysis

Note

This section includes information about how to build a Docker container using a Jupyter notebook. There is a security risk if you re-use notebooks built by third parties: included containers can execute arbitrary code with your user permissions. In addition, the HTML generated by the notebook can be displayed in the IoT Analytics console, providing a potential attack vector on the computer displaying the HTML. Make sure you trust the author of any third-party notebook before using it.

You can create your own custom container and run it with the AWS IoT Analytics service. To do so, you set up a Docker image, upload it to Amazon ECR, and then set up a data set to run a container action. This section gives an example of the process using Octave.

This tutorial assumes that you have:

  • Octave installed on your local computer

  • a Docker account set up on your local computer

  • an AWS account with ECR/IoT Analytics access

Step 1: Set up a Docker image

There are three main files you need for this tutorial. Their names and contents are here:

  • Dockerfile - The initial setup for Docker's containerization process.

FROM ubuntu:16.04

# Get required set of software
RUN apt-get update
RUN apt-get install -y software-properties-common
RUN apt-get install -y octave
RUN apt-get install -y python3-pip

# Get boto3 for S3 and other libraries
RUN pip3 install --upgrade pip
RUN pip3 install boto3
RUN pip3 install urllib3

# Move scripts over
ADD moment moment
ADD run-octave.py run-octave.py

# Start python script
ENTRYPOINT ["python3", "run-octave.py"]
  • run-octave.py - Parses JSON from IoT Analytics, runs the Octave script, and uploads artifacts to S3.

import boto3
import json
import os
import sys
from urllib.parse import urlparse

# Parse the JSON from IoT Analytics
with open('/opt/ml/input/data/iotanalytics/params') as params_file:
    params = json.load(params_file)

variables = params['Variables']
order = variables['order']
input_s3_bucket = variables['inputDataS3BucketName']
input_s3_key = variables['inputDataS3Key']
output_s3_uri = variables['octaveResultS3URI']

local_input_filename = "input.txt"
local_output_filename = "output.mat"

# Pull input data from S3...
s3 = boto3.resource('s3')
s3.Bucket(input_s3_bucket).download_file(input_s3_key, local_input_filename)

# Run Octave Script
os.system("octave moment {} {} {}".format(local_input_filename, local_output_filename, order))

# Upload the artifacts to S3
output_s3_url = urlparse(output_s3_uri)
output_s3_bucket = output_s3_url.netloc
output_s3_key = output_s3_url.path[1:]
s3.Object(output_s3_bucket, output_s3_key).put(Body=open(local_output_filename, 'rb'),
                                               ACL='bucket-owner-full-control')
  • moment - A simple Octave script which calculates the moment based on an input/output file and a specified order.

#!/usr/bin/octave -qf
arg_list = argv ();

input_filename = arg_list{1};
output_filename = arg_list{2};
order = str2num(arg_list{3});

[D,delimiterOut]=importdata(input_filename)
M = moment(D, order)
save(output_filename,'M')
  1. Download the contents of each file. Create a new directory and place all the files in it. Then cd to that directory.

  2. Run:

    docker build -t octave-moment .
  3. You should see a new image in your Docker repo. Verify it by running:

    docker image ls | grep octave-moment

Step 2: Upload the Docker image to an ECR repository

  1. Create a new repository in ECR:

    aws ecr create-repository --repository-name octave-moment
  2. Get the login to your Docker environment:

    aws ecr get-login
  3. Copy the output and run it. The output should look something like:

    docker login -u AWS -p <password> -e none https://<your-aws-account-id>.dkr.ecr.<region>.amazonaws.com
  4. Tag the image you created with the ECR Repository Tag:

    docker tag <your-image-id> <your-aws-account-id>.dkr.ecr.<region>.amazonaws.com/octave-moment
  5. Push the image to ECR

    docker push <your-aws-account-id>.dkr.ecr.<region>.amazonaws.com/octave-moment

Step 3: Upload your sample data to an S3 bucket

  1. Download the following to file "input.txt":

    0.857549 -0.987565 -0.467288 -0.252233 -2.298007 0.030077 -1.243324 -0.692745 0.563276 0.772901 -0.508862 -0.404303 -1.363477 -1.812281 -0.296744 -0.203897 0.746533 0.048276 0.075284 0.125395 0.829358 1.246402 -1.310275 -2.737117 0.024629 1.206120 0.895101 1.075549 1.897416 1.383577
  2. Create a new S3 Bucket called "octave-sample-data-<your-account-id>".

  3. Upload the file "input.txt" to the S3 bucket you just created. You should now have a bucket named octave-sample-data-<your-account-id> containing the file input.txt.

Step 4: Create a container execution role

  1. Download the following to a file named "role1.json":

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "sagemaker.amazonaws.com",
                        "iotanalytics.amazonaws.com"
                    ]
                },
                "Action": [
                    "sts:AssumeRole"
                ]
            }
        ]
    }
  2. Create a new role that gives access permissions to Amazon SageMaker and AWS IoT Analytics, using the file "role1.json" you just downloaded:

    aws iam create-role --role-name container-execution-role --assume-role-policy-document file://role1.json
  3. Download the following to a file named "policy1.json" and replace "<your-account-id>" with your account id (see the second ARN under Statement:Resource):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": [
                    "arn:aws:s3:::*-dataset-*/*",
                    "arn:aws:s3:::octave-sample-data-<your-account-id>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iotanalytics:*"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:GetAuthorizationToken",
                    "ecr:GetDownloadUrlForLayer",
                    "ecr:BatchGetImage",
                    "ecr:BatchCheckLayerAvailability",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:DescribeLogStreams",
                    "logs:GetLogEvents",
                    "logs:PutLogEvents"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:ListBucket",
                    "s3:ListAllMyBuckets"
                ],
                "Resource": "*"
            }
        ]
    }
  4. Create an IAM policy, using the file "policy1.json" you just downloaded:

    aws iam create-policy --policy-name ContainerExecutionPolicy --policy-document file://policy1.json
  5. Attach the policy to the role:

    aws iam attach-role-policy --role-name container-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/ContainerExecutionPolicy

Step 5: Create a data set with a container action

  1. Download the following to a file named "cli-input.json" and replace all instances of "<your-account-id>" and "<region>" with the appropriate values:

    {
        "datasetName": "octave_dataset",
        "actions": [
            {
                "actionName": "octave",
                "containerAction": {
                    "image": "<your-account-id>.dkr.ecr.<region>.amazonaws.com/octave-moment",
                    "executionRoleArn": "arn:aws:iam::<your-account-id>:role/container-execution-role",
                    "resourceConfiguration": {
                        "computeType": "ACU_1",
                        "volumeSizeInGB": 1
                    },
                    "variables": [
                        {
                            "name": "octaveResultS3URI",
                            "outputFileUriValue": {
                                "fileName": "output.mat"
                            }
                        },
                        {
                            "name": "inputDataS3BucketName",
                            "stringValue": "octave-sample-data-<your-account-id>"
                        },
                        {
                            "name": "inputDataS3Key",
                            "stringValue": "input.txt"
                        },
                        {
                            "name": "order",
                            "stringValue": "3"
                        }
                    ]
                }
            }
        ]
    }
  2. Create a data set using the file "cli-input.json" you just downloaded and edited:

    aws iotanalytics create-dataset --cli-input-json file://cli-input.json

Step 6: Invoke data set content generation

  1. Run:

    aws iotanalytics create-dataset-content --dataset-name octave_dataset

Step 7: Get data set content

  1. Run:

    aws iotanalytics get-dataset-content --dataset-name octave_dataset --version-id \$LATEST
  2. You may need to wait several minutes until the DatasetContentState is SUCCEEDED (a scripted wait is sketched below).
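The wait can also be scripted. The following boto3 sketch polls until content generation finishes and then downloads the result from the presigned URI returned by get-dataset-content; the data set name matches the one in cli-input.json, and the output file name is the one specified there.

import time
import urllib.request

import boto3

iotanalytics = boto3.client("iotanalytics")

# Poll until the latest data set content has finished generating.
while True:
    content = iotanalytics.get_dataset_content(datasetName="octave_dataset", versionId="$LATEST")
    state = content["status"]["state"]
    if state != "CREATING":
        break
    time.sleep(30)

# On success, each entry carries a presigned URI for the generated output.
if state == "SUCCEEDED":
    data_uri = content["entries"][0]["dataURI"]
    urllib.request.urlretrieve(data_uri, "output.mat")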

Step 8: Print the output on Octave

  1. Use the Octave shell to print the output from the container by running:

    bash> octave
    octave> load output.mat
    octave> disp(M)
    -0.016393 -0.098061 0.380311 -0.564377 -1.318744