パブリッシュ/サブスクライブ AWS IoT Core MQTT メッセージ - AWS IoT Greengrass

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

パブリッシュ/サブスクライブ AWS IoT Core MQTT メッセージ

- AWS IoT Core MQTT メッセージング IPC サービスでは、MQTT メッセージを送受信できます AWS IoT Core 。コンポーネントはメッセージを AWS IoT Core し、トピックを購読して、他のソースからの MQTT メッセージを処理します。の詳細については、「」を参照してください。 AWS IoT Core MQTT の実装については、MQTT() AWS IoT Core 開発者ガイド

注記

この MQTT メッセージング IPC サービスでは、 AWS IoT Core 。コンポーネント間でメッセージを交換する方法の詳細については、「」を参照してください。ローカルメッセージのパブリッシュ/サブスクライブ

Authorization

を使用するには AWS IoT Core カスタムコンポーネントの MQTT メッセージングを使用する場合は、コンポーネントがトピックに関するメッセージを送受信できるようにする承認ポリシーを定義する必要があります。承認ポリシーの定義については、「」を参照してください。IPC 操作を実行するためのコンポーネントの承認

の承認ポリシー AWS IoT Core MQTT メッセージングには以下のプロパティがあります。

IPC サービス識別子: aws.greengrass.ipc.mqttproxy

オペレーション 説明 リソース

aws.greengrass#PublishToIoTCore

コンポーネントがにメッセージを発行できるようにします。 AWS IoT Core を、指定した MQTT トピックで参照します。

のようなトピック文字列。test/topic, または*をクリックして、すべてのトピックへのアクセスを許可します。このトピック文字列は、MQTT トピックワイルドカード (#および+).

aws.greengrass#SubscribeToIoTCore

コンポーネントが AWS IoT Core を、指定したトピックで参照します。

のようなトピック文字列。test/topic, または*をクリックして、すべてのトピックへのアクセスを許可します。このトピック文字列は、MQTT トピックワイルドカード (#および+).

*

コンポーネントのパブリッシュとサブスクライブを許可します AWS IoT Core 指定したトピックの MQTT メッセージ

のようなトピック文字列。test/topic, または*をクリックして、すべてのトピックへのアクセスを許可します。このトピック文字列は、MQTT トピックワイルドカード (#および+).

例 認可ポリシーの例

次の認可ポリシー例では、コンポーネントがすべてのトピックをパブリッシュおよびサブスクライブすることを許可します。

{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }

PublishToIoTCore

MQTT メッセージを AWS IoT Core トピック。

Request

このオペレーションのリクエストには以下のパラメータがあります。

topicName

メッセージを発行するトピックです。

qos

使用する MQTT QoS この列挙、QOSには、以下の値があります。

  • AT_MOST_ONCE— QoS 0。MQTT メッセージは最大で 1 回配信されます。

  • AT_LEAST_ONCE— QoS 1. MQTT メッセージは少なくとも 1 回配信されます。

payload

(任意)メッセージペイロードを BLOB として指定します。

Response

この操作は、応答に情報を提供しません。

Examples

次の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。

Java

例: メッセージの発行

String topic = "my/topic"; String message = "Hello, World!"; QOS qos = QOS.AT_LEAST_ONCE; PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()).getResponse().get();
Python

例: メッセージの発行

注記

この例では、バージョン 1.5.4 以降を使用していることを前提としています。 AWS IoT Device SDK for Python v2. SDK バージョン 1.5.3 を使用している場合は、」を使用する AWS IoT Device SDK Python v2への接続方法については、「」を参照してください。AWS IoT Greengrassコア IPC サービス

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future = operation.get_response() future.result(TIMEOUT)
C++

例: メッセージの発行

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); PublishToIoTCoreOperation operation = ipcClient.NewPublishToIoTCore(); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); // Handle operation error. } else { // Handle RPC error. } } return 0; }

SubscribeToIoTCore

から MQTT メッセージにサブスクライブします。 AWS IoT Core トピックまたはトピックフィルターで。-AWS IoT Greengrassコンポーネントがそのライフサイクルの終わりに達すると、コアソフトウェアによってサブスクリプションが削除されます。

この操作は、イベントメッセージのストリームをサブスクライブするサブスクリプション操作です。この操作を使用するには、イベントメッセージ、エラー、およびストリームのクロージャを処理する関数を持つストリーム応答ハンドラを定義します。詳細については、「IPC イベントストリームをサブスクライブする」を参照してください。

イベントメッセージの種類: IoTCoreMessage

Request

このオペレーションのリクエストには以下のパラメータがあります。

topicName

サブスクライブ先のトピック。MQTT トピックワイルドカード (#および+) を使用して、複数のトピックにサブスクライブします。

qos

使用する MQTT QoS この列挙、QOSには、以下の値があります。

  • AT_MOST_ONCE— QoS 0。MQTT メッセージは最大で 1 回配信されます。

  • AT_LEAST_ONCE— QoS 1. MQTT メッセージは少なくとも 1 回配信されます。

Response

このオペレーションのレスポンスには以下の情報が含まれています。

messages

MQTT メッセージのストリーム。このオブジェクト、IoTCoreMessageには、以下の情報が含まれています。

message

MQTT メッセージ。このオブジェクト、MQTTMessageには、以下の情報が含まれています。

topicName

メッセージが発行されたトピックです。

payload

(任意)メッセージペイロードを BLOB として指定します。

Examples

次の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。

Java

例: メッセージの購読

String topic = "my/topic"; QOS qos = QOS.AT_MOST_ONCE; SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new StreamResponseHandler<IoTCoreMessage>() { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); // Handle message. } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable throwable) { // Handle error. return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { // Handle close. } }; SubscribeToIoTCoreResponseHandler operationResponseHandler = greengrassCoreIPCClient .subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); operationResponseHandler.getResponse().get(); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. operationResponseHandler.closeStream();
Python

例: メッセージの購読

注記

この例では、バージョン 1.5.4 以降を使用していることを前提としています。 AWS IoT Device SDK for Python v2. SDK バージョン 1.5.3 を使用している場合は、」を使用する AWS IoT Device SDK Python v2への接続方法については、「」を参照してください。AWS IoT Greengrassコア IPC サービス

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++

例: メッセージの購読

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); IoTCoreResponseHandler streamHandler; SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation.Close(); return 0; }

Examples

以下の例では、使用方法を学習します。 AWS IoT Core MQTT IPC サービスをコンポーネントに実装します。

次のサンプルレシピでは、コンポーネントがすべてのトピックにパブリッシュできるようにします。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

次の例では、C++ アプリケーションを使用する方法を示します。 AWS IoT Core メッセージを発行する MQTT IPC サービス AWS IoT Core 。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); PublishToIoTCoreOperation operation = ipcClient.NewPublishToIoTCore(); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

次のサンプルレシピでは、コンポーネントにすべてのトピックをサブスクライブさせることができます。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

次の例では、C++ アプリケーションを使用する方法を示します。 AWS IoT Core からのメッセージを購読する MQTT IPC サービス AWS IoT Core 。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); IoTCoreResponseHandler streamHandler; SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation.Close(); return 0; }