AWS Code Sample

The AWS Documentation website is getting a new look!
Try it now and let us know what you think. Switch to the new look >>

You can return to the original look by selecting English in the language selector above. This sample demonstrates how to mock an AWS IoT Things Graph camera device in an AWS IoT Greengrass group.

#
# Copyright 2010-2019, Inc. or its affiliates. All Rights Reserved.
#
# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
#
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# NOTE(review): the copyright holder name and the license URL appear to have
# been stripped from this copy of the header -- restore from the upstream file.
"""Mock camera device for an AWS IoT Greengrass group.

The script:

1. Runs Greengrass discovery for the given thing name and stores the
   first group CA it receives under ``GROUP_CA_PATH``.
2. Connects an MQTT client to the first reachable connectivity endpoint
   of the first discovered core.
3. Subscribes to ``<thingName>/capture``; for every message received it
   publishes a JSON payload ``{"lastClickedImage": ...}`` to
   ``<thingName>/capture/finished``, cycling through ``images``.

Exit codes: 2 for bad command-line arguments (argparse), -1 when
discovery fails, -2 when no core endpoint accepts the connection.
"""

import os
import sys
import time
import uuid
import json
import logging
import argparse
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException

AllowedActions = ['both', 'publish', 'subscribe']

MAX_DISCOVERY_RETRIES = 10
GROUP_CA_PATH = "./groupCA/"


def _error_text(error):
    """Return a printable message for *error*.

    Uses the ``message`` attribute when the exception has one and falls
    back to ``str(error)`` otherwise, because built-in exceptions have no
    ``message`` attribute on Python 3.
    """
    return getattr(error, "message", str(error))


# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host",
                    help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath",
                    help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath",
                    help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath",
                    help="Private key file path")
parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot",
                    help="Targeted thing name")

args = parser.parse_args()
# The endpoint is stored under dest="host"; it must not be confused with
# the root CA path.
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
clientId = args.thingName
thingName = args.thingName

# Values reported as "lastClickedImage", one per received capture request.
# NOTE(review): the entries are empty in this copy; presumably they are meant
# to be image locations -- fill them in before use.
images = ["", "", "", ""]

if not args.certificatePath or not args.privateKeyPath:
    # parser.error() prints the message and terminates with exit status 2.
    parser.error("Missing credentials for authentication.")

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Progressive back off core
backOffCore = ProgressiveBackOffCore()

# Discover GGCs
discoveryInfoProvider = DiscoveryInfoProvider()
discoveryInfoProvider.configureEndpoint(host)
discoveryInfoProvider.configureCredentials(rootCAPath, certificatePath, privateKeyPath)
discoveryInfoProvider.configureTimeout(10)  # 10 sec

retryCount = MAX_DISCOVERY_RETRIES
discovered = False
groupCA = None
coreInfo = None
while retryCount != 0:
    try:
        discoveryInfo = discoveryInfoProvider.discover(thingName)
        caList = discoveryInfo.getAllCas()
        coreList = discoveryInfo.getAllCores()

        # We only pick the first ca and core info
        groupId, ca = caList[0]
        coreInfo = coreList[0]
        print("Discovered GGC: %s from Group: %s" % (coreInfo.coreThingArn, groupId))

        print("Now we persist the connectivity/identity information...")
        groupCA = GROUP_CA_PATH + groupId + "_CA_" + str(uuid.uuid4()) + ".crt"
        if not os.path.exists(GROUP_CA_PATH):
            os.makedirs(GROUP_CA_PATH)
        # The context manager closes the file even if the write fails.
        with open(groupCA, "w") as groupCAFile:
            groupCAFile.write(ca)

        discovered = True
        print("Now proceed to the connecting flow...")
        break
    except DiscoveryInvalidRequestException as e:
        # A malformed request will not succeed on retry, so stop at once.
        print("Invalid discovery request detected!")
        print("Type: %s" % str(type(e)))
        print("Error message: %s" % _error_text(e))
        print("Stopping...")
        break
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C and sys.exit() are not
        # swallowed and retried.
        print("Error in discovery!")
        print("Type: %s" % str(type(e)))
        print("Error message: %s" % _error_text(e))
        retryCount -= 1
        print("\n%d/%d retries left\n" % (retryCount, MAX_DISCOVERY_RETRIES))
        print("Backing off...\n")
        backOffCore.backOff()

if not discovered:
    print("Discovery failed after %d retries. Exiting...\n" % (MAX_DISCOVERY_RETRIES))
    sys.exit(-1)

# Iterate through all connection options for the core and use the first successful one
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureCredentials(groupCA, privateKeyPath, certificatePath)

# Number of messages handled so far; selects the next entry of ``images``.
myCount = 0


# General message notification callback
def customOnMessage(message):
    """Log the incoming message and publish the next "captured" image.

    Args:
        message: Incoming MQTT message; its ``topic`` and ``payload``
            attributes are printed.

    Side effects: increments the module-level ``myCount`` and publishes a
    JSON object with key ``lastClickedImage`` to
    ``<thingName>/capture/finished`` at QoS 0.
    """
    global myCount
    print('Received message on topic %s: %s\n' % (message.topic, message.payload))

    # The counter is bumped before indexing, so the first reply uses images[1].
    myCount = myCount + 1
    reply = {'lastClickedImage': images[myCount % len(images)]}
    replyJson = json.dumps(reply)

    replyTopic = thingName + "/capture/finished"
    myAWSIoTMQTTClient.publish(replyTopic, replyJson, 0)
    print('Published topic %s: %s\n' % (replyTopic, replyJson))


myAWSIoTMQTTClient.onMessage = customOnMessage

connected = False
for connectivityInfo in coreInfo.connectivityInfoList:
    currentHost = connectivityInfo.host
    currentPort = connectivityInfo.port
    print("Trying to connect to core at %s:%d" % (currentHost, currentPort))
    myAWSIoTMQTTClient.configureEndpoint(currentHost, currentPort)
    try:
        myAWSIoTMQTTClient.connect()
        connected = True
        break
    except Exception as e:
        print("Error in connect!")
        print("Type: %s" % str(type(e)))
        print("Error message: %s" % _error_text(e))

if not connected:
    print("Cannot connect to core %s. Exiting..." % coreInfo.coreThingArn)
    sys.exit(-2)

# Successfully connected to the core. No per-subscription callback is passed,
# so incoming messages are delivered to the client's onMessage handler.
myAWSIoTMQTTClient.subscribe(thingName + "/capture", 0, None)
time.sleep(2)

# Keep the process alive so the MQTT callbacks keep firing.
while True:
    time.sleep(1)

Sample Details

Service: iotthingsgraph

Last tested: 2019-07-25

Author: AWS

Type: full-example

On this page: