Kinesis Firehose - AWS IoT Greengrass

AWS IoT Greengrass Version 1 は 2023 年 6 月 30 日に延長ライフフェーズに参加しました。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。この日以降、 AWS IoT Greengrass V1 は機能、機能強化、バグ修正、またはセキュリティパッチを提供する更新をリリースしません。で実行されるデバイスは中断 AWS IoT Greengrass V1 されず、引き続き動作し、クラウドに接続します。に移行することを強くお勧めします。 AWS IoT Greengrass Version 2これにより、重要な新機能が追加され、プラットフォーム のサポートが追加されます

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Firehose

Kinesis Firehose コネクタは、Amazon Data Firehose 配信ストリームを介して、Amazon S3、Amazon Redshift、Amazon OpenSearch Service などの送信先にデータを発行します。

このコネクタは Kinesis 配信ストリームのデータプロデューサーです。MQTT トピックの入力データを受信し、指定された配信ストリームにデータを送信します。その後、配信ストリームは、設定されたターゲット (S3 バケットなど) にデータレコードを送信します。

このコネクタには、次のバージョンがあります。

バージョン

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

バージョンの変更については、「Changelog」を参照してください。

要件

このコネクタには以下の要件があります。

Version 4 - 5
  • AWS IoT Greengrass Core ソフトウェア v1.9.3 以降。

  • Python バージョン 3.7 または 3.8 が Core デバイスにインストールされ、PATH 環境変数に追加されている。

    注記

    Python 3.8 を使用するには、次のコマンドを実行して、Python 3.7 のデフォルトのインストールフォルダからインストール済みの Python 3.8 バイナリへのシンボリックリンクを作成します。

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    これにより、 AWS IoT Greengrass の Python 要件を満たすようにデバイスが設定されます。

  • 設定された Kinesis 配信ストリーム。詳細については、「Amazon Kinesis Firehose デベロッパーガイド」の「Amazon Data Firehose 配信ストリームの作成」を参照してください。 Amazon Kinesis Firehose

  • 以下の IAM ポリシーの例に示すように、ターゲット配信ストリームに対する firehose:PutRecord アクションと firehose:PutRecordBatch アクションを許可するために Greengrass グループロールが設定されている。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    このコネクタにより、入力メッセージペイロードのデフォルトの配信ストリームを動的に上書きできるようになります。実装でこの機能を使用する場合、IAM ポリシーには、すべてのターゲットストリームがリソースとして含まれている必要があります。リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。

    グループロール要件では、必要なアクセス許可を付与するようにロールを設定し、ロールがグループに追加されていることを確認する必要があります。詳細については、Greengrass グループロールの管理 (コンソール)またはGreengrass グループロールの管理 (CLI)を参照してください。

Versions 2 - 3
  • AWS IoT Greengrass Core ソフトウェア v1.7 以降。

  • Python バージョン 2.7 が Core デバイスにインストールされ、PATH 環境変数に追加されている。

  • 設定された Kinesis 配信ストリーム。詳細については、「Amazon Kinesis Firehose デベロッパーガイド」の「Amazon Data Firehose 配信ストリームの作成」を参照してください。 Amazon Kinesis Firehose

  • 以下の IAM ポリシーの例に示すように、ターゲット配信ストリームに対する firehose:PutRecord アクションと firehose:PutRecordBatch アクションを許可するために Greengrass グループロールが設定されている。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    このコネクタにより、入力メッセージペイロードのデフォルトの配信ストリームを動的に上書きできるようになります。実装でこの機能を使用する場合、IAM ポリシーには、すべてのターゲットストリームがリソースとして含まれている必要があります。リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。

    グループロール要件では、必要なアクセス許可を付与するようにロールを設定し、ロールがグループに追加されていることを確認する必要があります。詳細については、Greengrass グループロールの管理 (コンソール)またはGreengrass グループロールの管理 (CLI)を参照してください。

Version 1
  • AWS IoT Greengrass Core ソフトウェア v1.7 以降。

  • Python バージョン 2.7 が Core デバイスにインストールされ、PATH 環境変数に追加されている。

  • 設定された Kinesis 配信ストリーム。詳細については、「Amazon Kinesis Firehose デベロッパーガイド」の「Amazon Data Firehose 配信ストリームの作成」を参照してください。 Amazon Kinesis Firehose

  • 以下の IAM ポリシーの例に示すように、ターゲット配信ストリームに対する firehose:PutRecord アクションを許可するために Greengrass グループロールが設定されている。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    このコネクタにより、入力メッセージペイロードのデフォルトの配信ストリームを動的に上書きできるようになります。実装でこの機能を使用する場合、IAM ポリシーには、すべてのターゲットストリームがリソースとして含まれている必要があります。リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。

    グループロール要件では、必要なアクセス許可を付与するようにロールを設定し、ロールがグループに追加されていることを確認する必要があります。詳細については、Greengrass グループロールの管理 (コンソール)またはGreengrass グループロールの管理 (CLI)を参照してください。

コネクタパラメータ

このコネクタには、以下のパラメータが用意されています。

Versions 5
DefaultDeliveryStreamArn

データの送信先となるデフォルトの Firehose 配信ストリームの ARN。ターゲットストリームは、入力メッセージペイロードの delivery_stream_arn プロパティによって上書きできます。

注記

グループロールを使用して、すべてのターゲット配信ストリームで適切なアクションを許可する必要があります。詳細については、「要件」を参照してください。

AWS IoT コンソールでの表示名: デフォルトの配信ストリーム ARN

必須: true

タイプ: string

有効なパターン: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

同じ配信ストリームの新しいレコードを拒否する前に、メモリに保持するレコードの最大数。最小値は 2000 です。

AWS IoT コンソールの表示名: バッファするレコードの最大数 (ストリームあたり)

必須: true

タイプ: string

有効なパターン: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

このコネクタに割り当てるメモリ量 (KB 単位)。

AWS IoT コンソールの表示名: メモリサイズ

必須: true

タイプ: string

有効なパターン: ^[0-9]+$

PublishInterval

Firehose にレコードを発行する間隔 (秒単位)。バッチ適用を無効にするには、この値を 0 に設定します。

AWS IoT コンソールでの表示名: 発行間隔

必須: true

タイプ: string

有効な値: 0 - 900

有効なパターン: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

このコネクタのコンテナ化モード。デフォルトは です。つまりGreengrassContainer、コネクタは AWS IoT Greengrass コンテナ内の分離されたランタイム環境で実行されます。

注記

グループの既定のコンテナ化設定は、コネクタには適用されません。

AWS IoT コンソールの表示名: コンテナ分離モード

必須: false

タイプ: string

有効な値: GreengrassContainer または NoContainer

有効なパターン: ^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

データの送信先となるデフォルトの Firehose 配信ストリームの ARN。ターゲットストリームは、入力メッセージペイロードの delivery_stream_arn プロパティによって上書きできます。

注記

グループロールを使用して、すべてのターゲット配信ストリームで適切なアクションを許可する必要があります。詳細については、「要件」を参照してください。

AWS IoT コンソールでの表示名: デフォルトの配信ストリーム ARN

必須: true

タイプ: string

有効なパターン: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

同じ配信ストリームの新しいレコードを拒否する前に、メモリに保持するレコードの最大数。最小値は 2000 です。

AWS IoT コンソールの表示名: バッファするレコードの最大数 (ストリームあたり)

必須: true

タイプ: string

有効なパターン: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

このコネクタに割り当てるメモリ量 (KB 単位)。

AWS IoT コンソールの表示名: メモリサイズ

必須: true

タイプ: string

有効なパターン: ^[0-9]+$

PublishInterval

Firehose にレコードを発行する間隔 (秒単位)。バッチ適用を無効にするには、この値を 0 に設定します。

AWS IoT コンソールでの表示名: 発行間隔

必須: true

タイプ: string

有効な値: 0 - 900

有効なパターン: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

データの送信先となるデフォルトの Firehose 配信ストリームの ARN。ターゲットストリームは、入力メッセージペイロードの delivery_stream_arn プロパティによって上書きできます。

注記

グループロールを使用して、すべてのターゲット配信ストリームで適切なアクションを許可する必要があります。詳細については、「要件」を参照してください。

AWS IoT コンソールでの表示名: デフォルトの配信ストリーム ARN

必須: true

タイプ: string

有効なパターン: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

サンプルコネクタを作成する (AWS CLI)

以下の CLI コマンドは、コネクタを含む初期バージョンで ConnectorDefinition を作成します。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

AWS IoT Greengrass コンソールでは、グループのコネクタページからコネクタを追加できます。詳細については、「Greengrass コネクタの開始方法 (コンソール)」を参照してください。

入力データ

このコネクタは、MQTT トピックのストリームコンテンツを受け取り、そのコンテンツをターゲット配信ストリームに送信します。受け取る入力データは以下の 2 種類です。

  • kinesisfirehose/message トピックの JSON データ。

  • kinesisfirehose/message/binary/# トピックのバイナリデータ。

Versions 2 - 5
トピックのフィルター: kinesisfirehose/message

このトピックを使用して、JSON データを含むメッセージを送信します。

メッセージのプロパティ
request

配信ストリームに送信するデータ、およびターゲット配信ストリーム (デフォルトストリームと異なる場合)。

必須: true

タイプ: object。以下のプロパティを含みます。

data

配信ストリームに送信するデータ。

必須: true

タイプ: string

delivery_stream_arn

ターゲット Kinesis 配信ストリームの ARN。デフォルトの配信ストリームを上書きするには、このプロパティを含めます。

必須: false

タイプ: string

有効なパターン: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

リクエストの任意の ID。このプロパティでは、入力リクエストを出力レスポンスにマッピングします。指定すると、レスポンスオブジェクトの id プロパティはこの値に設定されます。この機能を使用しない場合は、このプロパティを省略するか空の文字列を指定できます。

必須: false

タイプ: string

有効なパターン: .*

入力例
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

トピックのフィルター: kinesisfirehose/message/binary/#

このトピックを使用して、バイナリデータを含むメッセージを送信します。コネクタではバイナリデータは解析されません。データはそのままストリーミングされます。

入力リクエストを出力レスポンスにマッピングするには、メッセージトピックの # ワイルドカードを任意のリクエスト ID に置き換えます。例えば、メッセージを kinesisfirehose/message/binary/request123 に発行する場合、レスポンスオブジェクトの id プロパティを request123 に設定します。

リクエストをレスポンスにマッピングしない場合は、メッセージを kinesisfirehose/message/binary/ に発行できます。末尾にスラッシュを含めてください。

Version 1
トピックのフィルター: kinesisfirehose/message

このトピックを使用して、JSON データを含むメッセージを送信します。

メッセージのプロパティ
request

配信ストリームに送信するデータ、およびターゲット配信ストリーム (デフォルトストリームと異なる場合)。

必須: true

タイプ: object。以下のプロパティを含みます。

data

配信ストリームに送信するデータ。

必須: true

タイプ: string

delivery_stream_arn

ターゲット Kinesis 配信ストリームの ARN。デフォルトの配信ストリームを上書きするには、このプロパティを含めます。

必須: false

タイプ: string

有効なパターン: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

リクエストの任意の ID。このプロパティでは、入力リクエストを出力レスポンスにマッピングします。指定すると、レスポンスオブジェクトの id プロパティはこの値に設定されます。この機能を使用しない場合は、このプロパティを省略するか空の文字列を指定できます。

必須: false

タイプ: string

有効なパターン: .*

入力例
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

トピックのフィルター: kinesisfirehose/message/binary/#

このトピックを使用して、バイナリデータを含むメッセージを送信します。コネクタではバイナリデータは解析されません。データはそのままストリーミングされます。

入力リクエストを出力レスポンスにマッピングするには、メッセージトピックの # ワイルドカードを任意のリクエスト ID に置き換えます。例えば、メッセージを kinesisfirehose/message/binary/request123 に発行する場合、レスポンスオブジェクトの id プロパティを request123 に設定します。

リクエストをレスポンスにマッピングしない場合は、メッセージを kinesisfirehose/message/binary/ に発行できます。末尾にスラッシュを含めてください。

出力データ

このコネクターは、MQTT トピックで出力データとしてステータス情報を発行します。

Versions 2 - 5
サブスクリプションのトピックフィルター

kinesisfirehose/message/status

出力例

応答には、バッチで送信される各データレコードのステータスが含まれます。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注記

コネクタが再試行可能なエラー (接続エラーなど) を検出した場合は、次のバッチ処理で再発行を試します。エクスポネンシャルバックオフは AWS SDK によって処理されます。再試行可能なエラーで失敗したリクエストは、今後の発行するためにキューの最後に追加されます。

Version 1
サブスクリプションのトピックフィルター

kinesisfirehose/message/status

出力例: 成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
出力例: 失敗
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

使用例

コネクタの試用に利用できる Python 3.7 Lambda 関数の例を設定するには、次のステップ (概要) を使用します。

注記
  1. コネクタの要件を満たしていることを確認します。

    グループロール要件では、必要なアクセス許可を付与するようにロールを設定し、ロールがグループに追加されていることを確認する必要があります。詳細については、Greengrass グループロールの管理 (コンソール)またはGreengrass グループロールの管理 (CLI)を参照してください。

  2. 入力データをコネクタに送信する Lambda 関数を作成して発行します。

    サンプルコードを PY ファイルとして保存します。AWS IoT Greengrass Core SDK for Python をダウンロードして解凍します。次に、PY ファイルとルートレベルの greengrasssdk フォルダを含む zip パッケージを作成します。この zip パッケージは、 AWS Lambda にアップロードするデプロイパッケージです。

    Python 3.7 Lambda 関数を作成したら、関数バージョンを公開し、エイリアスを作成します。

  3. Greengrass グループを設定します。

    1. エイリアスで Lambda 関数を追加します (推奨)。Lambda ライフサイクルを長期間有効に (または CLI で "Pinned": true に) 設定します。

    2. コネクタを追加し、そのパラメータを設定します。

    3. コネクタが JSON 入力データを受信し、サポートされているトピックフィルターで出力データを送信できるようにするサブスクリプションを追加します。

      • Lambda 関数をソースに、コネクタをターゲットに設定し、サポートされている入力トピックフィルターを使用します。

      • コネクタをソースとして、 AWS IoT Core をターゲットとして設定し、サポートされている出力トピックフィルターを使用します。このサブスクリプションを使用して、 AWS IoT コンソールでステータスメッセージを表示します。

  4. グループをデプロイします。

  5. AWS IoT コンソールのテストページで、出力データトピックにサブスクライブしてコネクタからのステータスメッセージを表示します。この例の Lambda 関数は長期間有効であり、グループがデプロイされた直後にメッセージの送信を開始します。

    テストが終了したら、Lambda ライフサイクルをオンデマンドに (または CLI で "Pinned": false に) 設定して、グループをデプロイできます。これにより、関数がメッセージの送信を停止します。

次の例では、Lambda 関数で入力メッセージをコネクタに送信します。このメッセージには JSON データが含まれています。

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

ライセンス

Kinesis Firehose コネクタには、以下のサードパーティーのソフトウェアおよびライセンスが含まれています。

このコネクタは、Greengrass Core ソフトウェアライセンス契約に従ってリリースされます。

変更ログ

次の表に、コネクタの各バージョンにおける変更点を示します。

バージョン

変更

5

コネクタのコンテナ化モードを設定するための IsolationMode パラメータが追加されました。

4

Lambda ランタイムを Python 3.7 にアップグレードしたことで、ランタイム要件が変更。

3

過度のログ記録を減らすための修正、およびその他の軽微なバグの修正。

2

バッチ処理されたデータレコードを指定された間隔で Firehose に送信するためのサポートを追加しました。

  • また、グループロールの firehose:PutRecordBatch アクションが必要になります。

  • 新しい MemorySizeDeliveryStreamQueueSize、および PublishInterval パラメータ。

  • 出力メッセージには、公開されたデータレコードのステータス応答の配列が含まれます。

1

初回リリース。

Greengrass グループには、一度に 1 つのバージョンのコネクタしか含めることができません。コネクタのバージョンのアップグレードについては、「コネクタのバージョンのアップグレード」を参照してください。

以下も参照してください。