翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
AWS IoT Core MQTT メッセージをパブリッシュ/サブスクライブする
AWS IoT Core MQTT メッセージング IPC サービスを使用すると、AWS IoT Core と MQTT メッセージの送受信を行えます。コンポーネントは AWS IoT Core にメッセージをパブリッシュできるとともに、トピックをサブスクライブして、他のソースからの MQTT メッセージに対応できます。AWS IoT Core の MQTT の実装の詳細については、「AWS IoT Core デベロッパーガイド」の「MQTT」を参照してください。
この MQTT メッセージング IPC サービスを使用すると、AWS IoT Core とメッセージを交換できるようになります。コンポーネント間でメッセージを交換する方法の詳細については、「ローカルメッセージをパブリッシュ/サブスクライブする」を参照してください。
最小 SDK バージョン
以下の表に、AWS IoT Core と MQTT メッセージの発行およびサブスクライブのやりとりを行う際に使用が必要となる AWS IoT Device SDK の最小バージョンを示します。
認証
AWS IoT Core MQTT メッセージングをカスタムコンポーネントで使用するには、コンポーネントでトピックに関するメッセージを送受信できるようにする承認ポリシーを定義する必要があります。承認ポリシーの定義については、「コンポーネントに IPC オペレーションの実行を許可する」を参照してください。
AWS IoT Core MQTT メッセージングの承認ポリシーには以下のプロパティがあります。
IPC サービス識別子: aws.greengrass.ipc.mqttproxy
操作 |
説明 |
リソース |
aws.greengrass#PublishToIoTCore
|
コンポーネントが、指定した MQTT トピックの AWS IoT Core にメッセージを発行できるようにします。
|
test/topic などのトピック文字列、または * ですべてのトピックへのアクセスを許可します。MQTT トピックのワイルドカード (# および + ) を使用して、複数のリソースを一致させることができます。
|
aws.greengrass#SubscribeToIoTCore
|
コンポーネントが、指定したトピックの AWS IoT Core からのメッセージをサブスクライブできるようにします。
|
test/topic などのトピック文字列、または * ですべてのトピックへのアクセスを許可します。MQTT トピックのワイルドカード (# および + ) を使用して、複数のリソースを一致させることができます。
|
*
|
コンポーネントが、指定したトピックの AWS IoT Core MQTT メッセージを発行およびサブスクライブできるようにします。
|
test/topic などのトピック文字列、または * ですべてのトピックへのアクセスを許可します。MQTT トピックのワイルドカード (# および + ) を使用して、複数のリソースを一致させることができます。
|
AWS IoT Core MQTT 承認ポリシーの MQTT ワイルドカード
AWS IoT Core MQTT IPC 承認ポリシーで MQTT ワイルドカードを使用できます。これによりコンポーネントは、承認ポリシーで許可するトピックフィルターに一致するトピックを発行およびサブスクライブできます。例えば、コンポーネントの承認ポリシーで test/topic/#
へのアクセス権が付与されている場合、コンポーネントは test/topic/#
をサブスクライブでき、test/topic/filter
を発行およびサブスクライブできます。
AWS IoT Core MQTT 承認ポリシーのレシピ変数
v2.6.0 以降の Greengrass nucleus を使用している場合、承認ポリシーの {iot:thingName}
recipe 変数を使用できます。この機能を使用すると、コアデバイスのグループに対して 1 つの承認ポリシーを設定できます。各コアデバイスは自身の名前を含むトピックにのみアクセスできます。例えば、コンポーネントに次のトピックリソースへのアクセスを許可できます。
devices/{iot:thingName}/messages
詳細については、「レシピ変数 と マージ更新で recipe 変数を使用する」を参照してください。
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシー設定の参考にできます。
例 アクセスが制限されていない承認ポリシーの例
以下の承認ポリシーの例では、コンポーネントがすべてのトピックを公開およびサブスクライブすることを許可します。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.MyIoTCorePubSubComponent:mqttproxy:1:
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
例 アクセスが制限されている承認ポリシーの例
次の承認ポリシーの例では、コンポーネントが、factory/1/events
および factory/1/actions
という名前の 2 つのトピックを公開およびサブスクライブすることを許可します。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to factory 1 topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/actions
- factory/1/events
例 コアデバイスのグループに対する承認ポリシーの例
次の承認ポリシーの例は、コンポーネントが実行されているコアデバイスの名前を含むトピックを、コンポーネントが発行およびサブスクライブできるようにします。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/devices/{iot:thingName}/controls
PublishToIoTCore
トピックに関して、AWS IoT Core に MQTT メッセージを発行します。
MQTT メッセージを AWS IoT Core に公開する場合、1 秒あたり 100 トランザクションのクォータがあります。このクォータを超えると、メッセージは Greengrass デバイスでの処理のためにキューに入れられます。また、1 秒あたり 512 KB のデータというクォータと、アカウント全体で 1 秒あたり 20,000 パブリッシャー (一部の AWS リージョン では 2,000 件) のクォータがあります。AWS IoT Core の MQTT メッセージブローカー制限の詳細については、「AWS IoT Core メッセージブローカーとプロトコルの制限とクォータ」を参照してください。
これらのクォータを超えると、Greengrass デバイスはメッセージの公開を AWS IoT Core に制限します。メッセージはメモリ内のスプーラに保存されます。デフォルトでは、スプーラに割り当てられるメモリは 2.5 MB です。スプーラがいっぱいになると、新しいメッセージは拒否されます。スプーラのサイズを増やすことができます。詳細については、Greengrass nucleus ドキュメントの「構成」を参照してください。スプーラがいっぱいになり、割り当てられるメモリを増やす必要がないように、公開要求は 1 秒あたり 100 要求以下に制限してください。
アプリケーションがメッセージをより高いレートで送信したり、より大きなメッセージを送信したりする必要がある場合は、ストリームマネージャー を使用して Kinesis データストリームにメッセージを送信することを検討してください。ストリームマネージャーコンポーネントは、大量のデータを AWS クラウド に転送するように設計されています。詳細については、「Greengrass コアデバイスでのデータストリームの管理」を参照してください。
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topicName
(Python: topic_name
)
-
メッセージの発行先として指定するトピック。
qos
-
使用する MQTT QoS。この列挙型 (QOS
) には以下の値があります。
payload
-
(オプション) BLOB としてのメッセージペイロード。
MQTT 5を使用する際、Greengrass nucleus の v2.10.0 以降で以下の機能が使用できます。MQTT 3.1.1 を使用している場合、これらの機能は無視されます。これらの機能を利用するために必要な AWS IoT デバイス SDK の最小バージョンは、次の表のとおりです。
payloadFormat
-
(オプション) メッセージペイロードのフォーマット。payloadFormat
を設定しない場合、タイプは BYTES
とみなされます。この列挙型には以下の値があります。
retain
-
(オプション) 発行時に MQTT 保持オプションを true
に設定するか否かを示します。
userProperties
-
(オプション) 送信するアプリケーション固有の UserProperty
オブジェクトのリストです。UserProperty
オブジェクトは次のように定義されます。
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(オプション) メッセージが期限切れとなり、サーバーによって削除されるまでの秒数です。この値を設定しない場合、メッセージに有効期限は設定されません。
correlationData
-
(オプション) リクエストに付加される情報で、リクエストとレスポンスの関連付けに使用できます。
responseTopic
-
(オプション) レスポンスメッセージに使用するトピックです。
contentType
-
(オプション) メッセージのコンテンツタイプを示すアプリケーション固有の識別子です。
レスポンス
このオペレーションはレスポンスで一切の情報を提供しません。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
- Java (IPC client V2)
-
例:メッセージを発行する
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
例:メッセージを発行する
この例では、AWS IoT Device SDK for Python v2 のバージョン 1.5.4 以降を使用していることを前提としています。
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
payload = 'Hello, World'
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload)
ipc_client.close()
- Java (IPC client V1)
-
例:メッセージを発行する
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PublishToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
PublishToIoTCoreResponseHandler responseHandler =
PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos);
CompletableFuture<PublishToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) {
PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest();
publishToIoTCoreRequest.setTopicName(topic);
publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8));
publishToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty());
}
}
- Python (IPC client V1)
-
例:メッセージを発行する
この例では、AWS IoT Device SDK for Python v2 のバージョン 1.5.4 以降を使用していることを前提としています。
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "my/topic"
message = "Hello, World"
qos = QOS.AT_LEAST_ONCE
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = bytes(message, "utf-8")
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
- C++
-
例:メッセージを発行する
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String message("Hello, World!");
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
- JavaScript
-
例:メッセージを発行する
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
class PublishToIoTCore {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.publishToIoTCore().then(r => console.log("Started workflow"));
}
private async publishToIoTCore() {
try {
const request: PublishToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
await this.ipcClient.publishToIoTCore(request);
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const publishToIoTCore = new PublishToIoTCore();
SubscribeToIoTCore
トピックまたはトピックフィルターに関する AWS IoT Core からの MQTT メッセージをサブスクライブします。コンポーネントがライフサイクルの終わりに達すると、AWS IoT Greengrass Core ソフトウェアはサブスクリプションを削除します。
このオペレーションはサブスクリプションオペレーションで、イベントメッセージのストリームをサブスクライブするというものです。このオペレーションを使用するには、イベントメッセージ、エラー、およびストリームクロージャを処理する関数を使用して、ストリームレスポンスハンドラーを定義します。(詳細については、IPC イベントストリームへのサブスクライブ を参照してください)。
イベントメッセージの種類: IoTCoreMessage
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topicName
(Python: topic_name
)
-
サブスクライブ先のトピック。MQTT トピックのワイルドカード (#
および +
) を使用して、複数のトピックにサブスクライブできます。
qos
-
使用する MQTT QoS。この列挙型 (QOS
) には以下の値があります。
レスポンス
このオペレーションのレスポンスには以下の情報が含まれます。
messages
-
MQTT メッセージのストリーム。このオブジェクト (IoTCoreMessage
) には、次の情報が含まれます。
message
-
MQTT メッセージ。このオブジェクト (MQTTMessage
) には、次の情報が含まれます。
topicName
(Python: topic_name
)
-
メッセージが発行されたトピック。
payload
-
(オプション) BLOB としてのメッセージペイロード。
MQTT 5を使用する際、Greengrass nucleus の v2.10.0 以降で以下の機能が使用できます。MQTT 3.1.1 を使用している場合、これらの機能は無視されます。これらの機能を利用するために必要な AWS IoT デバイス SDK の最小バージョンは、次の表のとおりです。
payloadFormat
-
(オプション) メッセージペイロードのフォーマット。payloadFormat
を設定しない場合、タイプは BYTES
とみなされます。この列挙型には以下の値があります。
retain
-
(オプション) 発行時に MQTT 保持オプションを true
に設定するか否かを示します。
userProperties
-
(オプション) 送信するアプリケーション固有の UserProperty
オブジェクトのリストです。UserProperty
オブジェクトは次のように定義されます。
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(オプション) メッセージが期限切れとなり、サーバーによって削除されるまでの秒数です。この値を設定しない場合、メッセージに有効期限は設定されません。
correlationData
-
(オプション) リクエストに付加される情報で、リクエストとレスポンスの関連付けに使用できます。
responseTopic
-
(オプション) レスポンスメッセージに使用するトピックです。
contentType
-
(オプション)メッセージのコンテンツタイプのアプリケーション固有の識別子です。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
- Java (IPC client V2)
-
例:メッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
例: メッセージをサブスクライブする
この例では、AWS IoT Device SDK for Python v2 のバージョン 1.5.4 以降を使用していることを前提としています。
import threading
import traceback
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
def on_stream_event(event):
try:
topic_name = event.message.topic_name
message = str(event.message.payload, 'utf-8')
print(f'Received new message on topic {topic_name}: {message}')
except:
traceback.print_exc()
def on_stream_error(error):
# Return True to close stream, False to keep stream open.
return True
def on_stream_closed():
pass
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp, operation = ipc_client.subscribe_to_iot_core(
topic_name=topic,
qos=qos,
on_stream_event=on_stream_event,
on_stream_error=on_stream_error,
on_stream_closed=on_stream_closed
)
# Keep the main thread alive, or the process will exit.
event = threading.Event()
event.wait()
# To stop subscribing, close the operation stream.
operation.close()
ipc_client.close()
- Java (IPC client V1)
-
例:メッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
new SubscriptionResponseHandler();
SubscribeToIoTCoreResponseHandler responseHandler =
SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos,
streamResponseHandler);
CompletableFuture<SubscribeToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
throw e;
}
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) {
SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
subscribeToIoTCoreRequest.setTopicName(topic);
subscribeToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest,
Optional.of(streamResponseHandler));
}
public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> {
@Override
public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
try {
String topic = ioTCoreMessage.getMessage().getTopicName();
String message = new String(ioTCoreMessage.getMessage().getPayload(),
StandardCharsets.UTF_8);
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false;
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to IoT Core stream closed.");
}
}
}
- Python (IPC client V1)
-
例:メッセージをサブスクライブする
この例では、AWS IoT Device SDK for Python v2 のバージョン 1.5.4 以降を使用していることを前提としています。
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
topic_name = event.message.topic_name
# Handle message.
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "my/topic"
qos = QOS.AT_MOST_ONCE
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
# To stop subscribing, close the operation stream.
operation.close()
- C++
-
例:メッセージをサブスクライブする
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string topicName = message.value().GetTopicName().value().c_str();
// Handle message.
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例:メッセージをサブスクライブする
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToIoTCore {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToIoTCore().then(r => console.log("Started workflow"));
}
private async subscribeToIoTCore() {
try {
const request: SubscribeToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
const streamingOperation = this.ipcClient.subscribeToIoTCore(request);
streamingOperation.on('message', (message: IoTCoreMessage) => {
// parse the message depending on your use cases, e.g.
if (message.message && message.message.payload) {
const receivedMessage = message.message.payload.toString();
}
});
streamingOperation.on('streamError', (error : RpcError) => {
// define your own error handling logic
});
streamingOperation.on('ended', () => {
// define your own logic
});
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToIoTCore = new SubscribeToIoTCore();
例
コンポーネントの AWS IoT Core MQTT IPC サービスの使用方法については、以下の例を参照してください。
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCorePublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes MQTT messages to IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCorePublisherCpp:mqttproxy:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_iotcore_publisher"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCorePublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes MQTT messages to IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCorePublisherCpp:mqttproxy:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_iotcore_publisher"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher
Permission:
Execute: OWNER
以下の C++ アプリケーションの例は、AWS IoT Core MQTT IPC サービスを使用して AWS IoT Core にメッセージを発行する方法を示します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCoreSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCoreSubscriberCpp:mqttproxy:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_iotcore_subscriber"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCoreSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to MQTT messages from IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCoreSubscriberCpp:mqttproxy:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_iotcore_subscriber"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber
Permission:
Execute: OWNER
以下の C++ アプリケーションの例は、AWS IoT Core MQTT IPC サービスを使用して AWS IoT Core からメッセージをサブスクライブする方法を示します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}