AWS IoT examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with AWS IoT.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code example shows how to get started using AWS IoT.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3
from botocore.exceptions import ClientError, NoCredentialsError


def hello_iot():
    """
    Use the AWS SDK for Python (Boto3) to create an AWS IoT client and list up to 10
    things in your AWS IoT account. This example uses the default settings specified
    in your shared credentials and config files.
    """
    try:
        iot_client = boto3.client("iot")
        response = iot_client.list_things(maxResults=10)
        things = response.get("things", [])

        print("Hello, AWS IoT! Here are your things:")
        if things:
            for i, thing in enumerate(things, 1):
                print(f"{i}. {thing['thingName']}")
        else:
            print("No things found in your AWS IoT account.")
    except ClientError as e:
        if e.response["Error"]["Code"] == "UnauthorizedException":
            print("You don't have permission to access AWS IoT.")
        else:
            print(f"Couldn't access AWS IoT. Error: {e}")
    except NoCredentialsError:
        print("No AWS credentials found. Please configure your credentials.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    hello_iot()

  • For API details, see ListThings in AWS SDK for Python (Boto3) API Reference.
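The call above returns at most 10 things. As a minimal sketch, assuming only that the client's `list_things` accepts `maxResults` and `nextToken` and returns `things` plus an optional `nextToken`, the generator below follows the token until the listing is exhausted. The names `iter_thing_names` and `FakeIoTClient` are hypothetical; the fake client stands in for `boto3.client("iot")` so the sketch runs without AWS credentials.

```python
def iter_thing_names(iot_client, page_size=10):
    """Yield every thing name, following nextToken until no pages remain."""
    kwargs = {"maxResults": page_size}
    while True:
        response = iot_client.list_things(**kwargs)
        for thing in response.get("things", []):
            yield thing["thingName"]
        next_token = response.get("nextToken")
        if not next_token:
            break
        # Ask for the next page on the following iteration.
        kwargs["nextToken"] = next_token


class FakeIoTClient:
    """Hypothetical stand-in for an AWS IoT client that serves two pages."""

    def list_things(self, maxResults, nextToken=None):
        if nextToken is None:
            return {"things": [{"thingName": "sensor-1"}], "nextToken": "page-2"}
        return {"things": [{"thingName": "sensor-2"}]}


print(list(iter_thing_names(FakeIoTClient())))
```

With a real client, the Boto3 paginator used later in this page (`get_paginator("list_things")`) does the same work with less code.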

Basics

The following code example shows how to:

  • Create an AWS IoT Thing.

  • Generate a device certificate.

  • Update an AWS IoT Thing with Attributes.

  • Return a unique endpoint.

  • List your AWS IoT certificates.

  • Update an AWS IoT shadow.

  • Write out state information.

  • Create a rule.

  • List your rules.

  • Search things using the Thing name.

  • Delete an AWS IoT Thing.

SDK for Python (Boto3)

Create an IoT wrapper class to manage operations.

import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_thing(self, thing_name):
        """
        Creates an AWS IoT thing.

        :param thing_name: The name of the thing to create.
        :return: The response with the name and ARN of the created thing,
                 or None if the thing already exists.
        """
        try:
            response = self.iot_client.create_thing(thingName=thing_name)
            logger.info("Created thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info("Thing %s already exists. Skipping creation.", thing_name)
                return None
            logger.error(
                "Couldn't create thing %s. Here's why: %s: %s",
                thing_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def list_things(self):
        """
        Lists AWS IoT things.

        :return: The list of things.
        """
        try:
            things = []
            paginator = self.iot_client.get_paginator("list_things")
            for page in paginator.paginate():
                things.extend(page["things"])
            logger.info("Retrieved %s things.", len(things))
            return things
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list things. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def create_keys_and_certificate(self):
        """
        Creates keys and a certificate for an AWS IoT thing.

        :return: The certificate ID, ARN, and PEM.
        """
        try:
            response = self.iot_client.create_keys_and_certificate(setAsActive=True)
            logger.info("Created certificate %s.", response["certificateId"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't create keys and certificate. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response

    def attach_thing_principal(self, thing_name, principal):
        """
        Attaches a certificate to an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.attach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Attached principal %s to thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot attach principal. Resource not found.")
                return
            logger.error(
                "Couldn't attach principal to thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def describe_endpoint(self, endpoint_type="iot:Data-ATS"):
        """
        Gets the AWS IoT endpoint.

        :param endpoint_type: The endpoint type.
        :return: The endpoint.
        """
        try:
            response = self.iot_client.describe_endpoint(endpointType=endpoint_type)
            logger.info("Retrieved endpoint %s.", response["endpointAddress"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't describe endpoint. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response["endpointAddress"]

    def list_certificates(self):
        """
        Lists AWS IoT certificates.

        :return: The list of certificates.
        """
        try:
            certificates = []
            paginator = self.iot_client.get_paginator("list_certificates")
            for page in paginator.paginate():
                certificates.extend(page["certificates"])
            logger.info("Retrieved %s certificates.", len(certificates))
            return certificates
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list certificates. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def detach_thing_principal(self, thing_name, principal):
        """
        Detaches a certificate from an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.detach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Detached principal %s from thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot detach principal. Resource not found.")
                return
            logger.error(
                "Couldn't detach principal from thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_certificate(self, certificate_id):
        """
        Deletes an AWS IoT certificate.

        :param certificate_id: The ID of the certificate to delete.
        """
        try:
            # A certificate must be inactive before it can be deleted.
            self.iot_client.update_certificate(
                certificateId=certificate_id, newStatus="INACTIVE"
            )
            self.iot_client.delete_certificate(certificateId=certificate_id)
            logger.info("Deleted certificate %s.", certificate_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete certificate. Resource not found.")
                return
            logger.error(
                "Couldn't delete certificate. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def create_topic_rule(self, rule_name, topic, sns_action_arn, role_arn):
        """
        Creates an AWS IoT topic rule.

        :param rule_name: The name of the rule.
        :param topic: The MQTT topic to subscribe to.
        :param sns_action_arn: The ARN of the SNS topic to publish to.
        :param role_arn: The ARN of the IAM role.
        """
        try:
            self.iot_client.create_topic_rule(
                ruleName=rule_name,
                topicRulePayload={
                    "sql": f"SELECT * FROM '{topic}'",
                    "actions": [
                        {"sns": {"targetArn": sns_action_arn, "roleArn": role_arn}}
                    ],
                },
            )
            logger.info("Created topic rule %s.", rule_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info(
                    "Topic rule %s already exists. Skipping creation.", rule_name
                )
                return
            logger.error(
                "Couldn't create topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def list_topic_rules(self):
        """
        Lists AWS IoT topic rules.

        :return: The list of topic rules.
        """
        try:
            rules = []
            paginator = self.iot_client.get_paginator("list_topic_rules")
            for page in paginator.paginate():
                rules.extend(page["rules"])
            logger.info("Retrieved %s topic rules.", len(rules))
            return rules
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list topic rules. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def search_index(self, query):
        """
        Searches the AWS IoT index.

        :param query: The search query.
        :return: The list of things found.
        """
        try:
            response = self.iot_client.search_index(queryString=query)
            logger.info("Found %s things.", len(response.get("things", [])))
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't search index. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response.get("things", [])

    def update_indexing_configuration(self):
        """
        Updates the AWS IoT indexing configuration to enable thing indexing.
        """
        try:
            self.iot_client.update_indexing_configuration(
                thingIndexingConfiguration={"thingIndexingMode": "REGISTRY"}
            )
            logger.info("Updated indexing configuration.")
        except ClientError as err:
            logger.error(
                "Couldn't update indexing configuration. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_thing(self, thing_name):
        """
        Deletes an AWS IoT thing.

        :param thing_name: The name of the thing to delete.
        """
        try:
            self.iot_client.delete_thing(thingName=thing_name)
            logger.info("Deleted thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete thing. Resource not found.")
                return
            logger.error(
                "Couldn't delete thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_topic_rule(self, rule_name):
        """
        Deletes an AWS IoT topic rule.

        :param rule_name: The name of the rule to delete.
        """
        try:
            self.iot_client.delete_topic_rule(ruleName=rule_name)
            logger.info("Deleted topic rule %s.", rule_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def update_thing_shadow(self, thing_name, shadow_state):
        """
        Updates the shadow for an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param shadow_state: The shadow state as a dictionary.
        """
        try:
            self.iot_data_client.update_thing_shadow(
                thingName=thing_name, payload=json.dumps(shadow_state)
            )
            logger.info("Updated shadow for thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot update thing shadow. Resource not found.")
                return
            logger.error(
                "Couldn't update thing shadow. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def get_thing_shadow(self, thing_name):
        """
        Gets the shadow for an AWS IoT thing.

        :param thing_name: The name of the thing.
        :return: The shadow state as a dictionary, or None if no shadow is found.
        """
        try:
            response = self.iot_data_client.get_thing_shadow(thingName=thing_name)
            shadow = json.loads(response["payload"].read())
            logger.info("Retrieved shadow for thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot get thing shadow. Resource not found.")
                return None
            logger.error(
                "Couldn't get thing shadow. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return shadow
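The wrapper's `update_thing_shadow` serializes whatever dictionary it is given, so the caller is responsible for the shape of the shadow document. The following is a minimal sketch of building that document with `reported` and `desired` sections under a top-level `state` key, and of comparing the two sections locally. Both helper names (`build_shadow_payload`, `pending_changes`) are hypothetical and are not part of the example repository.

```python
import json


def build_shadow_payload(reported=None, desired=None):
    """Return a UTF-8 encoded shadow document containing only the sections given."""
    state = {}
    if reported is not None:
        state["reported"] = reported
    if desired is not None:
        state["desired"] = desired
    if not state:
        raise ValueError("Provide at least one of 'reported' or 'desired'.")
    return json.dumps({"state": state}).encode("utf-8")


def pending_changes(shadow):
    """Return the desired keys whose values differ from the reported state."""
    state = shadow.get("state", {})
    reported = state.get("reported", {})
    desired = state.get("desired", {})
    return {key: value for key, value in desired.items() if reported.get(key) != value}


payload = build_shadow_payload(
    reported={"temperature": 25, "humidity": 50},
    desired={"temperature": 22, "humidity": 50},
)
document = json.loads(payload)
print(pending_changes(document))
```

The comparison in `pending_changes` is only a local illustration; it does not replace the delta information that the shadow service itself maintains.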

Run an interactive scenario demonstrating IoT basics.

import json
import logging
import time

from botocore.exceptions import ClientError

# Question helper from the demo_tools package in the AWS Code Examples Repository.
# It must be on the import path for the cleanup prompt to work.
import demo_tools.question as q

logger = logging.getLogger(__name__)


class IoTScenario:
    """Runs an interactive scenario that shows how to use AWS IoT."""

    is_interactive = True

    def __init__(
        self,
        iot_wrapper,
        iot_data_client,
        cfn_client,
        stack_name="IoTBasicsStack",
        template_path=None,
    ):
        """
        :param iot_wrapper: An instance of the IoTWrapper class.
        :param iot_data_client: A Boto3 IoT Data Plane client.
        :param cfn_client: A Boto3 CloudFormation client.
        :param stack_name: Name for the CloudFormation stack.
        :param template_path: Path to the CloudFormation template file.
        """
        self.iot_wrapper = iot_wrapper
        self.iot_data_client = iot_data_client
        self.cfn_client = cfn_client
        self.thing_name = None
        self.certificate_arn = None
        self.certificate_id = None
        self.rule_name = None
        self.stack_name = stack_name
        self.template_path = template_path or (
            "../../../scenarios/basics/iot/iot_usecase/resources/cfn_template.yaml"
        )

    def _pause(self, message):
        """Wait for the user to press Enter when running interactively."""
        if self.is_interactive:
            input(message)

    def _stack_outputs(self):
        """Return the SNS topic ARN and role ARN from the stack outputs."""
        response = self.cfn_client.describe_stacks(StackName=self.stack_name)
        outputs = {
            output["OutputKey"]: output["OutputValue"]
            for output in response["Stacks"][0]["Outputs"]
        }
        return outputs["SNSTopicArn"], outputs["RoleArn"]

    def _deploy_stack(self):
        """Deploy CloudFormation stack and return outputs."""
        with open(self.template_path, "r") as f:
            template_body = f.read()
        try:
            self.cfn_client.create_stack(
                StackName=self.stack_name,
                TemplateBody=template_body,
                Capabilities=["CAPABILITY_NAMED_IAM"],
            )
            waiter = self.cfn_client.get_waiter("stack_create_complete")
            waiter.wait(StackName=self.stack_name)
            return self._stack_outputs()
        except ClientError as err:
            # Reuse the outputs of a stack left over from an earlier run.
            if err.response["Error"]["Code"] == "AlreadyExistsException":
                return self._stack_outputs()
            raise

    def _cleanup_stack(self):
        """Delete CloudFormation stack."""
        try:
            self.cfn_client.delete_stack(StackName=self.stack_name)
            waiter = self.cfn_client.get_waiter("stack_delete_complete")
            waiter.wait(StackName=self.stack_name)
            print("CloudFormation stack deleted successfully.")
        except ClientError as err:
            logger.error("Failed to delete stack: %s", err)

    def run_scenario(self, thing_name, rule_name):
        """
        Runs the IoT basics scenario.

        :param thing_name: The name of the thing to create.
        :param rule_name: The name of the topic rule to create.
        """
        print("-" * 88)
        print("Welcome to the AWS IoT basics scenario!")
        print("-" * 88)
        print(
            "This scenario demonstrates how to interact with AWS IoT using the AWS SDK for Python (Boto3).\n"
            "AWS IoT provides secure, bi-directional communication between Internet-connected devices\n"
            "and the AWS cloud. You can manage device connections, process device data, and build IoT applications.\n"
        )

        self.thing_name = thing_name
        self.rule_name = rule_name

        try:
            print("\nDeploying CloudFormation stack...")
            sns_topic_arn, role_arn = self._deploy_stack()
            print(f"Stack deployed. SNS Topic: {sns_topic_arn}")

            self._pause("\nNext, we'll create an AWS IoT thing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("1. Create an AWS IoT thing")
            print("-" * 88)
            response = self.iot_wrapper.create_thing(thing_name)
            # create_thing returns None when the thing already exists.
            if response:
                print(f"Created thing: {response['thingName']}")
                print(f"Thing ARN: {response['thingArn']}")
            else:
                print(f"Thing {thing_name} already exists.")

            self._pause("\nNext, we'll list things. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("2. List things")
            print("-" * 88)
            things = self.iot_wrapper.list_things()
            print(f"Found {len(things)} thing(s) in your account")
            for thing in things[:5]:  # Show first 5
                print(f"  Thing name: {thing['thingName']}")

            self._pause("\nNext, we'll generate a device certificate. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("3. Generate a device certificate")
            print("-" * 88)
            cert_response = self.iot_wrapper.create_keys_and_certificate()
            self.certificate_arn = cert_response["certificateArn"]
            self.certificate_id = cert_response["certificateId"]
            print(f"Created certificate: {self.certificate_id}")

            self._pause("\nNext, we'll attach the certificate to the thing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("4. Attach the certificate to the thing")
            print("-" * 88)
            self.iot_wrapper.attach_thing_principal(thing_name, self.certificate_arn)
            print(f"Attached certificate to thing: {thing_name}")

            self._pause("\nNext, we'll update the thing shadow. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("5. Update the thing shadow")
            print("-" * 88)
            shadow_state = {"state": {"reported": {"temperature": 25, "humidity": 50}}}
            self.iot_wrapper.update_thing_shadow(thing_name, shadow_state)
            print(f"Updated shadow for thing: {thing_name}")

            self._pause("\nNext, we'll get the thing shadow. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("6. Get the thing shadow")
            print("-" * 88)
            shadow = self.iot_wrapper.get_thing_shadow(thing_name)
            # get_thing_shadow returns None when no shadow is found.
            if shadow:
                print(f"Shadow state: {json.dumps(shadow['state'], indent=2)}")
            else:
                print(f"No shadow found for thing: {thing_name}")

            self._pause("\nNext, we'll get the AWS IoT endpoint. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("7. Get the AWS IoT endpoint")
            print("-" * 88)
            endpoint = self.iot_wrapper.describe_endpoint()
            print(f"IoT endpoint: {endpoint}")

            self._pause("\nNext, we'll list certificates. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("8. List certificates")
            print("-" * 88)
            certificates = self.iot_wrapper.list_certificates()
            print(f"Found {len(certificates)} certificate(s)")
            for cert in certificates:
                print(f"  Certificate ID: {cert['certificateId']}")
                print(f"  Certificate ARN: {cert['certificateArn']}")
                print(f"  Status: {cert['status']}")

            self._pause("\nNext, we'll create a topic rule. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("9. Create a topic rule")
            print("-" * 88)
            self.iot_wrapper.create_topic_rule(
                rule_name, f"device/{thing_name}/data", sns_topic_arn, role_arn
            )
            print(f"Created topic rule: {rule_name}")

            self._pause("\nNext, we'll list topic rules. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("10. List topic rules")
            print("-" * 88)
            rules = self.iot_wrapper.list_topic_rules()
            print(f"Found {len(rules)} topic rule(s)")
            for rule in rules:
                print(f"  Rule name: {rule['ruleName']}")
                print(f"  Rule ARN: {rule['ruleArn']}")

            self._pause("\nNext, we'll configure thing indexing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("11. Configure thing indexing")
            print("-" * 88)
            self.iot_wrapper.update_indexing_configuration()
            print("Enabled thing indexing")
            print("Waiting for indexing to be ready...")
            time.sleep(10)

            self._pause("\nNext, we'll search for things. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("12. Search for things")
            print("-" * 88)
            try:
                things = self.iot_wrapper.search_index(f"thingName:{thing_name}")
                if things:
                    print(f"Found {len(things)} thing(s) matching the query")
                    for thing in things:
                        print(f"  Thing name: {thing.get('thingName', 'N/A')}")
                        print(f"  Thing ID: {thing.get('thingId', 'N/A')}")
                else:
                    print("No things found. Indexing may take a few minutes.")
            except ClientError as err:
                if err.response["Error"]["Code"] in [
                    "IndexNotReadyException",
                    "InvalidRequestException",
                ]:
                    print("Search index not ready yet. This is expected.")
                else:
                    raise

        except ClientError as err:
            logger.error(
                "Scenario failed: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        finally:
            self._cleanup()

    def _cleanup(self):
        """Cleans up resources created during the scenario."""
        if not self.thing_name:
            return

        print("\n" + "-" * 88)
        print("Cleanup")
        print("-" * 88)

        if q.ask("Do you want to delete the resources? (y/n) ", q.is_yesno):
            try:
                if self.certificate_arn:
                    print(f"Detaching certificate from thing: {self.thing_name}")
                    self.iot_wrapper.detach_thing_principal(
                        self.thing_name, self.certificate_arn
                    )
                if self.certificate_id:
                    print(f"Deleting certificate: {self.certificate_id}")
                    self.iot_wrapper.delete_certificate(self.certificate_id)
                if self.thing_name:
                    print(f"Deleting thing: {self.thing_name}")
                    self.iot_wrapper.delete_thing(self.thing_name)
                if self.rule_name:
                    print(f"Deleting topic rule: {self.rule_name}")
                    self.iot_wrapper.delete_topic_rule(self.rule_name)
                self._cleanup_stack()
                print("Resources deleted successfully.")
            except ClientError as err:
                logger.error(
                    "Cleanup failed: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
        else:
            print("Resources will remain in your account.")

        print("\n" + "-" * 88)
        print("Thanks for using AWS IoT!")
        print("-" * 88)
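The scenario waits a fixed 10 seconds after enabling thing indexing and then tolerates an index that is not ready. As an alternative sketch, assuming the client's `describe_index` call reports an `indexStatus` of `ACTIVE` once the `AWS_Things` index can be searched, the helper below polls for readiness instead of sleeping blindly. `wait_for_index` and `FakeIndexClient` are hypothetical names; the fake client replaces a real AWS IoT client so the sketch runs offline.

```python
import time


def wait_for_index(iot_client, index_name="AWS_Things", attempts=12, delay=5.0,
                   sleep=time.sleep):
    """Poll describe_index until the index is ACTIVE or the attempts run out."""
    for attempt in range(attempts):
        status = iot_client.describe_index(indexName=index_name)["indexStatus"]
        if status == "ACTIVE":
            return True
        # Skip the final sleep because no further check follows it.
        if attempt < attempts - 1:
            sleep(delay)
    return False


class FakeIndexClient:
    """Hypothetical stand-in that reports a scripted sequence of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_index(self, indexName):
        return {"indexName": indexName, "indexStatus": next(self._statuses)}


ready = wait_for_index(
    FakeIndexClient(["BUILDING", "BUILDING", "ACTIVE"]), sleep=lambda seconds: None
)
print(ready)
```

Passing `sleep` as a parameter keeps the polling loop testable; in the scenario it would default to `time.sleep`.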

Actions

The following code example shows how to use AttachThingPrincipal.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def attach_thing_principal(self, thing_name, principal):
        """
        Attaches a certificate to an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.attach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Attached principal %s to thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot attach principal. Resource not found.")
                return
            logger.error(
                "Couldn't attach principal to thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
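Attaching and detaching use the certificate ARN as the principal, while deleting a certificate uses the certificate ID. As a small sketch, assuming certificate ARNs follow the form `arn:<partition>:iot:<region>:<account>:cert/<certificate-id>`, the hypothetical helper below derives the ID from the ARN so that only one of the two values needs to be stored.

```python
def certificate_id_from_arn(certificate_arn):
    """Return the certificate ID embedded in an AWS IoT certificate ARN."""
    parts = certificate_arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "iot":
        raise ValueError(f"Not an AWS IoT ARN: {certificate_arn!r}")
    # The resource part looks like "cert/<certificate-id>".
    resource_type, _, resource_id = parts[5].partition("/")
    if resource_type != "cert" or not resource_id:
        raise ValueError(f"Not a certificate ARN: {certificate_arn!r}")
    return resource_id


example_arn = "arn:aws:iot:us-east-1:123456789012:cert/abc123"
print(certificate_id_from_arn(example_arn))
```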

The following code example shows how to use CreateKeysAndCertificate.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_keys_and_certificate(self):
        """
        Creates keys and a certificate for an AWS IoT thing.

        :return: The certificate ID, ARN, and PEM.
        """
        try:
            response = self.iot_client.create_keys_and_certificate(setAsActive=True)
            logger.info("Created certificate %s.", response["certificateId"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't create keys and certificate. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response
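The response from `create_keys_and_certificate` carries the certificate PEM and the key pair, and the private key is only available in this response. The sketch below shows one way to persist both, assuming the response contains `certificateId`, `certificatePem`, and `keyPair["PrivateKey"]`. The function name, file naming scheme, and sample response are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def save_certificate_files(response, directory):
    """Write the certificate and private key to disk and return both paths."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    cert_id = response["certificateId"]
    cert_path = directory / f"{cert_id}.cert.pem"
    key_path = directory / f"{cert_id}.private.key"

    cert_path.write_text(response["certificatePem"])
    # Create the key file with owner-only permissions where the OS supports it.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as key_file:
        key_file.write(response["keyPair"]["PrivateKey"])
    return cert_path, key_path


# Made-up response values for illustration only.
fake_response = {
    "certificateId": "abc123",
    "certificatePem": "-----BEGIN CERTIFICATE-----\n...",
    "keyPair": {"PublicKey": "public", "PrivateKey": "private"},
}

with tempfile.TemporaryDirectory() as tmp:
    cert_path, key_path = save_certificate_files(fake_response, tmp)
    saved_cert = cert_path.read_text()
    saved_key = key_path.read_text()
    saved_names = sorted(p.name for p in Path(tmp).iterdir())

print(saved_names)
```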

The following code example shows how to use CreateThing.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_thing(self, thing_name):
        """
        Creates an AWS IoT thing.

        :param thing_name: The name of the thing to create.
        :return: The name and ARN of the created thing.
        """
        try:
            response = self.iot_client.create_thing(thingName=thing_name)
            logger.info("Created thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info("Thing %s already exists. Skipping creation.", thing_name)
                return None
            logger.error(
                "Couldn't create thing %s. Here's why: %s: %s",
                thing_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response
  • For API details, see CreateThing in AWS SDK for Python (Boto3) API Reference.
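A thing name can be checked locally before `create_thing` is called, which avoids a round trip for obviously invalid input. This sketch assumes the constraint given for `thingName` in the CreateThing reference: 1 to 128 characters drawn from letters, digits, colon, underscore, and hyphen. The helper name is hypothetical, and the service remains the final authority on what it accepts.

```python
import re

# Assumed constraint: 1-128 characters from [a-zA-Z0-9:_-].
THING_NAME_PATTERN = re.compile(r"[a-zA-Z0-9:_-]{1,128}")


def is_valid_thing_name(name):
    """Return True if the name matches the assumed thing name constraint."""
    return isinstance(name, str) and THING_NAME_PATTERN.fullmatch(name) is not None


for candidate in ["sensor-01:kitchen", "bad name", ""]:
    print(repr(candidate), is_valid_thing_name(candidate))
```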

The following code example shows how to use CreateTopicRule.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_topic_rule(self, rule_name, topic, sns_action_arn, role_arn):
        """
        Creates an AWS IoT topic rule.

        :param rule_name: The name of the rule.
        :param topic: The MQTT topic to subscribe to.
        :param sns_action_arn: The ARN of the SNS topic to publish to.
        :param role_arn: The ARN of the IAM role.
        """
        try:
            self.iot_client.create_topic_rule(
                ruleName=rule_name,
                topicRulePayload={
                    "sql": f"SELECT * FROM '{topic}'",
                    "actions": [
                        {"sns": {"targetArn": sns_action_arn, "roleArn": role_arn}}
                    ],
                },
            )
            logger.info("Created topic rule %s.", rule_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info(
                    "Topic rule %s already exists. Skipping creation.", rule_name
                )
                return
            logger.error(
                "Couldn't create topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see CreateTopicRule in AWS SDK for Python (Boto3) API Reference.
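The rule payload above interpolates the topic directly into the SQL statement. As a minimal sketch, the hypothetical helper below builds the same `topicRulePayload` dictionary separately, so it can be inspected or tested before it is sent, and rejects topics containing a single quote because they would break the quoted SQL literal. The ARNs in the usage lines are made-up placeholders.

```python
def build_sns_rule_payload(topic, sns_topic_arn, role_arn, description=None):
    """Build a topicRulePayload that forwards every message on a topic to SNS."""
    if "'" in topic:
        raise ValueError("The topic must not contain a single quote.")
    payload = {
        "sql": f"SELECT * FROM '{topic}'",
        "actions": [{"sns": {"targetArn": sns_topic_arn, "roleArn": role_arn}}],
    }
    if description:
        payload["description"] = description
    return payload


payload = build_sns_rule_payload(
    "device/demo-thing/data",
    "arn:aws:sns:us-east-1:123456789012:demo-topic",
    "arn:aws:iam::123456789012:role/demo-role",
)
print(payload["sql"])
```

The result can be passed as `topicRulePayload=payload` to `create_topic_rule`.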

The following code example shows how to use DeleteCertificate.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_certificate(self, certificate_id):
        """
        Deletes an AWS IoT certificate.

        :param certificate_id: The ID of the certificate to delete.
        """
        try:
            self.iot_client.update_certificate(
                certificateId=certificate_id, newStatus="INACTIVE"
            )
            self.iot_client.delete_certificate(certificateId=certificate_id)
            logger.info("Deleted certificate %s.", certificate_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete certificate. Resource not found.")
                return
            logger.error(
                "Couldn't delete certificate. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see DeleteCertificate in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteThing.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_thing(self, thing_name):
        """
        Deletes an AWS IoT thing.

        :param thing_name: The name of the thing to delete.
        """
        try:
            self.iot_client.delete_thing(thingName=thing_name)
            logger.info("Deleted thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete thing. Resource not found.")
                return
            logger.error(
                "Couldn't delete thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see DeleteThing in AWS SDK for Python (Boto3) API Reference.
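The basics scenario detaches the certificate before it deletes the thing. The sketch below generalizes that order for a thing with any number of principals, assuming the client's `list_thing_principals` returns a `principals` list and an optional `nextToken`. `delete_thing_with_principals` and `RecordingClient` are hypothetical names; the recording client stands in for a real AWS IoT client and only logs the call order.

```python
def delete_thing_with_principals(iot_client, thing_name):
    """Detach every principal from a thing, delete the thing, return the principals."""
    principals = []
    kwargs = {"thingName": thing_name}
    while True:
        page = iot_client.list_thing_principals(**kwargs)
        principals.extend(page.get("principals", []))
        next_token = page.get("nextToken")
        if not next_token:
            break
        kwargs["nextToken"] = next_token

    for principal in principals:
        iot_client.detach_thing_principal(thingName=thing_name, principal=principal)
    iot_client.delete_thing(thingName=thing_name)
    return principals


class RecordingClient:
    """Hypothetical stand-in that records calls instead of contacting AWS."""

    def __init__(self, principals):
        self.principals = principals
        self.calls = []

    def list_thing_principals(self, thingName, nextToken=None):
        self.calls.append("list_thing_principals")
        return {"principals": list(self.principals)}

    def detach_thing_principal(self, thingName, principal):
        self.calls.append(f"detach:{principal}")

    def delete_thing(self, thingName):
        self.calls.append(f"delete:{thingName}")


client = RecordingClient(["arn:aws:iot:us-east-1:123456789012:cert/abc123"])
delete_thing_with_principals(client, "demo-thing")
print(client.calls)
```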

The following code example shows how to use DeleteTopicRule.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_topic_rule(self, rule_name):
        """
        Deletes an AWS IoT topic rule.

        :param rule_name: The name of the rule to delete.
        """
        try:
            self.iot_client.delete_topic_rule(ruleName=rule_name)
            logger.info("Deleted topic rule %s.", rule_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see DeleteTopicRule in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeEndpoint.

SDK for Python (Boto3)

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def describe_endpoint(self, endpoint_type="iot:Data-ATS"):
        """
        Gets the AWS IoT endpoint.

        :param endpoint_type: The endpoint type.
        :return: The endpoint.
        """
        try:
            response = self.iot_client.describe_endpoint(endpointType=endpoint_type)
            logger.info("Retrieved endpoint %s.", response["endpointAddress"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't describe endpoint. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response["endpointAddress"]
  • For API details, see DescribeEndpoint in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DetachThingPrincipal.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def detach_thing_principal(self, thing_name, principal):
        """
        Detaches a certificate from an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.detach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Detached principal %s from thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot detach principal. Resource not found.")
                return
            logger.error(
                "Couldn't detach principal from thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
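A common reason to detach principals is to clean up a thing before deleting it. The following is a minimal sketch, assuming you want to detach every principal currently attached to a thing. It collects the principals with `list_thing_principals` first and then detaches each one. The helper name `detach_all_principals`, the `_FakeIoT` class, and the sample ARNs are hypothetical.

```python
def detach_all_principals(iot_client, thing_name):
    """Detach every principal attached to a thing and return the detached ARNs."""
    principals = []
    kwargs = {"thingName": thing_name}
    # Collect all pages first so detaching does not interfere with pagination.
    while True:
        page = iot_client.list_thing_principals(**kwargs)
        principals.extend(page.get("principals", []))
        next_token = page.get("nextToken")
        if not next_token:
            break
        kwargs["nextToken"] = next_token

    for principal in principals:
        iot_client.detach_thing_principal(thingName=thing_name, principal=principal)
    return principals


class _FakeIoT:
    """Hypothetical in-memory stand-in for boto3.client("iot")."""

    def __init__(self):
        self.attached = [
            "arn:aws:iot:us-east-1:111122223333:cert/aaa",
            "arn:aws:iot:us-east-1:111122223333:cert/bbb",
        ]

    def list_thing_principals(self, **kwargs):
        return {"principals": list(self.attached)}

    def detach_thing_principal(self, thingName, principal):
        self.attached.remove(principal)


fake = _FakeIoT()
detached = detach_all_principals(fake, "my-thing")
print(f"Detached {len(detached)} principals.")
```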

The following code example shows how to use ListCertificates.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def list_certificates(self):
        """
        Lists AWS IoT certificates.

        :return: The list of certificates.
        """
        try:
            certificates = []
            paginator = self.iot_client.get_paginator("list_certificates")
            for page in paginator.paginate():
                certificates.extend(page["certificates"])
            logger.info("Retrieved %s certificates.", len(certificates))
            return certificates
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list certificates. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
  • For API details, see ListCertificates in AWS SDK for Python (Boto3) API Reference.
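Each entry returned by ListCertificates is a dictionary that includes fields such as `certificateArn`, `certificateId`, and `status`. The following is a minimal sketch, assuming you only care about certificates whose status is `ACTIVE`. The helper name `active_certificate_arns` and the sample data are hypothetical.

```python
def active_certificate_arns(certificates):
    """Return the ARNs of certificates whose status is ACTIVE."""
    return [
        cert["certificateArn"]
        for cert in certificates
        if cert.get("status") == "ACTIVE"
    ]


# Made-up data in the shape of the list returned by list_certificates().
sample_certificates = [
    {
        "certificateArn": "arn:aws:iot:us-east-1:111122223333:cert/aaa",
        "certificateId": "aaa",
        "status": "ACTIVE",
    },
    {
        "certificateArn": "arn:aws:iot:us-east-1:111122223333:cert/bbb",
        "certificateId": "bbb",
        "status": "INACTIVE",
    },
]

active_arns = active_certificate_arns(sample_certificates)
print(active_arns)
```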

The following code example shows how to use ListThings.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def list_things(self):
        """
        Lists AWS IoT things.

        :return: The list of things.
        """
        try:
            things = []
            paginator = self.iot_client.get_paginator("list_things")
            for page in paginator.paginate():
                things.extend(page["things"])
            logger.info("Retrieved %s things.", len(things))
            return things
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list things. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
  • For API details, see ListThings in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use SearchIndex.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def search_index(self, query):
        """
        Searches the AWS IoT index.

        :param query: The search query.
        :return: The list of things found.
        """
        try:
            response = self.iot_client.search_index(queryString=query)
            logger.info("Found %s things.", len(response.get("things", [])))
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't search index. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response.get("things", [])
  • For API details, see SearchIndex in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateIndexingConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def update_indexing_configuration(self):
        """
        Updates the AWS IoT indexing configuration to enable thing indexing.
        """
        try:
            self.iot_client.update_indexing_configuration(
                thingIndexingConfiguration={"thingIndexingMode": "REGISTRY"}
            )
            logger.info("Updated indexing configuration.")
        except ClientError as err:
            logger.error(
                "Couldn't update indexing configuration. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise