Publikasi/berlangganan pesan MQTT AWS IoT Core - AWS IoT Greengrass

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Publikasi/berlangganan pesan MQTT AWS IoT Core

Layanan IPC olah pesan MQTT AWS IoT Core memungkinkan Anda mengirim dan menerima pesan MQTT ke dan dari AWS IoT Core. Komponen dapat mempublikasikan pesan ke AWS IoT Core dan berlangganan topik untuk bertindak atas pesan MQTT dari sumber lain. Untuk informasi lebih lanjut tentang implementasi MQTT AWS IoT Core, lihat MQTT di Panduan Developer AWS IoT Core.

catatan

Layanan IPC olah pesan MQTT ini memungkinkan Anda bertukar pesan dengan AWS IoT Core. Untuk informasi selengkapnya tentang cara bertukar pesan antar komponen, lihat Pesan lokal publikasi/berlangganan.

SDK (Versi Minimum)

Tabel berikut menjelaskan versi minimum dariAWS IoT Device SDKyang harus Anda gunakan untuk mempublikasikan dan berlangganan pesan MQTT ke dan dariAWS IoT Core.

Otorisasi

Untuk menggunakanAWS IoT CorePesan MQTT dalam komponen kustom, Anda harus menentukan kebijakan otorisasi yang memungkinkan komponen Anda untuk mengirim dan menerima pesan pada topik. Untuk informasi tentang cara menentukan kebijakan otorisasi, lihat Otorisasi komponen untuk melakukan operasi IPC.

Kebijakan otorisasi untuk olah pesan MQTT AWS IoT Core memiliki properti berikut.

Pengenal Layanan: aws.greengrass.ipc.mqttproxy

Operasi Deskripsi Sumber daya

aws.greengrass#PublishToIoTCore

Memungkinkan komponen untuk mempublikasikan pesan ke AWS IoT Core pada topik MQTT yang Anda tentukan.

Topik string, seperti test/topic, atau * untuk mengizinkan akses ke semua topik. Anda dapat menggunakan wildcard topik MQTT (#dan+) untuk mencocokkan beberapa sumber daya.

aws.greengrass#SubscribeToIoTCore

Memungkinkan komponen untuk mempublikasikan pesan dari AWS IoT Core pada topik MQTT yang Anda tentukan.

Topik string, seperti test/topic, atau * untuk mengizinkan akses ke semua topik. Anda dapat menggunakan wildcard topik MQTT (#dan+) untuk mencocokkan beberapa sumber daya.

*

Memungkinkan komponen untuk mempublikasikan dan berlanggan pesan MQTT AWS IoT Core untuk topik yang Anda tentukan.

Topik string, seperti test/topic, atau * untuk mengizinkan akses ke semua topik. Anda dapat menggunakan wildcard topik MQTT (#dan+) untuk mencocokkan beberapa sumber daya.

Wildcard MQTT dalamAWS IoT CoreKebijakan otorisasi MQTT

Anda dapat menggunakan wildcard dalamAWS IoT CoreKebijakan otorisasi MQTT IPC. Komponen dapat mempublikasikan dan berlangganan topik yang cocok dengan filter topik yang Anda izinkan dalam kebijakan otorisasi. Misalnya, jika kebijakan otorisasi komponen memberikan akses ketest/topic/#, komponen dapat berlangganantest/topic/#, dan dapat mempublikasikan dan berlangganantest/topic/filter.

Variabel resep dalamAWS IoT CoreKebijakan otorisasi MQTT

Jika Anda menggunakan v2.6.0 atau versi yang lebih baruInti Greengrass, Anda dapat menggunakan{iot:thingName}variabel resep dalam kebijakan otorisasi. Fitur ini memungkinkan Anda mengonfigurasi kebijakan otorisasi tunggal untuk sekelompok perangkat inti, di mana setiap perangkat inti hanya dapat mengakses topik yang berisi namanya sendiri. Misalnya, Anda dapat mengizinkan akses komponen ke sumber daya topik berikut.

devices/{iot:thingName}/messages

Untuk informasi selengkapnya, lihat Variabel resep dan Gunakan variabel resep di update merge.

Contoh kebijakan otorisasi

Anda dapat mereferensikan contoh kebijakan otorisasi berikut untuk membantu Anda mengonfigurasi kebijakan otorisasi untuk komponen Anda.

contoh Contoh kebijakan otorisasi dengan akses tidak terbatas

Kebijakan otorisasi contoh berikut ini memungkinkan komponen untuk mempublikasikan dan berlangganan semua topik.

{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }

contoh Contoh kebijakan otorisasi dengan akses terbatas

Kebijakan otorisasi contoh berikut ini memungkinkan komponen untuk mempublikasikan dan berlangganan dua topik bernamafactory/1/eventsdanfactory/1/actions.

{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to factory 1 topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/actions", "factory/1/events" ] } } } }

contoh Contoh kebijakan otorisasi untuk sekelompok perangkat inti

penting

Contoh ini menggunakan fitur yang tersedia untuk v2.6.0 dan versi kemudianKomponen inti Greengrass. Greengrass nucleus v2.6.0 menambahkan dukungan untuk sebagian besarvariabel resep, seperti{iot:thingName}, dalam konfigurasi komponen.

Kebijakan otorisasi contoh berikut ini memungkinkan komponen untuk mempublikasikan dan berlangganan topik yang berisi nama perangkat inti yang menjalankan komponen.

{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/devices/{iot:thingName}/controls" ] } } } }

PublishToIoTCore

Mempublikasikan pesan MQTT ke AWS IoT Core mengenai suatu topik.

Permintaan

Permintaan operasi ini memiliki parameter berikut:

topicName(Python:topic_name)

Topik untuk mempublikasikan pesan.

qos

QoS MQTT yang akan digunakan. Enum ini, QOS, memiliki nilai-nilai berikut:

  • AT_MOST_ONCE – QoS 0. Pesan MQTT dikirim paling banyak sekali.

  • AT_LEAST_ONCE – QoS 1. Pesan MQTT dikirim paling sedikit sekali.

payload

(Opsional) Pesan muatan sebagai gumpalan.

Response

Operasi ini tidak memberikan informasi apa pun dalam tanggapannya.

Contoh

Contoh-contoh berikut ini menunjukkan cara memanggil operasi ini dalam kode komponen kustom.

Java (IPC client V1)

contoh Contoh: Publikasikan Pesan

catatan

Contoh ini menggunakanIPCUtilskelas untuk membuat koneksi keAWS IoT GreengrassLayanan IPC inti. Untuk informasi selengkapnya, lihat Hubungkan ke layanan IPC inti AWS IoT Greengrass.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PublishToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); PublishToIoTCoreResponseHandler responseHandler = PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos); CompletableFuture<PublishToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) { PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()); } }
Python (IPC client V1)

contoh Contoh: Publikasikan Pesan

catatan

Contoh ini mengasumsikan bahwa Anda menggunakan versi 1.5.4 atau yang lebih baru dari AWS IoT Device SDK untuk Python v2. Jika Anda menggunakan versi 1.5.3 SDK, lihat GunakanAWS IoT Device SDKuntuk Python v2 (klien IPC V1) untuk informasi tentang cara terhubung ke layanan IPC inti AWS IoT Greengrass.

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT)
C++

contoh Contoh: Publikasikan Pesan

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }

SubscribeToIoTCore

Berlangganan pesan MQTT dari AWS IoT Core pada filter topik atau topik. Perangkat lunak inti AWS IoT Greengrass menghapus langganan saat komponen mencapai akhir siklus hidupnya.

Operasi ini adalah operasi berlangganan di mana Anda berlangganan aliran pesan peristiwa. Untuk menggunakan operasi ini, tentukan bagian yang menangani respons aliran dengan fungsi yang menangani pesan peristiwa, kesalahan, dan penutupan aliran. Untuk informasi selengkapnya, lihat Berlangganan pengaliran peristiwa IPC.

Jenis pesan peristiwa: IoTCoreMessage

Permintaan

Permintaan operasi ini memiliki parameter berikut:

topicName(Python:topic_name)

Topik yang harus dijadikan langganan. Anda dapat menggunakan wildcard topik MQTT (# dan +) untuk berlangganan beberapa topik.

qos

QoS MQTT yang akan digunakan. Enum ini, QOS, memiliki nilai-nilai berikut:

  • AT_MOST_ONCE – QoS 0. Pesan MQTT dikirim paling banyak sekali.

  • AT_LEAST_ONCE – QoS 1. Pesan MQTT dikirim paling sedikit sekali.

Response

Tanggapan operasi ini memiliki informasi berikut:

messages

Aliran pesan MQTT. Objek ini, IoTCoreMessage, berisi informasi berikut:

message

Pesan MQTT. Objek ini, MQTTMessage, berisi informasi berikut:

topicName(Python:topic_name)

Topik yang pesannya dipublikasikan.

payload

(Opsional) Pesan muatan sebagai gumpalan.

Contoh

Contoh-contoh berikut ini menunjukkan cara memanggil operasi ini dalam kode komponen kustom.

Java (IPC client V1)

contoh Contoh: Berlangganan pesan

catatan

Contoh ini menggunakanIPCUtilskelas untuk membuat koneksi keAWS IoT GreengrassLayanan IPC inti. Untuk informasi selengkapnya, lihat Hubungkan ke layanan IPC inti AWS IoT Greengrass.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new SubscriptionResponseHandler(); SubscribeToIoTCoreResponseHandler responseHandler = SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos, streamResponseHandler); CompletableFuture<SubscribeToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) { SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String topic = ioTCoreMessage.getMessage().getTopicName(); String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; } @Override public void onStreamClosed() { System.out.println("Subscribe to IoT Core stream closed."); } } }
Python (IPC client V1)

contoh Contoh: Berlangganan pesan

catatan

Contoh ini mengasumsikan bahwa Anda menggunakan versi 1.5.4 atau yang lebih baru dari AWS IoT Device SDK untuk Python v2. Jika Anda menggunakan versi 1.5.3 SDK, lihat GunakanAWS IoT Device SDKuntuk Python v2 (klien IPC V1) untuk informasi tentang cara terhubung ke layanan IPC inti AWS IoT Greengrass.

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++

contoh Contoh: Berlangganan pesan

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string topicName = message.value().GetTopicName().value().c_str(); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }

Contoh

Gunakan contoh berikut untuk mempelajari cara menggunakan layanan IPC MQTT AWS IoT Core dalam komponen Anda.

Contoh resep berikut memungkinkan komponen untuk mempublikasikan ke semua topik.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

Aplikasi contoh C ++ berikut ini menunjukkan cara menggunakan layanan IPC MQTT AWS IoT Core untuk mempublikasikan pesan ke AWS IoT Core.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

Contoh resep berikut memungkinkan komponen untuk berlangganan semua topik.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

Aplikasi contoh C ++ berikut ini menunjukkan cara menggunakan layanan IPC MQTT AWS IoT Core untuk berlangganan pesan dari AWS IoT Core.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }