AWS クラウド へのデータストリームエクスポート (CLI) - AWS IoT Greengrass

AWS IoT Greengrass Version 1 は機能更新を受信しなくなり、2023 年 6 月 30 日までセキュリティパッチとバグ修正のみ受信します。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。重要な新機能新たなプラットフォームのサポートが追加された AWS IoT Greengrass Version 2 への移行を強くお勧めします。

AWS クラウド へのデータストリームエクスポート (CLI)

このチュートリアルでは、AWS CLI を使用して、ストリームマネージャーを有効にし AWS IoT Greengrass グループを設定およびデプロイする方法について説明します。グループには、ストリームマネージャーのストリームに書き込むユーザー定義 Lambda 関数が含まれています。ストリームマネージャーは、自動的に AWS クラウド にエクスポートされます。

ストリームマネージャーにより、大量のデータストリームの取り込み、処理、エクスポートが効率化され、信頼性が向上します。このチュートリアルでは、IoT データを消費する TransferStream Lambda 関数を作成します。Lambda 関数は、AWS IoT Greengrass Core SDK を使用して、ストリームマネージャーでストリームを作成し、ストリームでの読み取りと書き込みを行います。ストリームマネージャーは、ストリームを Kinesis Data Streams にエクスポートします。以下の図表に、このワークフローを示しています。


      ストリーム管理ワークフローの図。

このチュートリアルでは、ユーザー定義の Lambda 関数が AWS IoT Greengrass Core SDK 内の StreamManagerClient オブジェクトを使用してストリームマネージャーとやり取りする方法について説明します。簡単にするために、このチュートリアルで作成する Python Lambda 関数は、シミュレートされたデバイスデータを生成します。

AWS CLI の Greengrass コマンドを含む AWS IoT Greengrass API を使用してグループを作成すると、ストリームマネージャーはデフォルトで無効になります。コアでストリームマネージャーを有効にするには、システム GGStreamManager Lambda 関数に加え、新しい関数定義バージョンを参照するグループバージョンを含む関数定義バージョンを作成します。次に、フローをデプロイします。

前提条件

このチュートリアルを完了するには、以下が必要です。

  • Greengrass グループと Greengrass コア (v1.10 以降)。Greengrass のグループまたはコアを作成する方法については、「AWS IoT Greengrassで開始」を参照してください。開始方法チュートリアルには、AWS IoT Greengrass Core ソフトウェアのインストール手順も含まれています。

    注記

    ストリームマネージャーは OpenWrt ディストリビューションではサポートされていません。

  • コアデバイスにインストールされている Java 8 ランタイム (JDK 8)。

    • Debian ベースのディストリビューション (Raspbian を含む) または Ubuntu ベースのディストリビューションの場合は、次のコマンドを実行します。

      sudo apt install openjdk-8-jdk
    • Red Hat ベースのディストリビューション (Amazon Linux を含む) の場合は、次のコマンドを実行します。

      sudo yum install java-1.8.0-openjdk

      詳細については、OpenJDK ドキュメントの「How to download and install prebuilt OpenJDK packages」を参照してください。

  • AWS IoT Greengrass Core SDK for Python v1.5.0 以降。AWS IoT Greengrass Core SDK for Python で StreamManagerClient を使用するには、以下を行う必要があります。

    • Python 3.7 以降をコアデバイスにインストールします。

    • SDK とその依存関係を Lambda 関数デプロイパッケージに含めます。このチュートリアルでは、手順を説明しています。

    ヒント

    StreamManagerClient を Java または NodeJS で使用できます。コードの例については、GitHub の 「AWS IoT Greengrass Core SDK for Java」と「AWS IoT Greengrass Core SDK for Node.js」を参照してください。

  • Greengrass グループと同じ AWS リージョン の Amazon Kinesis Data Streams で作成された MyKinesisStream という名前の送信先ストリーム。詳細については、「Amazon Kinesis デベロッパーガイド」の「ストリームを作成する」を参照してください。

    注記

    このチュートリアルでは、ストリームマネージャーが Kinesis Data Streams にデータをエクスポートするため、AWS アカウント に課金されます。料金の詳細については、「Amazon Kinesis Data Streams の料金」を参照してください。

    料金が発生しないようにするには、Kinesis データストリームを作成せずにこのチュートリアルを実行します。この場合、ログを確認して、ストリームマネージャーがストリームを Kinesis Data Streams にエクスポートしようとしたことを確認します。

  • 次の例に示すように、ターゲットデータストリームで kinesis:PutRecords アクションを許可する Greengrass グループのロール に IAM ポリシーが追加されている。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }
  • コンピュータにインストールされて設定されている AWS CLI。詳細については、「AWS Command Line Interface ユーザーガイド」の「AWS Command Line Interface のインストール」および「AWS CLI の設定」を参照してください。

     

    このチュートリアルのコマンドの例は、Linux やその他の Unix ベースのシステム向けに書かれています。Windows を使用している場合、構文の違いについては、「AWS CLI のパラメータ値の指定」を参照してください。

    コマンドに JSON 文字列が含まれている場合、このチュートリアルでは、1 行形式の JSON の例を示しています。システムによっては、この形式を使用したほうが、コマンドの編集と実行を効率化できる場合があります。

 

このチュートリアルには、以下の手順の概要が含まれます。

このチュートリアルは完了までに約 30 分かかります。

ステップ 1: Lambda 関数デプロイパッケージを作成する

この手順では、Python 関数コードと依存関係を含む Lambda 関数デプロイパッケージを作成します。このパッケージは、AWS Lambda で Lambda 関数を作成するときに後でアップロードします。Lambda 関数は AWS IoT Greengrass Core SDK を使用して、ローカルストリームを作成および操作します。

注記

ユーザー定義の Lambda 関数は、AWS IoT Greengrass Core SDK を使用してストリームマネージャーと対話する必要があります。Greengrass ストリームマネージャーの要件の詳細については、「Greengrass ストリームマネージャーの要件」を参照してください。

  1. v1.5.0 以降の AWS IoT Greengrass Core SDK for Python をダウンロードします。

  2. ダウンロードしたパッケージを解凍し、SDK を取得します。SDK は greengrasssdk フォルダです。

  3. パッケージ依存関係をインストールして、Lambda 関数デプロイパッケージに SDK を含めます。

    1. requirements.txt ファイルが格納されている SDK ディレクトリに移動します。このファイルには、依存関係が一覧表示されます。

    2. SDK の依存関係をインストールします。例えば、次の pip コマンドを実行して、現在のディレクトリにインストールします。

      pip install --target . -r requirements.txt
  4. 以下の Python コード関数を transfer_stream.py というローカルファイルに保存します。

    ヒント

    Java と NodeJS を使用するコードの例については、GitHub の「AWS IoT Greengrass Core SDK for Java」と「AWS IoT Greengrass Core SDK for Node.js」を参照してください。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 以下の項目を transfer_stream_python.zip という名前のファイルに圧縮します。これが Lambda 関数デプロイパッケージです。

    • transfer_stream.py。アプリケーションロジック。

    • greengrasssdk。MQTT メッセージを発行する Python Greengrass Lambda 関数で必須のライブラリ。

      ストリームマネージャーの操作は、AWS IoT Greengrass Core SDK for Python のバージョン 1.5.0 以降で使用できます。

    • AWS IoT Greengrass Core SDK for Python にインストールした依存関係 (例えば、cbor2 ディレクトリ)。

    zip ファイルを作成するときは、これらの項目のみを含み、それらが含まれているフォルダは含みません。

ステップ 2: Lambda 関数を作成する

  1. 関数の作成時に ARN を渡せるように、IAM ロールを作成します。

    JSON Expanded
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
    注記

    AWS IoT Greengrass はこのロールを使用しません。Greengrass Lambda 関数に対するアクセス許可は Greengrass グループロールで指定するためです。このチュートリアルでは、空のロールを作成します。

  2. 出力から Arn をコピーします。

  3. AWS Lambda API を使用して TransferStream 関数を作成します。以下のコマンドでは、zip ファイルが現在のディレクトリにあるとします。

    • role-arn を、コピーした Arn に置き換えます。

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. 関数のバージョンを発行します。

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. 発行されるバージョンのエイリアスを作成します。

    Greengrass グループは、Lambda 関数をエイリアス別 (推奨) またはバージョン別に参照できます。エイリアスを使用すると、関数コードを更新する時にサブスクリプションテーブルやグループ定義を変更する必要がないため、コード更新を簡単に管理できます。その代わりに、新しい関数バージョンにエイリアスを指定するだけで済みます。

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    注記

    AWS IoT Greengrass は、$LATEST バージョンの Lambda エイリアスをサポートしていません。

  6. 出力から AliasArn をコピーします。この値は、関数を AWS IoT Greengrass に設定するときに使用します。

これで、AWS IoT Greengrass の関数を設定する準備ができました。

ステップ 3: 関数の定義とバージョンを作成する

この手順では、システム GGStreamManager Lambda 関数とユーザー定義の TransferStream Lambda 関数を参照する関数定義バージョンを作成します。AWS IoT Greengrass API を使用するときにストリームマネージャーを有効にするには、関数定義バージョンに GGStreamManager 関数が含まれている必要があります。

  1. システムとユーザー定義の Lambda 関数を含む初期バージョンを使用して関数定義を作成します。

    次の定義バージョンにより、ストリームマネージャーがデフォルトのパラメータ設定で有効になります。カスタム設定を構成するには、対応するストリームマネージャーのパラメータの環境変数を定義する必要があります。例については、「ストリームマネージャー (CLI) を有効化、無効化、設定するには」を参照してください。AWS IoT Greengrass は、省略されたパラメータにデフォルト設定を使用します。MemorySize は少なくとも 128000 でなければなりません。Pinnedtrue に設定する必要があります。

    注記

    存続期間の長い (または固定された) Lambda 関数は、AWS IoT Greengrass の起動後に自動的に起動し、独自のコンテナで実行し続けます。これはオンデマンド Lambda 関数とは対照的です。この関数は呼び出されたときに開始し、実行するタスクが残っていないときに停止します。詳細については、「Greengrass Lambda 関数のライフサイクル設定」を参照してください。

    • arbitrary-function-id を関数の名前 (stream-manager など) で置き換えます。

    • alias-arn を、TransferStream Lambda 関数のエイリアスの作成時にコピーした AliasArn に置き換えます。

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    注記

    Timeout は関数定義バージョンで必要ですが、GGStreamManager は使用しません。Timeout および、その他のグループレベルの設定については、「グループ固有の設定による Greengrass Lambda 関数の実行の制御」を参照してください。

  2. 出力から LatestVersionArn をコピーします。この値を使用して、Core にデプロイするグループバージョンに、関数定義バージョンを追加します。

ステップ 4: ロガー定義とバージョンの作成

グループのログ記録設定を定義します。このチュートリアルでは、コアデバイスのファイルシステムにログを書き込むように、AWS IoT Greengrass システムコンポーネントとユーザー定義の Lambda 関数を設定します。ログを使用して、発生する可能性のある問題のトラブルシューティングを行うことができます。詳細については、「AWS IoT Greengrass ログでのモニタリング」を参照してください。

  1. 初期バージョンを含む関数定義を作成します。

    JSON Expanded
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. 出力から ロガー定義の LatestVersionArn をコピーします。この値を使用して、コアにデプロイするグループバージョンに、ロガー定義のバージョンを追加します。

ステップ 5: コア定義バージョンの ARN を取得する

新しいグループバージョンに追加するコア定義バージョンの ARN を取得します。グループバージョンをデプロイするには、グループバージョンが、1 つのコアが含まれているコア定義バージョンを参照する必要があります。

  1. ターゲットの Greengrass グループとグループのバージョンの ID を取得します。この手順では、これが最新のグループおよびグループのバージョンであると仮定します。次のクエリは、最後に作成されたグループを返します。

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    または、名前でクエリを実行することもできます。グループ名は一意である必要はないため、複数のグループが返されることがあります。

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    注記

    これらの値は AWS IoT コンソールにもあります。グループ ID は、グループの [Settings (設定)] ページに表示されます。グループバージョン ID は、グループの [Deployments] (デプロイ) タブに表示されます。

  2. 出力からターゲットグループの Id をコピーします。この情報は、Core 定義バージョンの取得時とグループのデプロイ時に使用します。

  3. 出力から LatestVersion をコピーします。これは、グループに追加された最後のバージョンの ID です。この情報は、Core 定義バージョンの取得時に使用します。

  4. Core 定義バージョンの ARN を取得します。

    1. グループバージョンを取得します。

      • group-id を、グループのコピー済み Id に置き換えます。

      • group-version-id を、グループのコピー済み LatestVersion に置き換えます。

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. 出力から CoreDefinitionVersionArn をコピーします。この値を使用して、コアにデプロイするグループバージョンにコア定義バージョンを追加します。

ステップ 6: グループバージョンを作成する

デプロイするすべてのエンティティを含むグループバージョンを作成する準備ができました。ここでは、各コンポーネントタイプのターゲットバージョンを参照するグループバージョンを作成します。このチュートリアルでは、コア定義バージョン、関数定義バージョン、ロガー定義バージョンが含まれます。

  1. グループバージョンを作成します。

    • group-id を、グループのコピー済み Id に置き換えます。

    • core-definition-version-arn を、Core 定義バージョンのコピー済み CoreDefinitionVersionArn に置き換えます。

    • 新しい関数定義バージョンにコピーした LatestVersionArnfunction-definition-version-arn を置き換えます。

    • logger-definition-version-arn を、新しいロガー定義バージョン用にコピーした LatestVersionArn に置き換えます。

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. 出力から Version をコピーします。これは新しいグループバージョンの ID です。

ステップ 7: デプロイを作成する

Core デバイスにグループをデプロイします。

  1. AWS IoT Greengrass Core が実行されていることを確認します。必要に応じて、Raspberry Pi のターミナルで以下のコマンドを実行します。

    1. デーモンが実行中であるかどうかを確認するには

      ps aux | grep -E 'greengrass.*daemon'

      出力に root/greengrass/ggc/packages/ggc-version/bin/daemon エントリが含まれる場合、デーモンは実行されています。

      注記

      パスのバージョンは、コアデバイスにインストールされている AWS IoT Greengrass Core ソフトウェアのバージョンによって異なります。

    2. 次のようにしてデーモンを開始します。

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. デプロイメントを作成する。

    • group-id を、グループのコピー済み Id に置き換えます。

    • 新しいグループバージョンにコピーした group-version-idVersion を置換えます。

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. 出力から DeploymentId をコピーします。

  4. デプロイのステータスを取得します。

    • group-id を、グループのコピー済み Id に置き換えます。

    • deployment-id を、デプロイのコピー済み DeploymentId に置き換えます。

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    ステータスが Success の場合、デプロイは成功しています。トラブルシューティングヘルプについては、AWS IoT Greengrass のトラブルシューティング を参照してください。

ステップ 8: アプリケーションのテスト

この TransferStream Lambda 関数は、シミュレートされたデバイスデータを生成します。ストリームマネージャーがターゲットの Kinesis データストリームにエクスポートするストリームにデータを書き込みます。

  1. Amazon Kinesis コンソールの [Kinesis data streams] (Kinesis データストリーム) で、[MyKinesisStream] を選択します。

    注記

    ターゲットの Kinesis データストリームを使用せずにチュートリアルを実行した場合は、ストリームマネージャーのログファイルを確認します (GGStreamManager)。エラーメッセージに export stream MyKinesisStream doesn't exist が含まれている場合、テストは成功します。このエラーは、サービスがストリームにエクスポートしようとしましたが、ストリームが存在しないことを意味します。

  2. [MyKinesisStream] ページで、[Monitoring (モニタリング)] を選択します。テストが成功すると、Put Records (レコードの配置) グラフにデータが表示されます。接続によっては、データが表示されるまでに 1 分かかることがあります。

    重要

    テストが終了したら、Kinesis データストリームを削除して、それ以上の料金が発生しないようにします。

    または、次のコマンドを実行して Greengrass デーモンを停止します。これにより、テストを続行する準備が整うまで、コアがメッセージを送信できなくなります。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. コアから TransferStream Lambda 関数を削除します。

    1. 新しいグループバージョンを作成するには、ステップ 6: グループバージョンを作成する に従います。ただし、create-group-version コマンド内の --function-definition-version-arn オプションを削除します。または、TransferStream Lambda 関数を含まない関数定義バージョンを作成します。

      注記

      デプロイされたグループのバージョンからシステム GGStreamManager Lambda 関数を省略すると、コアでのストリーム管理が無効になります。

    2. 新しいグループバージョンをデプロイするには、ステップ 7: デプロイを作成する に従います。

ロギング情報を表示したり、ストリームに関する問題をトラブルシューティングしたりするには、TransferStream および GGStreamManager 関数のログを確認します。ファイルシステムの AWS IoT Greengrass ログを読み取る root 権限が必要です。

  • TransferStream は、ログエントリを greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log に書き込みます。

  • GGStreamManager は、ログエントリを greengrass-root/ggc/var/log/system/GGStreamManager.log に書き込みます。

さらにトラブルシューティング情報が必要な場合は、Lambda ログレベルを DEBUG に設定し、新しいグループバージョンを作成してデプロイできます。

以下も参照してください。