Aurora examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

This document is a machine-translated version. If the translation differs from the English original, the English original prevails.

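The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Aurora.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. Actions show you how to call individual service functions, and you can see each action in context in its related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or by combining it with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.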

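Get started

The following code example shows how to get started using Aurora.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.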

```python
import boto3

# Create an RDS client
rds = boto3.client("rds")

# Create a paginator for the describe_db_clusters operation
paginator = rds.get_paginator("describe_db_clusters")

# Use the paginator to get a list of DB clusters
response_iterator = paginator.paginate(
    PaginationConfig={
        "PageSize": 50,  # Adjust PageSize as needed
        "StartingToken": None,
    }
)

# Iterate through the pages of the response
clusters_found = False
for page in response_iterator:
    if "DBClusters" in page and page["DBClusters"]:
        clusters_found = True
        print("Here are your RDS Aurora clusters:")
        for cluster in page["DBClusters"]:
            print(
                f"Cluster ID: {cluster['DBClusterIdentifier']}, Engine: {cluster['Engine']}"
            )

if not clusters_found:
    print("No clusters found!")
```
  • For API details, see DescribeDBClusters in AWS SDK for Python (Boto3) API Reference.
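
The listing above prints every DB cluster that `describe_db_clusters` returns, and that result may also include clusters from engines other than Aurora. The following is a minimal sketch, assuming you only want Aurora clusters and that Aurora engine names start with `aurora` (as in `aurora-mysql`, the engine used elsewhere on this page). The function name `list_aurora_clusters` and the two stand-in classes are hypothetical; the stand-ins exist only so the sketch runs without AWS credentials.

```python
def list_aurora_clusters(rds_client, page_size=50):
    """Return (identifier, engine) pairs for clusters whose engine starts with 'aurora'.

    rds_client is any object that offers get_paginator("describe_db_clusters"),
    for example boto3.client("rds").
    """
    paginator = rds_client.get_paginator("describe_db_clusters")
    found = []
    for page in paginator.paginate(PaginationConfig={"PageSize": page_size}):
        # A page without the DBClusters key is treated as an empty page.
        for cluster in page.get("DBClusters", []):
            if cluster["Engine"].startswith("aurora"):
                found.append((cluster["DBClusterIdentifier"], cluster["Engine"]))
    return found


class FakePaginator:
    """Hypothetical stand-in that replays canned pages instead of calling AWS."""

    def __init__(self, pages):
        self._pages = pages

    def paginate(self, **kwargs):
        return iter(self._pages)


class FakeRdsClient:
    """Hypothetical stand-in for an RDS client; only get_paginator is provided."""

    def __init__(self, pages):
        self._pages = pages

    def get_paginator(self, operation_name):
        return FakePaginator(self._pages)


# Usage with canned data: one Aurora cluster and one non-Aurora cluster.
sample_pages = [
    {
        "DBClusters": [
            {"DBClusterIdentifier": "doc-example-aurora", "Engine": "aurora-mysql"},
            {"DBClusterIdentifier": "other-cluster", "Engine": "mysql"},
        ]
    },
    {"DBClusters": []},
]
for cluster_id, engine in list_aurora_clusters(FakeRdsClient(sample_pages)):
    print(f"Cluster ID: {cluster_id}, Engine: {engine}")
```

Passing the client in as a parameter keeps the paging logic separate from client creation, so the same function works with `boto3.client("rds")` in real use and with a stand-in when testing.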

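Basics

The following code example shows how to:

  • Create a custom Aurora DB cluster parameter group and set parameter values.

  • Create a DB cluster that uses the parameter group.

  • Create a DB instance that contains a database.

  • Take a snapshot of the DB cluster, then clean up resources.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Run an interactive scenario at a command prompt.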
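
Creating a DB cluster, a DB instance, or a snapshot takes minutes, so calling code has to wait until the resource reports a status of `available`. The following is a minimal sketch of such a wait with an upper time limit. The helper name `poll_until` and its parameters are hypothetical and are not part of Boto3; `sleep` and `clock` are injectable so the loop can be tried without real waiting.

```python
import time


def poll_until(fetch, is_done, interval=30.0, timeout=1800.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call fetch() every `interval` seconds until is_done(result) is true.

    Returns the first result that satisfies is_done. Raises TimeoutError if
    `timeout` seconds have passed and the condition is still not met.
    """
    deadline = clock() + timeout
    while True:
        result = fetch()
        if is_done(result):
            return result
        if clock() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout} seconds.")
        sleep(interval)


# Usage with canned statuses standing in for repeated describe calls.
statuses = iter(
    [{"Status": "creating"}, {"Status": "creating"}, {"Status": "available"}]
)
cluster = poll_until(
    fetch=lambda: next(statuses),
    is_done=lambda c: c.get("Status") == "available",
    interval=0,
)
print(cluster["Status"])
```

In real use, `fetch` would be a call that describes the resource, and `is_done` would check the `Status` or `DBInstanceStatus` field. The time limit prevents the loop from running forever if a resource never becomes available.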

```python
import logging
import sys
import uuid
from pprint import pp

# AuroraWrapper is the class defined later on this page. The module name
# aurora_wrapper is an assumption about how that class is saved.
from aurora_wrapper import AuroraWrapper

# q and wait are helpers from the demo_tools package in the AWS Code Examples
# Repository. The relative path assumes this script runs from its folder there.
sys.path.append("../..")
import demo_tools.question as q
from demo_tools.retries import wait


class AuroraClusterScenario:
    """Runs a scenario that shows how to get started using Aurora DB clusters."""

    def __init__(self, aurora_wrapper):
        """
        :param aurora_wrapper: An object that wraps Aurora DB cluster actions.
        """
        self.aurora_wrapper = aurora_wrapper

    def create_parameter_group(self, db_engine, parameter_group_name):
        """
        Shows how to get available engine versions for a specified database engine
        and create a DB cluster parameter group that is compatible with a selected
        engine family.

        :param db_engine: The database engine to use as a basis.
        :param parameter_group_name: The name given to the newly created parameter group.
        :return: The newly created parameter group.
        """
        print(
            f"Checking for an existing DB cluster parameter group named {parameter_group_name}."
        )
        parameter_group = self.aurora_wrapper.get_parameter_group(parameter_group_name)
        if parameter_group is None:
            print(f"Getting available database engine versions for {db_engine}.")
            engine_versions = self.aurora_wrapper.get_engine_versions(db_engine)
            families = list({ver["DBParameterGroupFamily"] for ver in engine_versions})
            family_index = q.choose("Which family do you want to use? ", families)
            print(f"Creating a DB cluster parameter group.")
            self.aurora_wrapper.create_parameter_group(
                parameter_group_name, families[family_index], "Example parameter group."
            )
            parameter_group = self.aurora_wrapper.get_parameter_group(
                parameter_group_name
            )
        print(f"Parameter group {parameter_group['DBClusterParameterGroupName']}:")
        pp(parameter_group)
        print("-" * 88)
        return parameter_group

    def set_user_parameters(self, parameter_group_name):
        """
        Shows how to get the parameters contained in a custom parameter group and
        update some of the parameter values in the group.

        :param parameter_group_name: The name of the parameter group to query and modify.
        """
        print("Let's set some parameter values in your parameter group.")
        auto_inc_parameters = self.aurora_wrapper.get_parameters(
            parameter_group_name, name_prefix="auto_increment"
        )
        update_params = []
        for auto_inc in auto_inc_parameters:
            if auto_inc["IsModifiable"] and auto_inc["DataType"] == "integer":
                print(f"The {auto_inc['ParameterName']} parameter is described as:")
                print(f"\t{auto_inc['Description']}")
                param_range = auto_inc["AllowedValues"].split("-")
                auto_inc["ParameterValue"] = str(
                    q.ask(
                        f"Enter a value between {param_range[0]} and {param_range[1]}: ",
                        q.is_int,
                        q.in_range(int(param_range[0]), int(param_range[1])),
                    )
                )
                update_params.append(auto_inc)
        self.aurora_wrapper.update_parameters(parameter_group_name, update_params)
        print(
            "You can get a list of parameters you've set by specifying a source of 'user'."
        )
        user_parameters = self.aurora_wrapper.get_parameters(
            parameter_group_name, source="user"
        )
        pp(user_parameters)
        print("-" * 88)

    def create_cluster(self, cluster_name, db_engine, db_name, parameter_group):
        """
        Shows how to create an Aurora DB cluster that contains a database of a
        specified type. The database is also configured to use a custom DB cluster
        parameter group.

        :param cluster_name: The name given to the newly created DB cluster.
        :param db_engine: The engine of the created database.
        :param db_name: The name given to the created database.
        :param parameter_group: The parameter group that is associated with the DB cluster.
        :return: The newly created DB cluster.
        """
        print("Checking for an existing DB cluster.")
        cluster = self.aurora_wrapper.get_db_cluster(cluster_name)
        if cluster is None:
            admin_username = q.ask(
                "Enter an administrator user name for the database: ", q.non_empty
            )
            admin_password = q.ask(
                "Enter a password for the administrator (at least 8 characters): ",
                q.non_empty,
            )
            engine_versions = self.aurora_wrapper.get_engine_versions(
                db_engine, parameter_group["DBParameterGroupFamily"]
            )
            engine_choices = [
                ver["EngineVersionDescription"] for ver in engine_versions
            ]
            print("The available engines for your parameter group are:")
            engine_index = q.choose("Which engine do you want to use? ", engine_choices)
            print(
                f"Creating DB cluster {cluster_name} and database {db_name}.\n"
                f"The DB cluster is configured to use\n"
                f"your custom parameter group {parameter_group['DBClusterParameterGroupName']}\n"
                f"and selected engine {engine_choices[engine_index]}.\n"
                f"This typically takes several minutes."
            )
            cluster = self.aurora_wrapper.create_db_cluster(
                cluster_name,
                parameter_group["DBClusterParameterGroupName"],
                db_name,
                db_engine,
                engine_versions[engine_index]["EngineVersion"],
                admin_username,
                admin_password,
            )
            while cluster.get("Status") != "available":
                wait(30)
                cluster = self.aurora_wrapper.get_db_cluster(cluster_name)
            print("Cluster created and available.\n")
        print("Cluster data:")
        pp(cluster)
        print("-" * 88)
        return cluster

    def create_instance(self, cluster):
        """
        Shows how to create a DB instance in an existing Aurora DB cluster. A new DB
        cluster contains no DB instances, so you must add one. The first DB instance
        that is added to a DB cluster defaults to a read-write DB instance.

        :param cluster: The DB cluster where the DB instance is added.
        :return: The newly created DB instance.
        """
        print("Checking for an existing database instance.")
        cluster_name = cluster["DBClusterIdentifier"]
        db_inst = self.aurora_wrapper.get_db_instance(cluster_name)
        if db_inst is None:
            print("Let's create a database instance in your DB cluster.")
            print("First, choose a DB instance type:")
            inst_opts = self.aurora_wrapper.get_orderable_instances(
                cluster["Engine"], cluster["EngineVersion"]
            )
            # Deduplicate (class, storage type) pairs in a stable order so that the
            # index the user picks maps back to the instance class that was shown.
            inst_pairs = list(
                dict.fromkeys(
                    (opt["DBInstanceClass"], opt["StorageType"]) for opt in inst_opts
                )
            )
            inst_choices = [
                f"{inst_class}, storage type: {storage_type}"
                for inst_class, storage_type in inst_pairs
            ]
            inst_index = q.choose(
                "Which DB instance class do you want to use? ", inst_choices
            )
            print(
                f"Creating a database instance. This typically takes several minutes."
            )
            db_inst = self.aurora_wrapper.create_instance_in_cluster(
                cluster_name,
                cluster_name,
                cluster["Engine"],
                inst_pairs[inst_index][0],
            )
            while db_inst.get("DBInstanceStatus") != "available":
                wait(30)
                db_inst = self.aurora_wrapper.get_db_instance(cluster_name)
        print("Instance data:")
        pp(db_inst)
        print("-" * 88)
        return db_inst

    @staticmethod
    def display_connection(cluster):
        """
        Displays connection information about an Aurora DB cluster and tips on how to
        connect to it.

        :param cluster: The DB cluster to display.
        """
        print(
            "You can now connect to your database using your favorite MySql client.\n"
            "One way to connect is by using the 'mysql' shell on an Amazon EC2 instance\n"
            "that is running in the same VPC as your database cluster. Pass the endpoint,\n"
            "port, and administrator user name to 'mysql' and enter your password\n"
            "when prompted:\n"
        )
        print(
            f"\n\tmysql -h {cluster['Endpoint']} -P {cluster['Port']} -u {cluster['MasterUsername']} -p\n"
        )
        print(
            "For more information, see the User Guide for Aurora:\n"
            "\thttps://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_GettingStartedAurora.CreatingConnecting.Aurora.html#CHAP_GettingStartedAurora.Aurora.Connect"
        )
        print("-" * 88)

    def create_snapshot(self, cluster_name):
        """
        Shows how to create a DB cluster snapshot and wait until it's available.

        :param cluster_name: The name of a DB cluster to snapshot.
        """
        if q.ask(
            "Do you want to create a snapshot of your DB cluster (y/n)? ", q.is_yesno
        ):
            snapshot_id = f"{cluster_name}-{uuid.uuid4()}"
            print(
                f"Creating a snapshot named {snapshot_id}. This typically takes a few minutes."
            )
            snapshot = self.aurora_wrapper.create_cluster_snapshot(
                snapshot_id, cluster_name
            )
            while snapshot.get("Status") != "available":
                wait(30)
                snapshot = self.aurora_wrapper.get_cluster_snapshot(snapshot_id)
            pp(snapshot)
            print("-" * 88)

    def cleanup(self, db_inst, cluster, parameter_group):
        """
        Shows how to clean up a DB instance, DB cluster, and DB cluster parameter group.
        Before the DB cluster parameter group can be deleted, all associated DB
        instances and DB clusters must first be deleted.

        :param db_inst: The DB instance to delete.
        :param cluster: The DB cluster to delete.
        :param parameter_group: The DB cluster parameter group to delete.
        """
        cluster_name = cluster["DBClusterIdentifier"]
        parameter_group_name = parameter_group["DBClusterParameterGroupName"]
        if q.ask(
            "\nDo you want to delete the database instance, DB cluster, and parameter "
            "group (y/n)? ",
            q.is_yesno,
        ):
            print(f"Deleting database instance {db_inst['DBInstanceIdentifier']}.")
            self.aurora_wrapper.delete_db_instance(db_inst["DBInstanceIdentifier"])
            print(f"Deleting database cluster {cluster_name}.")
            self.aurora_wrapper.delete_db_cluster(cluster_name)
            print(
                "Waiting for the DB instance and DB cluster to delete.\n"
                "This typically takes several minutes."
            )
            # The get_* calls return None once the resource no longer exists.
            while db_inst is not None or cluster is not None:
                wait(30)
                if db_inst is not None:
                    db_inst = self.aurora_wrapper.get_db_instance(
                        db_inst["DBInstanceIdentifier"]
                    )
                if cluster is not None:
                    cluster = self.aurora_wrapper.get_db_cluster(
                        cluster["DBClusterIdentifier"]
                    )
            print(f"Deleting parameter group {parameter_group_name}.")
            self.aurora_wrapper.delete_parameter_group(parameter_group_name)

    def run_scenario(self, db_engine, parameter_group_name, cluster_name, db_name):
        print("-" * 88)
        print(
            "Welcome to the Amazon Relational Database Service (Amazon RDS) get started\n"
            "with Aurora DB clusters demo."
        )
        print("-" * 88)

        parameter_group = self.create_parameter_group(db_engine, parameter_group_name)
        self.set_user_parameters(parameter_group_name)
        cluster = self.create_cluster(cluster_name, db_engine, db_name, parameter_group)
        wait(5)
        db_inst = self.create_instance(cluster)
        self.display_connection(cluster)
        self.create_snapshot(cluster_name)
        self.cleanup(db_inst, cluster, parameter_group)

        print("\nThanks for watching!")
        print("-" * 88)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    try:
        scenario = AuroraClusterScenario(AuroraWrapper.from_client())
        scenario.run_scenario(
            "aurora-mysql",
            "doc-example-cluster-parameter-group",
            "doc-example-aurora",
            "docexampledb",
        )
    except Exception:
        logging.exception("Something went wrong with the demo.")
```

Define functions that the scenario calls to manage Aurora actions.

```python
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_parameter_group(self, parameter_group_name):
        """
        Gets a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to retrieve.
        :return: The requested parameter group, or None if it does not exist.
        """
        try:
            response = self.rds_client.describe_db_cluster_parameter_groups(
                DBClusterParameterGroupName=parameter_group_name
            )
            parameter_group = response["DBClusterParameterGroups"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBParameterGroupNotFound":
                logger.info("Parameter group %s does not exist.", parameter_group_name)
            else:
                logger.error(
                    "Couldn't get parameter group %s. Here's why: %s: %s",
                    parameter_group_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return parameter_group

    def create_parameter_group(
        self, parameter_group_name, parameter_group_family, description
    ):
        """
        Creates a DB cluster parameter group that is based on the specified parameter
        group family.

        :param parameter_group_name: The name of the newly created parameter group.
        :param parameter_group_family: The family that is used as the basis of the new
                                       parameter group.
        :param description: A description given to the parameter group.
        :return: Data about the newly created parameter group.
        """
        try:
            response = self.rds_client.create_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                DBParameterGroupFamily=parameter_group_family,
                Description=description,
            )
        except ClientError as err:
            logger.error(
                "Couldn't create parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def delete_parameter_group(self, parameter_group_name):
        """
        Deletes a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to delete.
        :return: Data about the parameter group.
        """
        try:
            response = self.rds_client.delete_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def get_parameters(self, parameter_group_name, name_prefix="", source=None):
        """
        Gets the parameters that are contained in a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to query.
        :param name_prefix: When specified, the retrieved list of parameters is filtered
                            to contain only parameters that start with this prefix.
        :param source: When specified, only parameters from this source are retrieved.
                       For example, a source of 'user' retrieves only parameters that
                       were set by a user.
        :return: The list of requested parameters.
        """
        try:
            kwargs = {"DBClusterParameterGroupName": parameter_group_name}
            if source is not None:
                kwargs["Source"] = source
            parameters = []
            paginator = self.rds_client.get_paginator("describe_db_cluster_parameters")
            for page in paginator.paginate(**kwargs):
                parameters += [
                    p
                    for p in page["Parameters"]
                    if p["ParameterName"].startswith(name_prefix)
                ]
        except ClientError as err:
            logger.error(
                "Couldn't get parameters for %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return parameters

    def update_parameters(self, parameter_group_name, update_parameters):
        """
        Updates parameters in a custom DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to update.
        :param update_parameters: The parameters to update in the group.
        :return: Data about the modified parameter group.
        """
        try:
            response = self.rds_client.modify_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                Parameters=update_parameters,
            )
        except ClientError as err:
            logger.error(
                "Couldn't update parameters in %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def get_db_cluster(self, cluster_name):
        """
        Gets data about an Aurora DB cluster.

        :param cluster_name: The name of the DB cluster to retrieve.
        :return: The retrieved DB cluster, or None if it does not exist.
        """
        try:
            response = self.rds_client.describe_db_clusters(
                DBClusterIdentifier=cluster_name
            )
            cluster = response["DBClusters"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
                logger.info("Cluster %s does not exist.", cluster_name)
            else:
                logger.error(
                    "Couldn't verify the existence of DB cluster %s. Here's why: %s: %s",
                    cluster_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return cluster

    def create_db_cluster(
        self,
        cluster_name,
        parameter_group_name,
        db_name,
        db_engine,
        db_engine_version,
        admin_name,
        admin_password,
    ):
        """
        Creates a DB cluster that is configured to use the specified parameter group.
        The newly created DB cluster contains a database that uses the specified
        engine and engine version.

        :param cluster_name: The name of the DB cluster to create.
        :param parameter_group_name: The name of the parameter group to associate with
                                     the DB cluster.
        :param db_name: The name of the database to create.
        :param db_engine: The database engine of the database that is created, such as MySql.
        :param db_engine_version: The version of the database engine.
        :param admin_name: The user name of the database administrator.
        :param admin_password: The password of the database administrator.
        :return: The newly created DB cluster.
        """
        try:
            response = self.rds_client.create_db_cluster(
                DatabaseName=db_name,
                DBClusterIdentifier=cluster_name,
                DBClusterParameterGroupName=parameter_group_name,
                Engine=db_engine,
                EngineVersion=db_engine_version,
                MasterUsername=admin_name,
                MasterUserPassword=admin_password,
            )
            cluster = response["DBCluster"]
        except ClientError as err:
            logger.error(
                "Couldn't create database %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return cluster

    def delete_db_cluster(self, cluster_name):
        """
        Deletes a DB cluster.

        :param cluster_name: The name of the DB cluster to delete.
        """
        try:
            self.rds_client.delete_db_cluster(
                DBClusterIdentifier=cluster_name, SkipFinalSnapshot=True
            )
            logger.info("Deleted DB cluster %s.", cluster_name)
        except ClientError:
            logger.exception("Couldn't delete DB cluster %s.", cluster_name)
            raise

    def create_cluster_snapshot(self, snapshot_id, cluster_id):
        """
        Creates a snapshot of a DB cluster.

        :param snapshot_id: The ID to give the created snapshot.
        :param cluster_id: The DB cluster to snapshot.
        :return: Data about the newly created snapshot.
        """
        try:
            response = self.rds_client.create_db_cluster_snapshot(
                DBClusterSnapshotIdentifier=snapshot_id, DBClusterIdentifier=cluster_id
            )
            snapshot = response["DBClusterSnapshot"]
        except ClientError as err:
            logger.error(
                "Couldn't create snapshot of %s. Here's why: %s: %s",
                cluster_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot

    def get_cluster_snapshot(self, snapshot_id):
        """
        Gets a DB cluster snapshot.

        :param snapshot_id: The ID of the snapshot to retrieve.
        :return: The retrieved snapshot.
        """
        try:
            response = self.rds_client.describe_db_cluster_snapshots(
                DBClusterSnapshotIdentifier=snapshot_id
            )
            snapshot = response["DBClusterSnapshots"][0]
        except ClientError as err:
            logger.error(
                "Couldn't get DB cluster snapshot %s. Here's why: %s: %s",
                snapshot_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot

    def create_instance_in_cluster(
        self, instance_id, cluster_id, db_engine, instance_class
    ):
        """
        Creates a database instance in an existing DB cluster. The first database that
        is created defaults to a read-write DB instance.

        :param instance_id: The ID to give the newly created DB instance.
        :param cluster_id: The ID of the DB cluster where the DB instance is created.
        :param db_engine: The database engine of a database to create in the DB
                          instance. This must be compatible with the configured
                          parameter group of the DB cluster.
        :param instance_class: The DB instance class for the newly created DB instance.
        :return: Data about the newly created DB instance.
        """
        try:
            response = self.rds_client.create_db_instance(
                DBInstanceIdentifier=instance_id,
                DBClusterIdentifier=cluster_id,
                Engine=db_engine,
                DBInstanceClass=instance_class,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't create DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst

    def get_engine_versions(self, engine, parameter_group_family=None):
        """
        Gets database engine versions that are available for the specified engine
        and parameter group family.

        :param engine: The database engine to look up.
        :param parameter_group_family: When specified, restricts the returned list of
                                       engine versions to those that are compatible
                                       with this parameter group family.
        :return: The list of database engine versions.
        """
        try:
            kwargs = {"Engine": engine}
            if parameter_group_family is not None:
                kwargs["DBParameterGroupFamily"] = parameter_group_family
            response = self.rds_client.describe_db_engine_versions(**kwargs)
            versions = response["DBEngineVersions"]
        except ClientError as err:
            logger.error(
                "Couldn't get engine versions for %s. Here's why: %s: %s",
                engine,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return versions

    def get_orderable_instances(self, db_engine, db_engine_version):
        """
        Gets DB instance options that can be used to create DB instances that are
        compatible with a set of specifications.

        :param db_engine: The database engine that must be supported by the DB instance.
        :param db_engine_version: The engine version that must be supported by the
                                  DB instance.
        :return: The list of DB instance options that can be used to create a
                 compatible DB instance.
        """
        try:
            inst_opts = []
            paginator = self.rds_client.get_paginator(
                "describe_orderable_db_instance_options"
            )
            for page in paginator.paginate(
                Engine=db_engine, EngineVersion=db_engine_version
            ):
                inst_opts += page["OrderableDBInstanceOptions"]
        except ClientError as err:
            logger.error(
                "Couldn't get orderable DB instances. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return inst_opts

    def get_db_instance(self, instance_id):
        """
        Gets data about a DB instance.

        :param instance_id: The ID of the DB instance to retrieve.
        :return: The retrieved DB instance, or None if it does not exist.
        """
        try:
            response = self.rds_client.describe_db_instances(
                DBInstanceIdentifier=instance_id
            )
            db_inst = response["DBInstances"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBInstanceNotFound":
                logger.info("Instance %s does not exist.", instance_id)
            else:
                logger.error(
                    "Couldn't get DB instance %s. Here's why: %s: %s",
                    instance_id,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return db_inst

    def delete_db_instance(self, instance_id):
        """
        Deletes a DB instance.

        :param instance_id: The ID of the DB instance to delete.
        :return: Data about the deleted DB instance.
        """
        try:
            response = self.rds_client.delete_db_instance(
                DBInstanceIdentifier=instance_id,
                SkipFinalSnapshot=True,
                DeleteAutomatedBackups=True,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't delete DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst
```
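
`update_parameters` sends the parameter dictionaries it receives to `modify_db_cluster_parameter_group` as they are, so any range checking has to happen before the call. The scenario derives the bounds by splitting `AllowedValues` on a hyphen, which assumes a value shaped like `1-65535`. The following is a minimal sketch of a stricter check under that same assumption; the helper names `parse_allowed_range` and `build_parameter_update` are hypothetical and are not part of the AWS example.

```python
import re

# Matches a simple inclusive integer range such as "1-65535" or "-10--5".
RANGE_PATTERN = re.compile(r"^\s*(-?\d+)\s*-\s*(-?\d+)\s*$")


def parse_allowed_range(allowed_values):
    """Parse an inclusive integer range string into a (low, high) tuple."""
    match = RANGE_PATTERN.match(allowed_values)
    if match is None:
        raise ValueError(f"Not a simple integer range: {allowed_values!r}")
    low, high = int(match.group(1)), int(match.group(2))
    if low > high:
        raise ValueError(f"Range bounds are reversed: {allowed_values!r}")
    return low, high


def build_parameter_update(parameter, new_value):
    """Return a copy of `parameter` with ParameterValue set after a range check.

    The input dictionary is left unchanged so a rejected value has no side effects.
    """
    low, high = parse_allowed_range(parameter["AllowedValues"])
    if not low <= new_value <= high:
        raise ValueError(f"{new_value} is outside the allowed range {low}-{high}.")
    updated = dict(parameter)
    updated["ParameterValue"] = str(new_value)
    return updated


# Usage with a parameter shaped like the entries that get_parameters returns.
sample_parameter = {
    "ParameterName": "auto_increment_increment",
    "AllowedValues": "1-65535",
    "DataType": "integer",
    "IsModifiable": True,
}
update = build_parameter_update(sample_parameter, 5)
print(update["ParameterName"], update["ParameterValue"])
```

A list of dictionaries built this way could then be passed as the `update_parameters` argument. Values that are not a single range, such as a comma-separated list, raise `ValueError` instead of being split incorrectly.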

Define functions that are called by the scenario to manage Aurora actions.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_parameter_group(self, parameter_group_name):
        """
        Gets a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to retrieve.
        :return: The requested parameter group, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_cluster_parameter_groups(
                DBClusterParameterGroupName=parameter_group_name
            )
            parameter_group = response["DBClusterParameterGroups"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBParameterGroupNotFound":
                logger.info("Parameter group %s does not exist.", parameter_group_name)
            else:
                logger.error(
                    "Couldn't get parameter group %s. Here's why: %s: %s",
                    parameter_group_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return parameter_group

    def create_parameter_group(
        self, parameter_group_name, parameter_group_family, description
    ):
        """
        Creates a DB cluster parameter group that is based on the specified
        parameter group family.

        :param parameter_group_name: The name of the newly created parameter group.
        :param parameter_group_family: The family that is used as the basis of the
                                       new parameter group.
        :param description: A description given to the parameter group.
        :return: Data about the newly created parameter group.
        """
        try:
            response = self.rds_client.create_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                DBParameterGroupFamily=parameter_group_family,
                Description=description,
            )
        except ClientError as err:
            logger.error(
                "Couldn't create parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def delete_parameter_group(self, parameter_group_name):
        """
        Deletes a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to delete.
        :return: Data about the parameter group.
        """
        try:
            response = self.rds_client.delete_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def get_parameters(self, parameter_group_name, name_prefix="", source=None):
        """
        Gets the parameters that are contained in a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to query.
        :param name_prefix: When specified, the retrieved list of parameters is
                            filtered to contain only parameters that start with
                            this prefix.
        :param source: When specified, only parameters from this source are
                       retrieved. For example, a source of 'user' retrieves only
                       parameters that were set by a user.
        :return: The list of requested parameters.
        """
        try:
            kwargs = {"DBClusterParameterGroupName": parameter_group_name}
            if source is not None:
                kwargs["Source"] = source
            parameters = []
            paginator = self.rds_client.get_paginator("describe_db_cluster_parameters")
            for page in paginator.paginate(**kwargs):
                parameters += [
                    p
                    for p in page["Parameters"]
                    if p["ParameterName"].startswith(name_prefix)
                ]
        except ClientError as err:
            logger.error(
                "Couldn't get parameters for %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return parameters

    def update_parameters(self, parameter_group_name, update_parameters):
        """
        Updates parameters in a custom DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to update.
        :param update_parameters: The parameters to update in the group.
        :return: Data about the modified parameter group.
        """
        try:
            response = self.rds_client.modify_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                Parameters=update_parameters,
            )
        except ClientError as err:
            logger.error(
                "Couldn't update parameters in %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def get_db_cluster(self, cluster_name):
        """
        Gets data about an Aurora DB cluster.

        :param cluster_name: The name of the DB cluster to retrieve.
        :return: The retrieved DB cluster, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_clusters(
                DBClusterIdentifier=cluster_name
            )
            cluster = response["DBClusters"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
                logger.info("Cluster %s does not exist.", cluster_name)
            else:
                logger.error(
                    "Couldn't verify the existence of DB cluster %s. Here's why: %s: %s",
                    cluster_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return cluster

    def create_db_cluster(
        self,
        cluster_name,
        parameter_group_name,
        db_name,
        db_engine,
        db_engine_version,
        admin_name,
        admin_password,
    ):
        """
        Creates a DB cluster that is configured to use the specified parameter group.
        The newly created DB cluster contains a database that uses the specified
        engine and engine version.

        :param cluster_name: The name of the DB cluster to create.
        :param parameter_group_name: The name of the parameter group to associate
                                     with the DB cluster.
        :param db_name: The name of the database to create.
        :param db_engine: The database engine of the database that is created,
                          such as MySql.
        :param db_engine_version: The version of the database engine.
        :param admin_name: The user name of the database administrator.
        :param admin_password: The password of the database administrator.
        :return: The newly created DB cluster.
        """
        try:
            response = self.rds_client.create_db_cluster(
                DatabaseName=db_name,
                DBClusterIdentifier=cluster_name,
                DBClusterParameterGroupName=parameter_group_name,
                Engine=db_engine,
                EngineVersion=db_engine_version,
                MasterUsername=admin_name,
                MasterUserPassword=admin_password,
            )
            cluster = response["DBCluster"]
        except ClientError as err:
            logger.error(
                "Couldn't create database %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return cluster

    def delete_db_cluster(self, cluster_name):
        """
        Deletes a DB cluster.

        :param cluster_name: The name of the DB cluster to delete.
        """
        try:
            self.rds_client.delete_db_cluster(
                DBClusterIdentifier=cluster_name, SkipFinalSnapshot=True
            )
            logger.info("Deleted DB cluster %s.", cluster_name)
        except ClientError:
            logger.exception("Couldn't delete DB cluster %s.", cluster_name)
            raise

    def create_cluster_snapshot(self, snapshot_id, cluster_id):
        """
        Creates a snapshot of a DB cluster.

        :param snapshot_id: The ID to give the created snapshot.
        :param cluster_id: The DB cluster to snapshot.
        :return: Data about the newly created snapshot.
        """
        try:
            response = self.rds_client.create_db_cluster_snapshot(
                DBClusterSnapshotIdentifier=snapshot_id, DBClusterIdentifier=cluster_id
            )
            snapshot = response["DBClusterSnapshot"]
        except ClientError as err:
            logger.error(
                "Couldn't create snapshot of %s. Here's why: %s: %s",
                cluster_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot

    def get_cluster_snapshot(self, snapshot_id):
        """
        Gets a DB cluster snapshot.

        :param snapshot_id: The ID of the snapshot to retrieve.
        :return: The retrieved snapshot.
        """
        try:
            response = self.rds_client.describe_db_cluster_snapshots(
                DBClusterSnapshotIdentifier=snapshot_id
            )
            snapshot = response["DBClusterSnapshots"][0]
        except ClientError as err:
            logger.error(
                "Couldn't get DB cluster snapshot %s. Here's why: %s: %s",
                snapshot_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot

    def create_instance_in_cluster(
        self, instance_id, cluster_id, db_engine, instance_class
    ):
        """
        Creates a database instance in an existing DB cluster. The first database
        that is created defaults to a read-write DB instance.

        :param instance_id: The ID to give the newly created DB instance.
        :param cluster_id: The ID of the DB cluster where the DB instance is created.
        :param db_engine: The database engine of a database to create in the DB
                          instance. This must be compatible with the configured
                          parameter group of the DB cluster.
        :param instance_class: The DB instance class for the newly created DB instance.
        :return: Data about the newly created DB instance.
        """
        try:
            response = self.rds_client.create_db_instance(
                DBInstanceIdentifier=instance_id,
                DBClusterIdentifier=cluster_id,
                Engine=db_engine,
                DBInstanceClass=instance_class,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't create DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst

    def get_engine_versions(self, engine, parameter_group_family=None):
        """
        Gets database engine versions that are available for the specified engine
        and parameter group family.

        :param engine: The database engine to look up.
        :param parameter_group_family: When specified, restricts the returned list
                                       of engine versions to those that are
                                       compatible with this parameter group family.
        :return: The list of database engine versions.
        """
        try:
            kwargs = {"Engine": engine}
            if parameter_group_family is not None:
                kwargs["DBParameterGroupFamily"] = parameter_group_family
            response = self.rds_client.describe_db_engine_versions(**kwargs)
            versions = response["DBEngineVersions"]
        except ClientError as err:
            logger.error(
                "Couldn't get engine versions for %s. Here's why: %s: %s",
                engine,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return versions

    def get_orderable_instances(self, db_engine, db_engine_version):
        """
        Gets DB instance options that can be used to create DB instances that are
        compatible with a set of specifications.

        :param db_engine: The database engine that must be supported by the DB instance.
        :param db_engine_version: The engine version that must be supported by the
                                  DB instance.
        :return: The list of DB instance options that can be used to create a
                 compatible DB instance.
        """
        try:
            inst_opts = []
            paginator = self.rds_client.get_paginator(
                "describe_orderable_db_instance_options"
            )
            for page in paginator.paginate(
                Engine=db_engine, EngineVersion=db_engine_version
            ):
                inst_opts += page["OrderableDBInstanceOptions"]
        except ClientError as err:
            logger.error(
                "Couldn't get orderable DB instances. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return inst_opts

    def get_db_instance(self, instance_id):
        """
        Gets data about a DB instance.

        :param instance_id: The ID of the DB instance to retrieve.
        :return: The retrieved DB instance, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_instances(
                DBInstanceIdentifier=instance_id
            )
            db_inst = response["DBInstances"][0]
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBInstanceNotFound":
                logger.info("Instance %s does not exist.", instance_id)
            else:
                logger.error(
                    "Couldn't get DB instance %s. Here's why: %s: %s",
                    instance_id,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return db_inst

    def delete_db_instance(self, instance_id):
        """
        Deletes a DB instance.

        :param instance_id: The ID of the DB instance to delete.
        :return: Data about the deleted DB instance.
        """
        try:
            response = self.rds_client.delete_db_instance(
                DBInstanceIdentifier=instance_id,
                SkipFinalSnapshot=True,
                DeleteAutomatedBackups=True,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't delete DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst
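The `update_parameters` method passes its `update_parameters` argument straight through as the `Parameters` list of `modify_db_cluster_parameter_group`. As a rough sketch, the list could be built and split into batches as follows. The helper names are hypothetical, the parameter names in the usage note are only illustrative, and the batch size of 20 is an assumption based on the per-request limit described in the RDS API reference, so check it against the current documentation.

```python
def build_parameter_updates(values, apply_method="immediate"):
    """
    Hypothetical helper: converts a name-to-value mapping into the list of
    dictionaries expected in the Parameters argument.

    :param values: A dict that maps parameter names to their new values.
    :param apply_method: Either 'immediate' or 'pending-reboot'.
    :return: A list of parameter update dictionaries.
    """
    return [
        {
            "ParameterName": name,
            "ParameterValue": str(value),
            "ApplyMethod": apply_method,
        }
        for name, value in values.items()
    ]


def in_batches(items, size=20):
    """
    Hypothetical helper: yields consecutive slices of at most `size` items.
    The default of 20 is an assumed per-request limit.
    """
    for start in range(0, len(items), size):
        yield items[start : start + size]
```

Each batch could then be sent with a call such as `wrapper.update_parameters(group_name, batch)`, where `wrapper` is an `AuroraWrapper` instance and the values come from something like `build_parameter_updates({"auto_increment_offset": 5})`.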

Actions

The following code example shows how to use CreateDBCluster.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def create_db_cluster(
        self,
        cluster_name,
        parameter_group_name,
        db_name,
        db_engine,
        db_engine_version,
        admin_name,
        admin_password,
    ):
        """
        Creates a DB cluster that is configured to use the specified parameter group.
        The newly created DB cluster contains a database that uses the specified
        engine and engine version.

        :param cluster_name: The name of the DB cluster to create.
        :param parameter_group_name: The name of the parameter group to associate
                                     with the DB cluster.
        :param db_name: The name of the database to create.
        :param db_engine: The database engine of the database that is created,
                          such as MySql.
        :param db_engine_version: The version of the database engine.
        :param admin_name: The user name of the database administrator.
        :param admin_password: The password of the database administrator.
        :return: The newly created DB cluster.
        """
        try:
            response = self.rds_client.create_db_cluster(
                DatabaseName=db_name,
                DBClusterIdentifier=cluster_name,
                DBClusterParameterGroupName=parameter_group_name,
                Engine=db_engine,
                EngineVersion=db_engine_version,
                MasterUsername=admin_name,
                MasterUserPassword=admin_password,
            )
            cluster = response["DBCluster"]
        except ClientError as err:
            logger.error(
                "Couldn't create database %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return cluster

  • For API details, see CreateDBCluster in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateDBClusterParameterGroup.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def create_parameter_group(
        self, parameter_group_name, parameter_group_family, description
    ):
        """
        Creates a DB cluster parameter group that is based on the specified
        parameter group family.

        :param parameter_group_name: The name of the newly created parameter group.
        :param parameter_group_family: The family that is used as the basis of the
                                       new parameter group.
        :param description: A description given to the parameter group.
        :return: Data about the newly created parameter group.
        """
        try:
            response = self.rds_client.create_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                DBParameterGroupFamily=parameter_group_family,
                Description=description,
            )
        except ClientError as err:
            logger.error(
                "Couldn't create parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see CreateDBClusterParameterGroup in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateDBClusterSnapshot.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def create_cluster_snapshot(self, snapshot_id, cluster_id):
        """
        Creates a snapshot of a DB cluster.

        :param snapshot_id: The ID to give the created snapshot.
        :param cluster_id: The DB cluster to snapshot.
        :return: Data about the newly created snapshot.
        """
        try:
            response = self.rds_client.create_db_cluster_snapshot(
                DBClusterSnapshotIdentifier=snapshot_id, DBClusterIdentifier=cluster_id
            )
            snapshot = response["DBClusterSnapshot"]
        except ClientError as err:
            logger.error(
                "Couldn't create snapshot of %s. Here's why: %s: %s",
                cluster_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot

  • For API details, see CreateDBClusterSnapshot in AWS SDK for Python (Boto3) API Reference.
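The scenario builds its snapshot ID as the cluster name followed by a UUID. The sketch below generates an ID the same way and checks it locally before the API is called. The helper names are hypothetical, and the rules encoded here (1 to 63 letters, digits, or hyphens, starting with a letter, no trailing hyphen, no two consecutive hyphens) are an assumption based on the identifier constraints in the RDS API reference; the service remains the final authority.

```python
import re
import uuid

# Assumed constraint: a leading letter, then up to 62 letters, digits, or hyphens.
_SNAPSHOT_ID_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9-]{0,62}")


def is_valid_snapshot_id(snapshot_id):
    """Hypothetical local check against the assumed identifier rules."""
    return (
        _SNAPSHOT_ID_PATTERN.fullmatch(snapshot_id) is not None
        and not snapshot_id.endswith("-")
        and "--" not in snapshot_id
    )


def make_snapshot_id(cluster_name):
    """
    Hypothetical helper: builds '<cluster_name>-<uuid4>' as the scenario does
    and raises ValueError when the result breaks the assumed rules.
    """
    snapshot_id = f"{cluster_name}-{uuid.uuid4()}"
    if not is_valid_snapshot_id(snapshot_id):
        raise ValueError(f"Invalid snapshot identifier: {snapshot_id}")
    return snapshot_id
```

Because a UUID adds 36 characters plus the joining hyphen, the cluster name can be at most 26 characters long under these assumed rules.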

The following code example shows how to use CreateDBInstance.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def create_instance_in_cluster(
        self, instance_id, cluster_id, db_engine, instance_class
    ):
        """
        Creates a database instance in an existing DB cluster. The first database
        that is created defaults to a read-write DB instance.

        :param instance_id: The ID to give the newly created DB instance.
        :param cluster_id: The ID of the DB cluster where the DB instance is created.
        :param db_engine: The database engine of a database to create in the DB
                          instance. This must be compatible with the configured
                          parameter group of the DB cluster.
        :param instance_class: The DB instance class for the newly created DB instance.
        :return: Data about the newly created DB instance.
        """
        try:
            response = self.rds_client.create_db_instance(
                DBInstanceIdentifier=instance_id,
                DBClusterIdentifier=cluster_id,
                Engine=db_engine,
                DBInstanceClass=instance_class,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't create DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst

  • For API details, see CreateDBInstance in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteDBCluster.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def delete_db_cluster(self, cluster_name):
        """
        Deletes a DB cluster.

        :param cluster_name: The name of the DB cluster to delete.
        """
        try:
            self.rds_client.delete_db_cluster(
                DBClusterIdentifier=cluster_name, SkipFinalSnapshot=True
            )
            logger.info("Deleted DB cluster %s.", cluster_name)
        except ClientError:
            logger.exception("Couldn't delete DB cluster %s.", cluster_name)
            raise

  • For API details, see DeleteDBCluster in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteDBClusterParameterGroup.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def delete_parameter_group(self, parameter_group_name):
        """
        Deletes a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to delete.
        :return: Data about the parameter group.
        """
        try:
            response = self.rds_client.delete_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete parameter group %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see DeleteDBClusterParameterGroup in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteDBInstance.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def delete_db_instance(self, instance_id):
        """
        Deletes a DB instance.

        :param instance_id: The ID of the DB instance to delete.
        :return: Data about the deleted DB instance.
        """
        try:
            response = self.rds_client.delete_db_instance(
                DBInstanceIdentifier=instance_id,
                SkipFinalSnapshot=True,
                DeleteAutomatedBackups=True,
            )
            db_inst = response["DBInstance"]
        except ClientError as err:
            logger.error(
                "Couldn't delete DB instance %s. Here's why: %s: %s",
                instance_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return db_inst

  • For API details, see DeleteDBInstance in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeDBClusterParameterGroups.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_parameter_group(self, parameter_group_name):
        """
        Gets a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to retrieve.
        :return: The requested parameter group, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_cluster_parameter_groups(
                DBClusterParameterGroupName=parameter_group_name
            )
            parameter_group = response["DBClusterParameterGroups"][0]
        except ClientError as err:
            # A missing group is not treated as an error; the method returns None.
            if err.response["Error"]["Code"] == "DBParameterGroupNotFound":
                logger.info("Parameter group %s does not exist.", parameter_group_name)
            else:
                logger.error(
                    "Couldn't get parameter group %s. Here's why: %s: %s",
                    parameter_group_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return parameter_group
  • For API details, see DescribeDBClusterParameterGroups in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeDBClusterParameters.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_parameters(self, parameter_group_name, name_prefix="", source=None):
        """
        Gets the parameters that are contained in a DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to query.
        :param name_prefix: When specified, the retrieved list of parameters is filtered
                            to contain only parameters that start with this prefix.
        :param source: When specified, only parameters from this source are retrieved.
                       For example, a source of 'user' retrieves only parameters that
                       were set by a user.
        :return: The list of requested parameters.
        """
        try:
            kwargs = {"DBClusterParameterGroupName": parameter_group_name}
            if source is not None:
                kwargs["Source"] = source
            parameters = []
            # Page through all results and keep only names that match the prefix.
            paginator = self.rds_client.get_paginator("describe_db_cluster_parameters")
            for page in paginator.paginate(**kwargs):
                parameters += [
                    p
                    for p in page["Parameters"]
                    if p["ParameterName"].startswith(name_prefix)
                ]
        except ClientError as err:
            logger.error(
                "Couldn't get parameters for %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return parameters
  • For API details, see DescribeDBClusterParameters in AWS SDK for Python (Boto3) API Reference.
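The returned list is a plain list of dictionaries, so it can be filtered further on the client side. As a small sketch (the helper name `modifiable_integer_parameters` is an assumption, not part of the original example), the following keeps only parameters that RDS marks as modifiable and whose data type is `integer`, using the `IsModifiable` and `DataType` fields of each parameter.

```python
def modifiable_integer_parameters(parameters):
    """Return the parameters that are modifiable and have an integer data type.

    Hypothetical helper: parameters is a list of dicts shaped like the
    entries returned by describe_db_cluster_parameters.
    """
    return [
        p
        for p in parameters
        if p.get("IsModifiable") and p.get("DataType") == "integer"
    ]
```

A typical input would be the result of `get_parameters(group_name, name_prefix="auto_increment")`, where `group_name` stands for whichever parameter group is being inspected.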

The following code example shows how to use DescribeDBClusterSnapshots.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_cluster_snapshot(self, snapshot_id):
        """
        Gets a DB cluster snapshot.

        :param snapshot_id: The ID of the snapshot to retrieve.
        :return: The retrieved snapshot.
        """
        try:
            response = self.rds_client.describe_db_cluster_snapshots(
                DBClusterSnapshotIdentifier=snapshot_id
            )
            snapshot = response["DBClusterSnapshots"][0]
        except ClientError as err:
            logger.error(
                "Couldn't get DB cluster snapshot %s. Here's why: %s: %s",
                snapshot_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return snapshot
  • For API details, see DescribeDBClusterSnapshots in AWS SDK for Python (Boto3) API Reference.
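A snapshot is usually not usable until its `Status` field reads `available`. The sketch below shows one way to poll for that state. The helper name `wait_for_snapshot`, its defaults, and the injected `get_snapshot` and `sleep` callables are assumptions made so the loop can be exercised without calling AWS.

```python
import time


def wait_for_snapshot(get_snapshot, snapshot_id, poll_seconds=30, max_polls=40,
                      sleep=time.sleep):
    """Poll a snapshot until its Status is 'available' or the poll budget runs out.

    Hypothetical helper: get_snapshot is any callable that takes a snapshot ID
    and returns a snapshot dict, such as AuroraWrapper.get_cluster_snapshot.
    """
    for _ in range(max_polls):
        snapshot = get_snapshot(snapshot_id)
        if snapshot.get("Status") == "available":
            return snapshot
        sleep(poll_seconds)
    raise TimeoutError(f"Snapshot {snapshot_id} was not available after {max_polls} polls.")
```

In practice `get_snapshot` could be the `get_cluster_snapshot` method of a wrapper instance. Boto3 also provides a `db_cluster_snapshot_available` waiter that may serve the same purpose.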

The following code example shows how to use DescribeDBClusters.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_db_cluster(self, cluster_name):
        """
        Gets data about an Aurora DB cluster.

        :param cluster_name: The name of the DB cluster to retrieve.
        :return: The retrieved DB cluster, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_clusters(
                DBClusterIdentifier=cluster_name
            )
            cluster = response["DBClusters"][0]
        except ClientError as err:
            # A missing cluster is not treated as an error; the method returns None.
            if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
                logger.info("Cluster %s does not exist.", cluster_name)
            else:
                logger.error(
                    "Couldn't verify the existence of DB cluster %s. Here's why: %s: %s",
                    cluster_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return cluster
  • For API details, see DescribeDBClusters in AWS SDK for Python (Boto3) API Reference.
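The cluster dictionary carries the details an application needs to connect. As an illustrative sketch (the helper name `cluster_connection_info` and the output key names are assumptions), the following pulls out the status, endpoints, port, and database name, and tolerates missing fields by returning `None` for them.

```python
def cluster_connection_info(cluster):
    """Summarize the connection-related fields of a DB cluster description.

    Hypothetical helper: cluster is a dict shaped like one entry of the
    DBClusters list returned by describe_db_clusters.
    """
    return {
        "status": cluster.get("Status"),
        "writer_endpoint": cluster.get("Endpoint"),
        "reader_endpoint": cluster.get("ReaderEndpoint"),
        "port": cluster.get("Port"),
        "database": cluster.get("DatabaseName"),
    }
```

Because `get_db_cluster` returns `None` for a cluster that does not exist, a caller would check for that case before passing the result to this helper.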

The following code example shows how to use DescribeDBEngineVersions.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_engine_versions(self, engine, parameter_group_family=None):
        """
        Gets database engine versions that are available for the specified engine
        and parameter group family.

        :param engine: The database engine to look up.
        :param parameter_group_family: When specified, restricts the returned list of
                                       engine versions to those that are compatible with
                                       this parameter group family.
        :return: The list of database engine versions.
        """
        try:
            kwargs = {"Engine": engine}
            if parameter_group_family is not None:
                kwargs["DBParameterGroupFamily"] = parameter_group_family
            response = self.rds_client.describe_db_engine_versions(**kwargs)
            versions = response["DBEngineVersions"]
        except ClientError as err:
            logger.error(
                "Couldn't get engine versions for %s. Here's why: %s: %s",
                engine,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return versions
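Each entry in the returned list names both an engine version and the parameter group family it belongs to. The sketch below groups versions by family, which can help when choosing a family before creating a parameter group. The helper name `versions_by_family` is an assumption, and it relies only on the `DBParameterGroupFamily` and `EngineVersion` fields of each entry.

```python
from collections import defaultdict


def versions_by_family(engine_versions):
    """Group engine version strings by their parameter group family.

    Hypothetical helper: engine_versions is a list of dicts shaped like the
    DBEngineVersions entries returned by describe_db_engine_versions.
    """
    grouped = defaultdict(list)
    for version in engine_versions:
        grouped[version["DBParameterGroupFamily"]].append(version["EngineVersion"])
    return dict(grouped)
```

The input would typically come from `get_engine_versions("aurora-mysql")`, with the engine name depending on the database in use.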

The following code example shows how to use DescribeDBInstances.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_db_instance(self, instance_id):
        """
        Gets data about a DB instance.

        :param instance_id: The ID of the DB instance to retrieve.
        :return: The retrieved DB instance, or None when it does not exist.
        """
        try:
            response = self.rds_client.describe_db_instances(
                DBInstanceIdentifier=instance_id
            )
            db_inst = response["DBInstances"][0]
        except ClientError as err:
            # A missing instance is not treated as an error; the method returns None.
            if err.response["Error"]["Code"] == "DBInstanceNotFound":
                logger.info("Instance %s does not exist.", instance_id)
            else:
                logger.error(
                    "Couldn't get DB instance %s. Here's why: %s: %s",
                    instance_id,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return db_inst
  • For API details, see DescribeDBInstances in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeOrderableDBInstanceOptions.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def get_orderable_instances(self, db_engine, db_engine_version):
        """
        Gets DB instance options that can be used to create DB instances that are
        compatible with a set of specifications.

        :param db_engine: The database engine that must be supported by the DB instance.
        :param db_engine_version: The engine version that must be supported by the DB instance.
        :return: The list of DB instance options that can be used to create a compatible DB instance.
        """
        try:
            inst_opts = []
            # Page through all orderable options for this engine and version.
            paginator = self.rds_client.get_paginator(
                "describe_orderable_db_instance_options"
            )
            for page in paginator.paginate(
                Engine=db_engine, EngineVersion=db_engine_version
            ):
                inst_opts += page["OrderableDBInstanceOptions"]
        except ClientError as err:
            logger.error(
                "Couldn't get orderable DB instances. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return inst_opts
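The same instance class can appear in several orderable options, so a caller that wants to present a menu of choices may need to deduplicate the list first. A minimal sketch, with the helper name `instance_classes` being an assumption:

```python
def instance_classes(inst_opts):
    """Return the distinct DB instance classes from a list of orderable options.

    Hypothetical helper: inst_opts is a list of dicts shaped like the
    OrderableDBInstanceOptions entries returned by the API.
    """
    return sorted({opt["DBInstanceClass"] for opt in inst_opts})
```

The result is sorted alphabetically, which is only a display convenience and says nothing about size or price.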

The following code example shows how to use ModifyDBClusterParameterGroup.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class AuroraWrapper:
    """Encapsulates Aurora DB cluster actions."""

    def __init__(self, rds_client):
        """
        :param rds_client: A Boto3 Amazon Relational Database Service (Amazon RDS) client.
        """
        self.rds_client = rds_client

    @classmethod
    def from_client(cls):
        """
        Instantiates this class from a Boto3 client.
        """
        rds_client = boto3.client("rds")
        return cls(rds_client)

    def update_parameters(self, parameter_group_name, update_parameters):
        """
        Updates parameters in a custom DB cluster parameter group.

        :param parameter_group_name: The name of the parameter group to update.
        :param update_parameters: The parameters to update in the group.
        :return: Data about the modified parameter group.
        """
        try:
            response = self.rds_client.modify_db_cluster_parameter_group(
                DBClusterParameterGroupName=parameter_group_name,
                Parameters=update_parameters,
            )
        except ClientError as err:
            logger.error(
                "Couldn't update parameters in %s. Here's why: %s: %s",
                parameter_group_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response
  • For API details, see ModifyDBClusterParameterGroup in AWS SDK for Python (Boto3) API Reference.
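The `update_parameters` argument is a list of parameter dictionaries. The sketch below shows one way to build that list from a simple name-to-value mapping. The helper name `build_parameter_updates` is an assumption; the `ParameterName`, `ParameterValue`, and `ApplyMethod` keys and the two apply methods come from the RDS API.

```python
def build_parameter_updates(values, apply_method="immediate"):
    """Build the Parameters list for modify_db_cluster_parameter_group.

    Hypothetical helper: values maps parameter names to their new values.
    """
    if apply_method not in ("immediate", "pending-reboot"):
        raise ValueError(f"Unsupported apply method: {apply_method}")
    return [
        {
            "ParameterName": name,
            # The API expects parameter values as strings.
            "ParameterValue": str(value),
            "ApplyMethod": apply_method,
        }
        for name, value in values.items()
    ]
```

The result could then be passed as `update_parameters(group_name, build_parameter_updates({"auto_increment_offset": 5}))`, where the group name and the value are placeholders.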

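Scenarios

The following code example shows how to create a lending library where patrons can borrow and return books by using a REST API backed by an Amazon Aurora database.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with the Amazon Relational Database Service (Amazon RDS) API and AWS Chalice to create a REST API backed by an Amazon Aurora database. The web service is fully serverless and represents a simple lending library where patrons can borrow and return books. Learn how to:

  • Create and manage a serverless Aurora database cluster.

  • Use AWS Secrets Manager to manage database credentials.

  • Implement a data storage layer that uses Amazon RDS to move data into and out of the database.

  • Use AWS Chalice to deploy a serverless REST API to Amazon API Gateway and AWS Lambda.

  • Use the Requests package to send requests to the web service.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • API Gateway

  • Aurora

  • Lambda

  • Secrets Manager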

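The following code example shows how to create a web application that tracks work items in an Amazon Aurora Serverless database and uses Amazon Simple Email Service (Amazon SES) to send reports.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) to create a REST service that tracks work items in an Amazon Aurora Serverless database and emails reports by using Amazon Simple Email Service (Amazon SES). This example uses the Flask web framework to handle HTTP routing and integrates with a React webpage to present a fully functional web application.

  • Build a Flask REST service that integrates with AWS services.

  • Read, write, and update work items that are stored in an Aurora Serverless database.

  • Create an AWS Secrets Manager secret that contains database credentials and use it to authenticate calls to the database.

  • Use Amazon SES to send email reports of work items.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Aurora

  • Amazon RDS

  • Amazon RDS Data Service

  • Amazon SES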
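The scenario lists the Amazon RDS Data Service, which runs SQL with named parameters and authenticates through a Secrets Manager secret. The full example lives on GitHub and is not reproduced here. As a rough sketch of one piece of such a data layer, the helper below converts a Python mapping into the parameter list that the Data API `execute_statement` call accepts. The helper name `to_data_api_parameters` and its type mapping are assumptions for illustration, not the example's actual code.

```python
def to_data_api_parameters(values):
    """Convert a name-to-value mapping into RDS Data API statement parameters.

    Hypothetical helper: each value is wrapped in the typed field that the
    Data API expects (isNull, booleanValue, longValue, doubleValue, stringValue).
    """
    params = []
    for name, value in values.items():
        if value is None:
            field = {"isNull": True}
        elif isinstance(value, bool):
            # bool is checked before int because bool is a subclass of int.
            field = {"booleanValue": value}
        elif isinstance(value, int):
            field = {"longValue": value}
        elif isinstance(value, float):
            field = {"doubleValue": value}
        else:
            field = {"stringValue": str(value)}
        params.append({"name": name, "value": field})
    return params
```

With a Boto3 `rds-data` client, the result would be passed as the `parameters` argument of `execute_statement`, alongside `resourceArn`, `secretArn`, `database`, and a `sql` string that refers to the values as `:name` placeholders.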

© 2025, Amazon Web Services, Inc. or its affiliates. All rights reserved.