Export data streams to the AWS Cloud (console) - AWS IoT Greengrass

You are viewing the documentation for AWS IoT Greengrass Version 1. AWS IoT Greengrass Version 2 is the latest major version of AWS IoT Greengrass. For more information about using AWS IoT Greengrass V2, see the AWS IoT Greengrass Version 2 Developer Guide.

Export data streams to the AWS Cloud (console)

This tutorial shows you how to use the AWS IoT console to configure and deploy an AWS IoT Greengrass group with stream manager enabled. The group contains a user-defined Lambda function that writes to a stream in stream manager, which is then exported automatically to the AWS Cloud.

Stream manager makes ingesting, processing, and exporting high-volume data streams more efficient and reliable. In this tutorial, you create a TransferStream Lambda function that consumes IoT data. The Lambda function uses the AWS IoT Greengrass Core SDK to create a stream in stream manager and then read and write to it. Stream manager then exports the stream to Kinesis Data Streams. The following diagram shows this workflow.


      Diagram of the stream management workflow.

The focus of this tutorial is to show how user-defined Lambda functions use the StreamManagerClient object in the AWS IoT Greengrass Core SDK to interact with stream manager. For simplicity, the Python Lambda function that you create for this tutorial generates simulated device data.

Prerequisites

To complete this tutorial, you need:

  • A Greengrass group and a Greengrass core (v1.10 or later). For information about how to create a Greengrass group and core, see Getting started with AWS IoT Greengrass. The Getting Started tutorial also includes steps for installing the AWS IoT Greengrass Core software.

    Note

    Stream manager is not supported on OpenWrt distributions.

  • The Java 8 runtime (JDK 8) installed on the core device.

    • For Debian-based distributions (including Raspbian) or Ubuntu-based distributions, run the following command:

      sudo apt install openjdk-8-jdk
    • For Red Hat-based distributions (including Amazon Linux), run the following command:

      sudo yum install java-1.8.0-openjdk

      For more information, see How to download and install prebuilt OpenJDK packages in the OpenJDK documentation.

  • AWS IoT Greengrass Core SDK for Python v1.5.0 or later. To use StreamManagerClient in the AWS IoT Greengrass Core SDK for Python, you must:

    • Install Python 3.7 or later on the core device.

    • Include the SDK and its dependencies in your Lambda function deployment package. Instructions are provided in this tutorial.

    Tip

    You can also use StreamManagerClient with Java or Node.js. For example code, see the AWS IoT Greengrass Core SDK for Java and AWS IoT Greengrass Core SDK for Node.js on GitHub.

  • A destination stream named MyKinesisStream created in Amazon Kinesis Data Streams in the same AWS Region as your Greengrass group. For more information, see Create a stream in the Amazon Kinesis Developer Guide.

    Note

    In this tutorial, stream manager exports data to Kinesis Data Streams, which results in charges to your AWS account. For information about pricing, see Kinesis Data Streams pricing.

    To avoid incurring charges, you can run this tutorial without creating a Kinesis data stream. In this case, you check the logs to see that stream manager attempted to export the stream to Kinesis Data Streams.

  • An IAM policy added to the Greengrass group role that allows the kinesis:PutRecords action on the target data stream, as shown in the following example:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecords"
                ],
                "Resource": [
                    "arn:aws:kinesis:region:account-id:stream/MyKinesisStream"
                ]
            }
        ]
    }
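If you prefer to generate the policy document rather than edit it by hand, the following is a minimal sketch. The helper names, the inline policy name, and the Region and account ID values are placeholders chosen for illustration. The optional attach step assumes boto3 is installed and credentials are configured; it is defined here but not called.

```python
import json


def build_kinesis_export_policy(region, account_id, stream_name="MyKinesisStream"):
    """Return the tutorial's policy document as a Python dict."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecords"],
                "Resource": [stream_arn],
            }
        ],
    }


def attach_policy_to_group_role(role_name, policy_document,
                                policy_name="GreengrassKinesisExport"):
    """Attach the policy inline to the group role (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    iam = boto3.client("iam")
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,  # hypothetical policy name
        PolicyDocument=json.dumps(policy_document),
    )


# Placeholder Region and account ID, for illustration only.
policy = build_kinesis_export_policy("us-west-2", "123456789012")
print(json.dumps(policy, indent=2))
```

Replace the placeholder values with your own Region and account ID before using the output.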

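The tutorial contains the following high-level steps:

  1. Create a Lambda function deployment package

  2. Create a Lambda function

  3. Add a Lambda function to the Greengrass group

  4. Enable stream manager

  5. Configure local logging

  6. Deploy the Greengrass group

  7. Test the application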

The tutorial should take about 20 minutes to complete.

Step 1: Create a Lambda function deployment package

In this step, you create a Lambda function deployment package that contains Python function code and dependencies. You upload this package later when you create the Lambda function in AWS Lambda. The Lambda function uses the AWS IoT Greengrass Core SDK to create and interact with local streams.

Note

Your user-defined Lambda functions must use the AWS IoT Greengrass Core SDK to interact with stream manager. For more information about requirements for the Greengrass stream manager, see Greengrass stream manager requirements.

  1. Download the AWS IoT Greengrass Core SDK for Python v1.5.0 or later.

  2. Unzip the downloaded package to get the SDK. The SDK is the greengrasssdk folder.

  3. Install package dependencies to include with the SDK in your Lambda function deployment package.

    1. Navigate to the SDK directory that contains the requirements.txt file. This file lists the dependencies.

    2. Install the SDK dependencies. For example, run the following pip command to install them in the current directory:

      pip install --target . -r requirements.txt
  4. Save the following Python function code in a local file named transfer_stream.py.

    Tip

    For example code that uses Java and Node.js, see the AWS IoT Greengrass Core SDK for Java and AWS IoT Greengrass Core SDK for Node.js on GitHub.

    import asyncio
    import logging
    import random
    import time

    from greengrasssdk.stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        ReadMessagesOptions,
        ResourceNotFoundException,
        StrategyOnFull,
        StreamManagerClient,
    )


    # This example creates a local stream named "SomeStream".
    # It starts writing data into that stream and then stream manager automatically exports
    # the data to a customer-created Kinesis data stream named "MyKinesisStream".
    # This example runs forever until the program is stopped.

    # The size of the local stream on disk will not exceed the default (which is 256 MB).
    # Any data appended after the stream reaches the size limit continues to be appended, and
    # stream manager deletes the oldest data until the total stream size is back under 256 MB.
    # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
    # uploaded to Kinesis and you will be charged for that usage.


    def main(logger):
        try:
            stream_name = "SomeStream"
            kinesis_stream_name = "MyKinesisStream"

            # Create a client for the StreamManager
            client = StreamManagerClient()

            # Try deleting the stream (if it exists) so that we have a fresh start
            try:
                client.delete_message_stream(stream_name=stream_name)
            except ResourceNotFoundException:
                pass

            exports = ExportDefinition(
                kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
            )
            client.create_message_stream(
                MessageStreamDefinition(
                    name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
                )
            )

            # Append two messages and print their sequence numbers
            logger.info(
                "Successfully appended message to stream with sequence number %d",
                client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
            )
            logger.info(
                "Successfully appended message to stream with sequence number %d",
                client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
            )

            # Try reading the two messages we just appended and print them out
            logger.info(
                "Successfully read 2 messages: %s",
                client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
            )

            logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
            # Now start putting in random data between 0 and 1000 to emulate device sensor input
            while True:
                logger.debug("Appending new random integer to stream")
                client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
                time.sleep(1)

        except asyncio.TimeoutError:
            logger.exception("Timed out while executing")
        except Exception:
            logger.exception("Exception while running")


    def function_handler(event, context):
        return


    logging.basicConfig(level=logging.INFO)
    # Start up this sample code
    main(logger=logging.getLogger())
  5. Zip the following items into a file named transfer_stream_python.zip. This is your Lambda function deployment package.

    • transfer_stream.py. App logic.

    • greengrasssdk. Required library for Python Greengrass Lambda functions that interact with stream manager.

      Stream manager operations are available in version 1.5.0 or later of the AWS IoT Greengrass Core SDK for Python.

    • The dependencies you installed for the AWS IoT Greengrass Core SDK for Python (for example, the cbor2 directories).

    When you create the zip file, include only these items, not the containing folder.
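The packaging rule above (items at the archive root, not inside a containing folder) can be sketched with Python's zipfile module. The function name and the placeholder files in the demo are assumptions for illustration; in practice the source directory would hold transfer_stream.py, the greengrasssdk folder, and the installed dependencies.

```python
import tempfile
import zipfile
from pathlib import Path


def build_deployment_package(source_dir, zip_path):
    """Zip everything inside source_dir so that entries sit at the archive root."""
    source_dir = Path(source_dir)
    zip_path = Path(zip_path).resolve()
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source_dir.rglob("*")):
            if path.is_file() and path.resolve() != zip_path:
                # arcname is relative to source_dir, so the containing
                # folder itself never appears in the archive.
                archive.write(path, arcname=path.relative_to(source_dir).as_posix())
    return zip_path


# Self-contained demo: placeholder files stand in for the real SDK and dependencies.
with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp) / "package"
    (work / "greengrasssdk").mkdir(parents=True)
    (work / "cbor2").mkdir()
    (work / "transfer_stream.py").write_text("# app logic\n")
    (work / "greengrasssdk" / "__init__.py").write_text("")
    (work / "cbor2" / "__init__.py").write_text("")

    package = build_deployment_package(work, Path(tmp) / "transfer_stream_python.zip")
    with zipfile.ZipFile(package) as archive:
        names = sorted(archive.namelist())

print(names)
```

Listing the archive contents afterward is a quick way to confirm that transfer_stream.py is a top-level entry, which the handler setting in the next step relies on.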

Step 2: Create a Lambda function

In this step, you use the AWS Lambda console to create a Lambda function and configure it to use your deployment package. Then, you publish a function version and create an alias.

  1. First, create the Lambda function.

    1. In the AWS Management Console, choose Services, and open the AWS Lambda console.

    2. Choose Create function and then choose Author from scratch.

    3. In the Basic information section, use the following values:

      • For Function name, enter TransferStream.

      • For Runtime, choose Python 3.7.

      • For Permissions, keep the default setting. This creates an execution role that grants basic Lambda permissions. This role isn't used by AWS IoT Greengrass.

    4. At the bottom of the page, choose Create function.

  2. Next, register the handler and upload your Lambda function deployment package.

    1. On the Code tab, under Code source, choose Upload from. From the dropdown, choose .zip file.

      
                The Upload from dropdown with .zip file highlighted.
    2. Choose Upload, and then choose your transfer_stream_python.zip deployment package. Then, choose Save.

    3. On the Code tab for the function, under Runtime settings, choose Edit, and then enter the following values.

      • For Runtime, choose Python 3.7.

      • For Handler, enter transfer_stream.function_handler

    4. Choose Save.

      Note

      The Test button on the AWS Lambda console doesn't work with this function. The AWS IoT Greengrass Core SDK doesn't contain modules that are required to run your Greengrass Lambda functions independently in the AWS Lambda console. These modules (for example, greengrass_common) are supplied to the functions after they are deployed to your Greengrass core.

  3. Now, publish the first version of your Lambda function and create an alias for the version.

    Note

    Greengrass groups can reference a Lambda function by alias (recommended) or by version. Using an alias makes it easier to manage code updates because you don't have to change your subscription table or group definition when the function code is updated. Instead, you just point the alias to the new function version.

    1. From the Actions menu, choose Publish new version.

    2. For Version description, enter First version, and then choose Publish.

    3. On the TransferStream: 1 configuration page, from the Actions menu, choose Create alias.

    4. On the Create a new alias page, use the following values:

      • For Name, enter GG_TransferStream.

      • For Version, choose 1.

      Note

      AWS IoT Greengrass doesn't support Lambda aliases for $LATEST versions.

    5. Choose Create.

Now you're ready to add the Lambda function to your Greengrass group.
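For readers who script this step instead of using the console, the publish-and-alias sequence might look like the following sketch. It assumes a boto3 Lambda client is passed in; the FakeLambdaClient class is a hypothetical offline stand-in that only records calls so the example runs without AWS access, and the ARN it returns is a placeholder.

```python
def publish_and_alias(lambda_client, function_name="TransferStream",
                      alias_name="GG_TransferStream"):
    """Publish a new function version and point an alias at it."""
    published = lambda_client.publish_version(
        FunctionName=function_name,
        Description="First version",
    )
    alias = lambda_client.create_alias(
        FunctionName=function_name,
        Name=alias_name,
        # Use the numbered version that was just published, not $LATEST.
        FunctionVersion=published["Version"],
    )
    return alias["AliasArn"]


class FakeLambdaClient:
    """Hypothetical stand-in that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def publish_version(self, **kwargs):
        self.calls.append(("publish_version", kwargs))
        return {"Version": "1"}

    def create_alias(self, **kwargs):
        self.calls.append(("create_alias", kwargs))
        arn = "arn:aws:lambda:region:account-id:function:{FunctionName}:{Name}".format(**kwargs)
        return {"AliasArn": arn}


fake = FakeLambdaClient()
alias_arn = publish_and_alias(fake)
print(alias_arn)
```

With real credentials, you would pass boto3.client("lambda") in place of the fake client.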

Step 3: Add a Lambda function to the Greengrass group

In this step, you add the Lambda function to the group and then configure its lifecycle and environment variables. For more information, see Controlling execution of Greengrass Lambda functions by using group-specific configuration.

  1. In the AWS IoT console, in the navigation pane, choose Greengrass, Classic (V1), Groups.

  2. Choose the target group.

  3. On the group configuration page, choose Lambdas, and then choose Add Lambda.

    
                The group page with Lambdas and Add Lambda highlighted.
  4. On the Add a Lambda to your Greengrass Group page, choose Use existing Lambda.

    
                The Add a Lambda to your Greengrass Group page with Use existing Lambda highlighted.
  5. On the Use existing Lambda page, choose TransferStream, and then choose Next.

  6. On the Select a Lambda version page, choose Alias:GG_TransferStream, and then choose Finish.

    Now, configure properties that determine the behavior of the Lambda function in the Greengrass group.

  7. For the TransferStream Lambda function, choose the ellipsis (…), and then choose Edit Configuration.

  8. On the Group-specific Lambda configuration page, make the following changes:

    • Set Memory limit to 32 MB.

    • For Lambda lifecycle, choose Make this function long-lived and keep it running indefinitely.

    Note

    A long-lived (or pinned) Lambda function starts automatically after AWS IoT Greengrass starts and keeps running in its own container. This is in contrast to an on-demand Lambda function, which starts when invoked and stops when there are no tasks left to run. For more information, see Lifecycle configuration for Greengrass Lambda functions.

  9. Choose Update.
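As a rough illustration of what these console settings correspond to, the sketch below builds a function entry in the shape used by Greengrass V1 function definitions, where memory is expressed in KB and a long-lived function is marked as pinned. The helper name, the Id value, the timeout value, and the alias ARN are assumptions for illustration, not values set by this tutorial.

```python
def build_function_entry(alias_arn, memory_mb=32, pinned=True, timeout_seconds=3):
    """Build a function definition entry for the TransferStream function."""
    return {
        "Id": "TransferStream",  # hypothetical identifier
        "FunctionArn": alias_arn,
        "FunctionConfiguration": {
            "MemorySize": memory_mb * 1024,  # the API expects KB: 32 MB -> 32768 KB
            "Pinned": pinned,                # True means long-lived
            "Timeout": timeout_seconds,      # assumed value, not specified in this tutorial
        },
    }


# Placeholder alias ARN, for illustration only.
entry = build_function_entry(
    "arn:aws:lambda:region:account-id:function:TransferStream:GG_TransferStream"
)
print(entry)
```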

Step 4: Enable stream manager

In this step, you make sure that stream manager is enabled.

  1. On the group configuration page, choose Settings.

    
                Group settings page.
  2. Under Stream manager, check the enabled or disabled status. If disabled, choose Edit. Then, choose Enable and Save. You can use the default parameter settings for this tutorial. For more information, see Configure AWS IoT Greengrass stream manager.

    
                        The Stream manager section on the group's Settings page.
Note

When you use the console to enable stream manager and deploy the group, the memory size for stream manager is set to 4194304 KB (4 GB) by default. We recommend that you set the memory size to at least 128000 KB.
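The memory values in this note are expressed in KB, which can be hard to read at a glance. The following small sketch converts them to larger units; the helper name is hypothetical, and it assumes 1 MB = 1024 KB.

```python
KB_PER_MB = 1024
KB_PER_GB = 1024 * 1024


def describe_memory_kb(size_kb):
    """Return a human-readable string for a memory size given in KB."""
    if size_kb >= KB_PER_GB:
        return f"{size_kb / KB_PER_GB:g} GB"
    return f"{size_kb / KB_PER_MB:g} MB"


console_default = describe_memory_kb(4194304)   # default when enabled from the console
recommended_min = describe_memory_kb(128000)    # recommended minimum from the note
print(console_default, recommended_min)
```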

Step 5: Configure local logging

In this step, you configure AWS IoT Greengrass system components, user-defined Lambda functions, and connectors in the group to write logs to the file system of the core device. You can use logs to troubleshoot any issues you might encounter. For more information, see Monitoring with AWS IoT Greengrass logs.

  1. Under Local logs configuration, check if local logging is configured.

    
                Logs configuration section showing Greengrass system logs and user Lambda logs configuration.
  2. If logs aren't configured for Greengrass system components or user-defined Lambda functions, choose Edit.

  3. Choose Add another log type, choose User Lambdas and Greengrass system, and then choose Update.

  4. Keep the default values for logging level and disk space limit, and then choose Save.
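The logging settings chosen above roughly correspond to two file system loggers in a Greengrass V1 logger definition, one for system components and one for user-defined Lambda functions. The sketch below builds that structure; the Id values and the disk space figure are placeholders for illustration, not the console defaults.

```python
def build_local_loggers(level="INFO", space_kb=25600):
    """Build local (file system) logger entries for system and user Lambda logs."""
    return [
        {
            "Id": "system-logs",            # hypothetical identifier
            "Component": "GreengrassSystem",
            "Type": "FileSystem",
            "Level": level,
            "Space": space_kb,              # disk space limit in KB (placeholder value)
        },
        {
            "Id": "lambda-logs",            # hypothetical identifier
            "Component": "Lambda",
            "Type": "FileSystem",
            "Level": level,
            "Space": space_kb,
        },
    ]


loggers = build_local_loggers()
debug_loggers = build_local_loggers(level="DEBUG")  # more detail for troubleshooting
print(loggers)
```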

Step 6: Deploy the Greengrass group

Deploy the group to the core device.

  1. Make sure that the AWS IoT Greengrass core is running. Run the following commands in a terminal on your core device (for example, a Raspberry Pi), as needed.

    1. To check whether the daemon is running:

      ps aux | grep -E 'greengrass.*daemon'

      If the output contains a root entry for /greengrass/ggc/packages/ggc-version/bin/daemon, then the daemon is running.

      Note

      The version in the path depends on the AWS IoT Greengrass Core software version that's installed on your core device.

    2. To start the daemon:

      cd /greengrass/ggc/core/
      sudo ./greengrassd start
  2. On the group configuration page, choose Deployments, and from the Actions menu, choose Deploy.

    
                The group page with Deployments and Deploy highlighted.
  3. If prompted, on the Configure how devices discover your core page, choose Automatic detection.

    This enables devices to automatically acquire connectivity information for the core, such as IP address, DNS, and port number. Automatic detection is recommended, but AWS IoT Greengrass also supports manually specified endpoints. You're only prompted for the discovery method the first time that the group is deployed.

    
                The Configure how devices discover your core page with Automatic detection highlighted.
    Note

    If prompted, grant permission to create the Greengrass service role and associate it with your AWS account in the current AWS Region. This role allows AWS IoT Greengrass to access your resources in AWS services.

    The Deployments page shows the deployment timestamp, version ID, and status. When completed, the status displayed for the deployment should be Successfully completed.

    For troubleshooting help, see Troubleshooting AWS IoT Greengrass.
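The daemon check in step 1 can also be automated. The following sketch parses captured `ps aux` output and looks for a root-owned entry whose command is the daemon binary under /greengrass/ggc/packages. The function name, the sample lines, and the version string 1.11.0 are made up for illustration; real output varies by system.

```python
import re

# Matches the daemon binary path; the version directory differs per installation.
DAEMON_COMMAND = re.compile(r"/greengrass/ggc/packages/[^/\s]+/bin/daemon\b")


def daemon_is_running(ps_output):
    """Return True if `ps aux` output contains a root entry for the Greengrass daemon."""
    for line in ps_output.splitlines():
        fields = line.split(None, 10)  # ps aux has 11 columns; the last is the command
        if len(fields) < 11:
            continue
        user, command = fields[0], fields[10]
        if user == "root" and DAEMON_COMMAND.match(command):
            return True
    return False


# Hypothetical sample output, for illustration only.
running_sample = (
    "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\n"
    "root 1234 0.5 1.2 912345 23456 ? Sl 10:00 0:05 /greengrass/ggc/packages/1.11.0/bin/daemon\n"
    "pi 1300 0.0 0.0 7348 520 pts/0 S+ 10:01 0:00 grep -E greengrass.*daemon\n"
)
stopped_sample = "pi 1300 0.0 0.0 7348 520 pts/0 S+ 10:01 0:00 grep -E greengrass.*daemon\n"

print(daemon_is_running(running_sample), daemon_is_running(stopped_sample))
```

Matching on the command column rather than the whole line avoids counting the grep process itself as a hit.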

Step 7: Test the application

The TransferStream Lambda function generates simulated device data. It writes data to a stream that stream manager exports to the target Kinesis data stream.
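Each simulated reading is written as a 4-byte, big-endian, signed integer. A downstream consumer that receives the raw record bytes could decode them as in the sketch below. The helper names are hypothetical; the encoding mirrors the to_bytes call in transfer_stream.py.

```python
def encode_reading(value):
    """Encode a reading the same way transfer_stream.py does."""
    return value.to_bytes(length=4, signed=True, byteorder="big")


def decode_reading(payload):
    """Decode a 4-byte big-endian signed integer back into a Python int."""
    if len(payload) != 4:
        raise ValueError(f"expected 4 bytes, got {len(payload)}")
    return int.from_bytes(payload, byteorder="big", signed=True)


payload = encode_reading(1000)
print(payload, decode_reading(payload))
```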

  1. In the Amazon Kinesis console, under Kinesis data streams, choose MyKinesisStream.

    Note

    If you ran the tutorial without a target Kinesis data stream, check the log file for the stream manager (GGStreamManager). If an error message in the log contains "export stream MyKinesisStream doesn't exist", then the test is successful. This error means that the service tried to export to the stream but the stream doesn't exist.

  2. On the MyKinesisStream page, choose Monitoring. If the test is successful, you should see data in the Put Records charts. Depending on your connection, it might take a minute before the data is displayed.

    Important

    When you're finished testing, delete the Kinesis data stream to avoid incurring more charges.

    Or, run the following commands to stop the Greengrass daemon. This prevents the core from sending messages until you're ready to continue testing.

    cd /greengrass/ggc/core/
    sudo ./greengrassd stop
  3. Remove the TransferStream Lambda function from the core.

    1. In the AWS IoT console, in the navigation pane, choose Greengrass, Classic (V1), Groups.

    2. Under Greengrass groups, choose your group.

    3. On the Lambdas page, choose the ellipsis (…) for the TransferStream function, and then choose Remove function.

    4. From Actions, choose Deploy.

To view logging information or troubleshoot issues with streams, check the logs for the TransferStream and GGStreamManager functions. You must have root permissions to read AWS IoT Greengrass logs on the file system.

  • TransferStream writes log entries to greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log.

  • GGStreamManager writes log entries to greengrass-root/ggc/var/log/system/GGStreamManager.log.

If you need more troubleshooting information, you can set the logging level for User Lambda logs to Debug logs and then deploy the group again.
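The log locations above can be assembled programmatically, and the stream manager log can be scanned for the missing-stream error described in the test step. This is a minimal sketch: the helper names, the Region and account ID values, and the sample log lines are placeholders, and the exact wording of the error in a real log is assumed from the tutorial text.

```python
from pathlib import PurePosixPath


def log_paths(greengrass_root, region, account_id, function_name="TransferStream"):
    """Return the expected log file paths for the function and for stream manager."""
    log_root = PurePosixPath(greengrass_root) / "ggc" / "var" / "log"
    return {
        function_name: log_root / "user" / region / account_id / f"{function_name}.log",
        "GGStreamManager": log_root / "system" / "GGStreamManager.log",
    }


def find_missing_stream_errors(log_text, stream_name="MyKinesisStream"):
    """Return log lines that report the export target stream as missing."""
    needle = f"export stream {stream_name} doesn't exist"
    return [line for line in log_text.splitlines() if needle in line]


# Placeholder root, Region, and account ID, for illustration only.
paths = log_paths("/greengrass", "us-west-2", "123456789012")

# Hypothetical log excerpt; real log lines have a different layout.
sample_log = (
    "INFO starting export for SomeStream\n"
    "ERROR export stream MyKinesisStream doesn't exist\n"
)
matches = find_missing_stream_errors(sample_log)
print(paths["GGStreamManager"], len(matches))
```

Reading the actual files requires root permissions on the core device, as noted above.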

See also