Create custom components that use
stream manager
Use stream manager in custom Greengrass components to store, process, and export IoT device data.
Use the procedures and examples in this section to create component recipes, artifacts, and
applications that work with stream manager. For more information about how to develop and test
components, see Create AWS IoT Greengrass components.
Define component recipes that use stream manager
To use stream manager in a custom component, you must define the
aws.greengrass.StreamManager
component as a dependency. You must also provide the
Stream Manager SDK. Complete the following tasks to download and use the Stream Manager SDK in the language of your
choice.
The Stream Manager SDK for Java is available as a JAR file that you can use to compile your
component. Then, you can create an application JAR that includes the Stream Manager SDK, define the
application JAR as a component artifact, and run the application JAR in the component
lifecycle.
To use the Stream Manager SDK for Java
-
Download the Stream Manager SDK for Java JAR file.
-
Do one of the following to create component artifacts from your Java application and
the Stream Manager SDK JAR file:
-
Build your application as a JAR file that includes the Stream Manager SDK JAR, and run
this JAR file in your component recipe.
-
Define the Stream Manager SDK JAR as a component artifact. Add that artifact to the
classpath when you run your application in your component recipe.
Your component recipe might look like the following example. This component runs a
modified version of the StreamManagerS3.java example, where StreamManagerS3.jar
includes the Stream Manager SDK JAR.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.StreamManagerS3Java",
"ComponentVersion": "1.0.0",
"ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
"ComponentPublisher": "Amazon",
"ComponentDependencies": {
"aws.greengrass.StreamManager": {
"VersionRequirement": "^2.0.0"
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/StreamManagerS3.jar"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar"
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.StreamManagerS3Java
ComponentVersion: 1.0.0
ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
ComponentPublisher: Amazon
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Lifecycle:
run: java -jar {artifacts:path}/StreamManagerS3.jar
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar
For more information about how to develop and test components, see Create AWS IoT Greengrass components.
The Stream Manager SDK for Python is available as source code that you can include in your
component. Create a ZIP file of the Stream Manager SDK, define the ZIP file as a component artifact,
and install the SDK's requirements in the component lifecycle.
To use the Stream Manager SDK for Python
-
Clone or download the aws-greengrass-stream-manager-sdk-python repository.
git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
-
Create a ZIP file that contains the stream_manager
folder,
which contains the source code of the Stream Manager SDK for Python. You can provide this ZIP
file as a component artifact that the AWS IoT Greengrass Core software unzips when it installs your
component. Do the following:
-
Open the folder that contains the repository that you cloned or downloaded in
the previous step.
cd aws-greengrass-stream-manager-sdk-python
-
Zip the stream_manager
folder into a ZIP file named
stream_manager_sdk.zip
.
- Linux or Unix
-
zip -rv stream_manager_sdk.zip stream_manager
- Windows Command Prompt (CMD)
-
tar -acvf stream_manager_sdk.zip stream_manager
- PowerShell
-
Compress-Archive stream_manager stream_manager_sdk.zip
-
Verify that the stream_manager_sdk.zip
file contains the
stream_manager
folder and its contents. Run the following
command to list the contents of the ZIP file.
- Linux or Unix
-
unzip -l stream_manager_sdk.zip
- Windows Command Prompt (CMD)
-
tar -tf stream_manager_sdk.zip
The output should look similar to the following.
Archive: aws-greengrass-stream-manager-sdk-python/stream_manager.zip
Length Date Time Name
--------- ---------- ----- ----
0 02-24-2021 20:45 stream_manager/
913 02-24-2021 20:45 stream_manager/__init__.py
9719 02-24-2021 20:45 stream_manager/utilinternal.py
1412 02-24-2021 20:45 stream_manager/exceptions.py
1004 02-24-2021 20:45 stream_manager/util.py
0 02-24-2021 20:45 stream_manager/data/
254463 02-24-2021 20:45 stream_manager/data/__init__.py
26515 02-24-2021 20:45 stream_manager/streammanagerclient.py
--------- -------
294026 8 files
-
Copy the Stream Manager SDK artifacts to your component's artifacts folder. In addition to
the Stream Manager SDK ZIP file, your component uses the SDK's
requirements.txt
file to install the dependencies of the
Stream Manager SDK. Replace ~/greengrass-components
with the path to the
folder that you use for local development.
- Linux or Unix
-
cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components
/artifacts/com.example.StreamManagerS3Python/1.0.0/
- Windows Command Prompt (CMD)
-
robocopy . %USERPROFILE%\greengrass-components
\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip
robocopy . %USERPROFILE%\greengrass-components
\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
- PowerShell
-
cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components
\artifacts\com.example.StreamManagerS3Python\1.0.0\
-
Create your component recipe. In the recipe, do the following:
-
Define stream_manager_sdk.zip
and
requirements.txt
as artifacts.
-
Define your Python application as an artifact.
-
In the install lifecycle, install the Stream Manager SDK requirements from
requirements.txt
.
-
In the run lifecycle, append the Stream Manager SDK to PYTHONPATH
, and run
your Python application.
Your component recipe might look like the following example. This component runs
the stream_manager_s3.py example.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.StreamManagerS3Python",
"ComponentVersion": "1.0.0",
"ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
"ComponentPublisher": "Amazon",
"ComponentDependencies": {
"aws.greengrass.StreamManager": {
"VersionRequirement": "^2.0.0"
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "pip3 install --user -r {artifacts:path}/requirements.txt",
"run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
}
]
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "pip3 install --user -r {artifacts:path}/requirements.txt",
"run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.StreamManagerS3Python
ComponentVersion: 1.0.0
ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
ComponentPublisher: Amazon
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Platform:
os: linux
Lifecycle:
install: pip3 install --user -r {artifacts:path}/requirements.txt
run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
python3 {artifacts:path}/stream_manager_s3.py
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
Unarchive: ZIP
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
- Platform:
os: windows
Lifecycle:
install: pip3 install --user -r {artifacts:path}/requirements.txt
run: |
set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk"
py -3 {artifacts:path}/stream_manager_s3.py
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
Unarchive: ZIP
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
For more information about how to develop and test components, see Create AWS IoT Greengrass components.
The Stream Manager SDK for JavaScript is available as source code that you can include in your
component. Create a ZIP file of the Stream Manager SDK, define the ZIP file as a component artifact,
and install the SDK in the component lifecycle.
To use the Stream Manager SDK for JavaScript
-
Clone or download the aws-greengrass-stream-manager-sdk-js repository.
git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
-
Create a ZIP file that contains the
aws-greengrass-stream-manager-sdk
folder, which contains the
source code of the Stream Manager SDK for JavaScript. You can provide this ZIP file as a
component artifact that the AWS IoT Greengrass Core software unzips when it installs your
component. Do the following:
-
Open the folder that contains the repository that you cloned or downloaded in
the previous step.
cd aws-greengrass-stream-manager-sdk-js
-
Zip the aws-greengrass-stream-manager-sdk
folder into a
ZIP file named stream-manager-sdk.zip
.
- Linux or Unix
-
zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
- Windows Command Prompt (CMD)
-
tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
- PowerShell
-
Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
-
Verify that the stream-manager-sdk.zip
file contains the
aws-greengrass-stream-manager-sdk
folder and its contents.
Run the following command to list the contents of the ZIP file.
- Linux or Unix
-
unzip -l stream-manager-sdk.zip
- Windows Command Prompt (CMD)
-
tar -tf stream-manager-sdk.zip
The output should look similar to the following.
Archive: stream-manager-sdk.zip
Length Date Time Name
--------- ---------- ----- ----
0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/
369 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/package.json
1017 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/util.js
8374 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/utilInternal.js
1937 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/exceptions.js
0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/
353343 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/index.js
22599 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/client.js
216 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/index.js
--------- -------
387855 9 files
-
Copy the Stream Manager SDK artifact to your component's artifacts folder. Replace
~/greengrass-components
with the path to the folder that
you use for local development.
- Linux or Unix
-
cp stream-manager-sdk.zip ~/greengrass-components
/artifacts/com.example.StreamManagerS3JS/1.0.0/
- Windows Command Prompt (CMD)
-
robocopy . %USERPROFILE%\greengrass-components
\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
- PowerShell
-
cp .\stream-manager-sdk.zip ~\greengrass-components
\artifacts\com.example.StreamManagerS3JS\1.0.0\
-
Create your component recipe. In the recipe, do the following:
-
Define stream-manager-sdk.zip
as an artifact.
-
Define your JavaScript application as an artifact.
-
In the install lifecycle, install the Stream Manager SDK from the
stream-manager-sdk.zip
artifact. This npm
install
command creates a node_modules
folder that
contains the Stream Manager SDK and its dependencies.
-
In the run lifecycle, append the node_modules
folder to
NODE_PATH
, and run your JavaScript application.
Your component recipe might look like the following example. This component runs
the StreamManagerS3 example.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.StreamManagerS3JS",
"ComponentVersion": "1.0.0",
"ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
"ComponentPublisher": "Amazon",
"ComponentDependencies": {
"aws.greengrass.StreamManager": {
"VersionRequirement": "^2.0.0"
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
"run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
}
]
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
"run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.StreamManagerS3JS
ComponentVersion: 1.0.0
ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
ComponentPublisher: Amazon
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Platform:
os: linux
Lifecycle:
install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
run: |
export NODE_PATH=$NODE_PATH:{work:path}/node_modules
node {artifacts:path}/index.js
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
Unarchive: ZIP
- URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
- Platform:
os: windows
Lifecycle:
install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
run: |
set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules"
node {artifacts:path}/index.js
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
Unarchive: ZIP
- URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
For more information about how to develop and test components, see Create AWS IoT Greengrass components.
Connect to stream manager in application code
To connect to stream manager in your application, create an instance of
StreamManagerClient
from the Stream Manager SDK. This client connects to the stream
manager component on its default port 8088, or the port that you specify. For more information
about how to use StreamManagerClient
after you create an instance, see Use StreamManagerClient to work with streams.
Example: Connect to stream manager with default port
- Java
-
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
public class MyStreamManagerComponent {
void connectToStreamManagerWithDefaultPort() {
StreamManagerClient client = StreamManagerClientFactory.standard().build();
// Use the client.
}
}
- Python
-
from stream_manager import (
StreamManagerClient
)
def connect_to_stream_manager_with_default_port():
client = StreamManagerClient()
# Use the client.
- JavaScript
-
const {
StreamManagerClient
} = require('aws-greengrass-stream-manager-sdk');
function connectToStreamManagerWithDefaultPort() {
const client = new StreamManagerClient();
// Use the client.
}
Example: Connect to stream manager with non-default port
If you configure stream manager with a port other than the default, you must use interprocess communication to retrieve the port
from the component configuration.
The port
configuration parameter contains the value that you specify in
STREAM_MANAGER_SERVER_PORT
when you deploy stream manager.
- Java
-
void connectToStreamManagerWithCustomPort() {
EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection();
GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection);
List<String> keyPath = new ArrayList<>();
keyPath.add("port");
GetConfigurationRequest request = new GetConfigurationRequest();
request.setComponentName("aws.greengrass.StreamManager");
request.setKeyPath(keyPath);
GetConfigurationResponse response =
greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get();
String port = response.getValue().get("port").toString();
System.out.print("Stream Manager is running on port: " + port);
final StreamManagerClientConfig config = StreamManagerClientConfig.builder()
.serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build();
StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build();
// Use the client.
}
- Python
-
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
GetConfigurationRequest
)
from stream_manager import (
StreamManagerClient
)
TIMEOUT = 10
def connect_to_stream_manager_with_custom_port():
# Use IPC to get the port from the stream manager component configuration.
ipc_client = awsiot.greengrasscoreipc.connect()
request = GetConfigurationRequest()
request.component_name = "aws.greengrass.StreamManager"
request.key_path = ["port"]
operation = ipc_client.new_get_configuration()
operation.activate(request)
future_response = operation.get_response()
response = future_response.result(TIMEOUT)
stream_manager_port = str(response.value["port"])
# Use port to create a stream manager client.
stream_client = StreamManagerClient(port=stream_manager_port)
# Use the client.