SDK for Python (Boto3) を使用した Auto Scaling の例 - AWS SDK コードサンプル

Doc AWS SDK Examples リポジトリには、他にも SDK の例があります。 AWS GitHub

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SDK for Python (Boto3) を使用した Auto Scaling の例

次のコードサンプルは、Auto Scaling で AWS SDK for Python (Boto3) を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています。

アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、関連するシナリオやサービス間の例ではアクションのコンテキストが確認できます。

「シナリオ」は、同じサービス内で複数の関数を呼び出して、特定のタスクを実行する方法を示すコード例です。

各例には、 へのリンクが含まれています。このリンクには GitHub、コンテキスト内でコードをセットアップして実行する方法の手順が記載されています。

開始方法

次のコード例は、Auto Scaling の使用を開始する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

import boto3 def hello_autoscaling(autoscaling_client): """ Use the AWS SDK for Python (Boto3) to create an Amazon EC2 Auto Scaling client and list some of the Auto Scaling groups in your account. This example uses the default settings specified in your shared credentials and config files. :param auto-scaling_client: A Boto3 Amazon EC2 Auto Scaling client object. """ print( "Hello, Amazon EC2 Auto Scaling! Let's list up to ten of you Auto Scaling groups:" ) response = autoscaling_client.describe_auto_scaling_groups() groups = response.get("AutoScalingGroups", []) if groups: for group in groups: print(f"\t{group['AutoScalingGroupName']}: {group['AvailabilityZones']}") else: print("There are no Auto Scaling groups in your account.") if __name__ == "__main__": hello_autoscaling(boto3.client("autoscaling"))
  • API の詳細については、 DescribeAutoScalingGroups AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

アクション

次のコード例は、Auto Scaling グループを ELB ターゲットグループにアタッチする方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScaler: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix, inst_type, ami_param, autoscaling_client, ec2_client, ssm_client, iam_client, ): """ :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client self.launch_template_name = f"{resource_prefix}-template" self.group_name = f"{resource_prefix}-group" self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" self.key_pair_name = f"{resource_prefix}-key-pair" def attach_load_balancer_target_group(self, lb_target_group): """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forward requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: raise AutoScalerError( f"Couldn't attach load balancer target group {lb_target_group['TargetGroupName']}\n" f"to auto scaling group {self.group_name}" )
  • API の詳細については、 AttachLoadBalancerTargetGroups AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループを作成する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def create_group( self, group_name, group_zones, launch_template_name, min_size, max_size ): """ Creates an Auto Scaling group. :param group_name: The name to give to the group. :param group_zones: The Availability Zones in which instances can be created. :param launch_template_name: The name of an existing Amazon EC2 launch template. The launch template specifies the configuration of instances that are created by auto scaling activities. :param min_size: The minimum number of active instances in the group. :param max_size: The maximum number of active instances in the group. """ try: self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=group_name, AvailabilityZones=group_zones, LaunchTemplate={ "LaunchTemplateName": launch_template_name, "Version": "$Default", }, MinSize=min_size, MaxSize=max_size, ) except ClientError as err: logger.error( "Couldn't create group %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • API の詳細については、 CreateAutoScalingGroup AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループを削除する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

Auto Scaling グループの最小サイズをゼロに更新し、グループ内のすべてのインスタンスを終了して、グループを削除します。

class AutoScaler: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix, inst_type, ami_param, autoscaling_client, ec2_client, ssm_client, iam_client, ): """ :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client self.launch_template_name = f"{resource_prefix}-template" self.group_name = f"{resource_prefix}-group" self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" self.key_pair_name = f"{resource_prefix}-key-pair" def _try_terminate_instance(self, inst_id): stopping = False log.info(f"Stopping {inst_id}.") while not stopping: try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=inst_id, ShouldDecrementDesiredCapacity=True ) stopping = True except ClientError as err: if err.response["Error"]["Code"] == "ScalingActivityInProgress": log.info("Scaling activity in progress for %s. Waiting...", inst_id) time.sleep(10) else: raise AutoScalerError(f"Couldn't stop instance {inst_id}: {err}.") def _try_delete_group(self): """ Tries to delete the EC2 Auto Scaling group. If the group is in use or in progress, the function waits and retries until the group is successfully deleted. """ stopped = False while not stopped: try: self.autoscaling_client.delete_auto_scaling_group( AutoScalingGroupName=self.group_name ) stopped = True log.info("Deleted EC2 Auto Scaling group %s.", self.group_name) except ClientError as err: if ( err.response["Error"]["Code"] == "ResourceInUse" or err.response["Error"]["Code"] == "ScalingActivityInProgress" ): log.info( "Some instances are still running. Waiting for them to stop..." ) time.sleep(10) else: raise AutoScalerError( f"Couldn't delete group {self.group_name}: {err}." ) def delete_group(self): """ Terminates all instances in the group, deletes the EC2 Auto Scaling group. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=self.group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self._try_terminate_instance(inst_id) self._try_delete_group() else: log.info("No groups found named %s, nothing to do.", self.group_name) except ClientError as err: raise AutoScalerError(f"Couldn't delete group {self.group_name}: {err}.")
  • API の詳細については、 DeleteAutoScalingGroup AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループの CloudWatch メトリクス収集を無効にする方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def disable_metrics(self, group_name): """ Stops CloudWatch metric collection for the Auto Scaling group. :param group_name: The name of the group. """ try: self.autoscaling_client.disable_metrics_collection( AutoScalingGroupName=group_name ) except ClientError as err: logger.error( "Couldn't disable metrics %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • API の詳細については、DisableMetricsCollectionAWS「 SDK for Python (Boto3) API リファレンス」の「」を参照してください。

次のコード例は、Auto Scaling グループの CloudWatch メトリクス収集を有効にする方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def enable_metrics(self, group_name, metrics): """ Enables CloudWatch metric collection for Amazon EC2 Auto Scaling activities. :param group_name: The name of the group to enable. :param metrics: A list of metrics to collect. """ try: self.autoscaling_client.enable_metrics_collection( AutoScalingGroupName=group_name, Metrics=metrics, Granularity="1Minute" ) except ClientError as err: logger.error( "Couldn't enable metrics on %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • API の詳細については、 EnableMetricsCollection AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループに関する情報を取得する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_group(self, group_name): """ Gets information about an Auto Scaling group. :param group_name: The name of the group to look up. :return: Information about the group, if found. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) except ClientError as err: logger.error( "Couldn't describe group %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: groups = response.get("AutoScalingGroups", []) return groups[0] if len(groups) > 0 else None
  • API の詳細については、 DescribeAutoScalingGroups AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling インスタンスに関する情報を取得する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_instances(self, instance_ids): """ Gets information about instances. :param instance_ids: A list of instance IDs to look up. :return: Information about instances, or an empty list if none are found. """ try: response = self.autoscaling_client.describe_auto_scaling_instances( InstanceIds=instance_ids ) except ClientError as err: logger.error( "Couldn't describe instances %s. Here's why: %s: %s", instance_ids, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["AutoScalingInstances"]
  • API の詳細については、 DescribeAutoScalingInstances AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling アクティビティに関する情報を取得する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_scaling_activities(self, group_name): """ Gets information about scaling activities for the group. Scaling activities are things like instances stopping or starting in response to user requests or capacity changes. :param group_name: The name of the group to look up. :return: The list of scaling activities for the group, ordered with the most recent activity first. """ try: response = self.autoscaling_client.describe_scaling_activities( AutoScalingGroupName=group_name ) except ClientError as err: logger.error( "Couldn't describe scaling activities %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["Activities"]
  • API の詳細については、 DescribeScalingActivities AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループの希望するキャパシティを設定する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def set_desired_capacity(self, group_name, capacity): """ Sets the desired capacity of the group. Amazon EC2 Auto Scaling tries to keep the number of running instances equal to the desired capacity. :param group_name: The name of the group to update. :param capacity: The desired number of running instances. """ try: self.autoscaling_client.set_desired_capacity( AutoScalingGroupName=group_name, DesiredCapacity=capacity, HonorCooldown=False, ) except ClientError as err: logger.error( "Couldn't set desired capacity %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • API の詳細については、 SetDesiredCapacity AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

次のコード例は、Auto Scaling グループでインスタンスを削除する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def terminate_instance(self, instance_id, decrease_capacity): """ Stops an instance. :param instance_id: The ID of the instance to stop. :param decrease_capacity: Specifies whether to decrease the desired capacity of the group. When passing True for this parameter, you can stop an instance without having a replacement instance start when the desired capacity threshold is crossed. :return: The scaling activity that occurs in response to this action. """ try: response = self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrease_capacity ) except ClientError as err: logger.error( "Couldn't terminate instance %s. Here's why: %s: %s", instance_id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["Activity"]

次のコード例は、Auto Scaling グループの設定を更新する方法を示しています。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def update_group(self, group_name, **kwargs): """ Updates an Auto Scaling group. :param group_name: The name of the group to update. :param kwargs: Keyword arguments to pass through to the service. """ try: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, **kwargs ) except ClientError as err: logger.error( "Couldn't update group %s. Here's why: %s: %s", group_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • API の詳細については、 UpdateAutoScalingGroup AWS SDK for Python (Boto3) API リファレンスの「」を参照してください。

シナリオ

次のコード例は、本、映画、曲のレコメンデーションを返す負荷分散型ウェブサービスの作成方法を示しています。この例は、障害に対するサービスの対応方法と、障害発生時の耐障害性を高めるためにサービスを再構築する方法を示しています。

  • Amazon EC2 Auto Scaling グループを使用して、起動テンプレートに基づいて Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを作成し、インスタンス数を所定の範囲内に維持します。

  • Elastic Load Balancing で HTTP リクエストを処理して配信します。

  • Auto Scaling グループ内のインスタンスの状態を監視し、正常なインスタンスにのみリクエストを転送します。

  • 各 EC2 インスタンスで Python ウェブサーバーを実行して HTTP リクエストを処理します。ウェブサーバーはレコメンデーションとヘルスチェックを返します。

  • Amazon DynamoDB テーブルを使用してレコメンデーションサービスをシミュレートできます。

  • AWS Systems Manager パラメータを更新して、リクエストやヘルスチェックに対するウェブサーバーの応答を制御できます。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

コマンドプロンプトからインタラクティブのシナリオを実行します。

class Runner: def __init__( self, resource_path, recommendation, autoscaler, loadbalancer, param_helper ): self.resource_path = resource_path self.recommendation = recommendation self.autoscaler = autoscaler self.loadbalancer = loadbalancer self.param_helper = param_helper self.protocol = "HTTP" self.port = 80 self.ssh_port = 22 def deploy(self): recommendations_path = f"{self.resource_path}/recommendations.json" startup_script = f"{self.resource_path}/server_startup_script.sh" instance_policy = f"{self.resource_path}/instance_policy.json" print( "\nFor this demo, we'll use the AWS SDK for Python (Boto3) to create several AWS resources\n" "to set up a load-balanced web service endpoint and explore some ways to make it resilient\n" "against various kinds of failures.\n\n" "Some of the resources create by this demo are:\n" ) print( "\t* A DynamoDB table that the web service depends on to provide book, movie, and song recommendations." ) print( "\t* An EC2 launch template that defines EC2 instances that each contain a Python web server." ) print( "\t* An EC2 Auto Scaling group that manages EC2 instances across several Availability Zones." ) print( "\t* An Elastic Load Balancing (ELB) load balancer that targets the Auto Scaling group to distribute requests." ) print("-" * 88) q.ask("Press Enter when you're ready to start deploying resources.") print( f"Creating and populating a DynamoDB table named '{self.recommendation.table_name}'." ) self.recommendation.create() self.recommendation.populate(recommendations_path) print("-" * 88) print( f"Creating an EC2 launch template that runs '{startup_script}' when an instance starts.\n" f"This script starts a Python web server defined in the `server.py` script. The web server\n" f"listens to HTTP requests on port 80 and responds to requests to '/' and to '/healthcheck'.\n" f"For demo purposes, this server is run as the root user. In production, the best practice is to\n" f"run a web server, such as Apache, with least-privileged credentials.\n" ) print( f"The template also defines an IAM policy that each instance uses to assume a role that grants\n" f"permissions to access the DynamoDB recommendation table and Systems Manager parameters\n" f"that control the flow of the demo.\n" ) self.autoscaler.create_template(startup_script, instance_policy) print("-" * 88) print( f"Creating an EC2 Auto Scaling group that maintains three EC2 instances, each in a different\n" f"Availability Zone." ) zones = self.autoscaler.create_group(3) print("-" * 88) print( "At this point, you have EC2 instances created. Once each instance starts, it listens for\n" "HTTP requests. You can see these instances in the console or continue with the demo." ) print("-" * 88) q.ask("Press Enter when you're ready to continue.") print(f"Creating variables that control the flow of the demo.\n") self.param_helper.reset() print( "\nCreating an Elastic Load Balancing target group and load balancer. The target group\n" "defines how the load balancer connects to instances. The load balancer provides a\n" "single endpoint where clients connect and dispatches requests to instances in the group.\n" ) vpc = self.autoscaler.get_default_vpc() subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones) target_group = self.loadbalancer.create_target_group( self.protocol, self.port, vpc["VpcId"] ) self.loadbalancer.create_load_balancer( [subnet["SubnetId"] for subnet in subnets], target_group ) self.autoscaler.attach_load_balancer_target_group(target_group) print(f"Verifying access to the load balancer endpoint...") lb_success = self.loadbalancer.verify_load_balancer_endpoint() if not lb_success: print( "Couldn't connect to the load balancer, verifying that the port is open..." ) current_ip_address = requests.get( "http://checkip.amazonaws.com" ).text.strip() sec_group, port_is_open = self.autoscaler.verify_inbound_port( vpc, self.port, current_ip_address ) sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port( vpc, self.ssh_port, current_ip_address ) if not port_is_open: print( "For this example to work, the default security group for your default VPC must\n" "allows access from this computer. You can either add it automatically from this\n" "example or add it yourself using the AWS Management Console.\n" ) if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.port, current_ip_address ) if not ssh_port_is_open: if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.ssh_port, current_ip_address ) lb_success = self.loadbalancer.verify_load_balancer_endpoint() if lb_success: print("Your load balancer is ready. You can access it by browsing to:\n") print(f"\thttp://{self.loadbalancer.endpoint()}\n") else: print( "Couldn't get a successful response from the load balancer endpoint. Troubleshoot by\n" "manually verifying that your VPC and security group are configured correctly and that\n" "you can successfully make a GET request to the load balancer endpoint:\n" ) print(f"\thttp://{self.loadbalancer.endpoint()}\n") print("-" * 88) q.ask("Press Enter when you're ready to continue with the demo.") def demo_choices(self): actions = [ "Send a GET request to the load balancer endpoint.", "Check the health of load balancer targets.", "Go to the next part of the demo.", ] choice = 0 while choice != 2: print("-" * 88) print( "\nSee the current state of the service by selecting one of the following choices:\n" ) choice = q.choose("\nWhich action would you like to take? ", actions) print("-" * 88) if choice == 0: print("Request:\n") print(f"GET http://{self.loadbalancer.endpoint()}") response = requests.get(f"http://{self.loadbalancer.endpoint()}") print("\nResponse:\n") print(f"{response.status_code}") if response.headers.get("content-type") == "application/json": pp(response.json()) elif choice == 1: print("\nChecking the health of load balancer targets:\n") health = self.loadbalancer.check_target_health() for target in health: state = target["TargetHealth"]["State"] print( f"\tTarget {target['Target']['Id']} on port {target['Target']['Port']} is {state}" ) if state != "healthy": print( f"\t\t{target['TargetHealth']['Reason']}: {target['TargetHealth']['Description']}\n" ) print( f"\nNote that it can take a minute or two for the health check to update\n" f"after changes are made.\n" ) elif choice == 2: print("\nOkay, let's move on.") print("-" * 88) def demo(self): ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json" print("\nResetting parameters to starting values for demo.\n") self.param_helper.reset() print( "\nThis part of the demonstration shows how to toggle different parts of the system\n" "to create situations where the web service fails, and shows how using a resilient\n" "architecture can keep the web service running in spite of these failures." ) print("-" * 88) print( "At the start, the load balancer endpoint returns recommendations and reports that all targets are healthy." ) self.demo_choices() print( f"The web service running on the EC2 instances gets recommendations by querying a DynamoDB table.\n" f"The table name is contained in a Systems Manager parameter named '{self.param_helper.table}'.\n" f"To simulate a failure of the recommendation service, let's set this parameter to name a non-existent table.\n" ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") print( "\nNow, sending a GET request to the load balancer endpoint returns a failure code. But, the service reports as\n" "healthy to the load balancer because shallow health checks don't check for failure of the recommendation service." ) self.demo_choices() print( f"Instead of failing when the recommendation service fails, the web service can return a static response.\n" f"While this is not a perfect solution, it presents the customer with a somewhat better experience than failure.\n" ) self.param_helper.put(self.param_helper.failure_response, "static") print( f"\nNow, sending a GET request to the load balancer endpoint returns a static response.\n" f"The service still reports as healthy because health checks are still shallow.\n" ) self.demo_choices() print("Let's reinstate the recommendation service.\n") self.param_helper.put(self.param_helper.table, self.recommendation.table_name) print( "\nLet's also substitute bad credentials for one of the instances in the target group so that it can't\n" "access the DynamoDB recommendation table.\n" ) self.autoscaler.create_instance_profile( ssm_only_policy, self.autoscaler.bad_creds_policy_name, self.autoscaler.bad_creds_role_name, self.autoscaler.bad_creds_profile_name, ["AmazonSSMManagedInstanceCore"], ) instances = self.autoscaler.get_instances() bad_instance_id = instances[0] instance_profile = self.autoscaler.get_instance_profile(bad_instance_id) print( f"\nReplacing the profile for instance {bad_instance_id} with a profile that contains\n" f"bad credentials...\n" ) self.autoscaler.replace_instance_profile( bad_instance_id, self.autoscaler.bad_creds_profile_name, instance_profile["AssociationId"], ) print( "Now, sending a GET request to the load balancer endpoint returns either a recommendation or a static response,\n" "depending on which instance is selected by the load balancer.\n" ) self.demo_choices() print( "\nLet's implement a deep health check. For this demo, a deep health check tests whether\n" "the web service can access the DynamoDB table that it depends on for recommendations. Note that\n" "the deep health check is only for ELB routing and not for Auto Scaling instance health.\n" "This kind of deep health check is not recommended for Auto Scaling instance health, because it\n" "risks accidental termination of all instances in the Auto Scaling group when a dependent service fails.\n" ) print( "By implementing deep health checks, the load balancer can detect when one of the instances is failing\n" "and take that instance out of rotation.\n" ) self.param_helper.put(self.param_helper.health_check, "deep") print( f"\nNow, checking target health indicates that the instance with bad credentials ({bad_instance_id})\n" f"is unhealthy. Note that it might take a minute or two for the load balancer to detect the unhealthy \n" f"instance. Sending a GET request to the load balancer endpoint always returns a recommendation, because\n" "the load balancer takes unhealthy instances out of its rotation.\n" ) self.demo_choices() print( "\nBecause the instances in this demo are controlled by an auto scaler, the simplest way to fix an unhealthy\n" "instance is to terminate it and let the auto scaler start a new instance to replace it.\n" ) self.autoscaler.terminate_instance(bad_instance_id) print( "\nEven while the instance is terminating and the new instance is starting, sending a GET\n" "request to the web service continues to get a successful recommendation response because\n" "the load balancer routes requests to the healthy instances. After the replacement instance\n" "starts and reports as healthy, it is included in the load balancing rotation.\n" "\nNote that terminating and replacing an instance typically takes several minutes, during which time you\n" "can see the changing health check status until the new instance is running and healthy.\n" ) self.demo_choices() print( "\nIf the recommendation service fails now, deep health checks mean all instances report as unhealthy.\n" ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") print( "\nWhen all instances are unhealthy, the load balancer continues to route requests even to\n" "unhealthy instances, allowing them to fail open and return a static response rather than fail\n" "closed and report failure to the customer." ) self.demo_choices() self.param_helper.reset() def destroy(self): print( "This concludes the demo of how to build and manage a resilient service.\n" "To keep things tidy and to avoid unwanted charges on your account, we can clean up all AWS resources\n" "that were created for this demo." ) if q.ask("Do you want to clean up all demo resources? (y/n) ", q.is_yesno): self.loadbalancer.delete_load_balancer() self.loadbalancer.delete_target_group() self.autoscaler.delete_group() self.autoscaler.delete_key_pair() self.autoscaler.delete_template() self.autoscaler.delete_instance_profile( self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name, ) self.recommendation.destroy() else: print( "Okay, we'll leave the resources intact.\n" "Don't forget to delete them when you're done with them or you might incur unexpected charges." ) def main(): parser = argparse.ArgumentParser() parser.add_argument( "--action", required=True, choices=["all", "deploy", "demo", "destroy"], help="The action to take for the demo. When 'all' is specified, resources are\n" "deployed, the demo is run, and resources are destroyed.", ) parser.add_argument( "--resource_path", default="../../../workflows/resilient_service/resources", help="The path to resource files used by this example, such as IAM policies and\n" "instance scripts.", ) args = parser.parse_args() print("-" * 88) print( "Welcome to the demonstration of How to Build and Manage a Resilient Service!" ) print("-" * 88) prefix = "doc-example-resilience" recommendation = RecommendationService.from_client( "doc-example-recommendation-service" ) autoscaler = AutoScaler.from_client(prefix) loadbalancer = LoadBalancer.from_client(prefix) param_helper = ParameterHelper.from_client(recommendation.table_name) runner = Runner( args.resource_path, recommendation, autoscaler, loadbalancer, param_helper ) actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"] for action in actions: if action == "deploy": runner.deploy() elif action == "demo": runner.demo() elif action == "destroy": runner.destroy() print("-" * 88) print("Thanks for watching!") print("-" * 88) if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") main()

Auto Scaling と Amazon EC2 のアクションをラップするクラスを作成します。

class AutoScaler: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix, inst_type, ami_param, autoscaling_client, ec2_client, ssm_client, iam_client, ): """ :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client self.launch_template_name = f"{resource_prefix}-template" self.group_name = f"{resource_prefix}-group" self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" self.key_pair_name = f"{resource_prefix}-key-pair" @classmethod def from_client(cls, resource_prefix): """ Creates this class from Boto3 clients. :param resource_prefix: The prefix for naming AWS resources that are created by this class. """ as_client = boto3.client("autoscaling") ec2_client = boto3.client("ec2") ssm_client = boto3.client("ssm") iam_client = boto3.client("iam") return cls( resource_prefix, "t3.micro", "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2", as_client, ec2_client, ssm_client, iam_client, ) def create_instance_profile( self, policy_file, policy_name, role_name, profile_name, aws_managed_policies=() ): """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } with open(policy_file) as file: instance_policy_doc = file.read() policy_arn = None try: pol_response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=instance_policy_doc ) policy_arn = pol_response["Policy"]["Arn"] log.info("Created policy with ARN %s.", policy_arn) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": log.info("Policy %s already exists, nothing to do.", policy_name) list_pol_response = self.iam_client.list_policies(Scope="Local") for pol in list_pol_response["Policies"]: if pol["PolicyName"] == policy_name: policy_arn = pol["Arn"] break if policy_arn is None: raise AutoScalerError(f"Couldn't create policy {policy_name}: {err}") try: self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for aws_policy in aws_managed_policies: self.iam_client.attach_role_policy( RoleName=role_name, PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}", ) log.info("Created role %s and attached policy %s.", role_name, policy_arn) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": log.info("Role %s already exists, nothing to do.", role_name) else: raise AutoScalerError(f"Couldn't create role {role_name}: {err}") try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) else: raise AutoScalerError( f"Couldn't create profile {profile_name} and attach it to role\n" f"{role_name}: {err}" ) return profile_arn def get_instance_profile(self, instance_id): """ Gets data about the profile associated with an instance. :param instance_id: The ID of the instance to look up. :return: The profile data. """ try: response = self.ec2_client.describe_iam_instance_profile_associations( Filters=[{"Name": "instance-id", "Values": [instance_id]}] ) except ClientError as err: raise AutoScalerError( f"Couldn't get instance profile association for instance {instance_id}: {err}" ) else: return response["IamInstanceProfileAssociations"][0] def replace_instance_profile( self, instance_id, new_instance_profile_name, profile_association_id ): """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to update. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) inst_ready = False tries = 0 while not inst_ready: if tries % 6 == 0: self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info( "Rebooting instance %s and waiting for it to to be ready.", instance_id, ) tries += 1 time.sleep(10) response = self.ssm_client.describe_instance_information() for info in response["InstanceInformationList"]: if info["InstanceId"] == instance_id: inst_ready = True self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info("Restarted the Python web server on instance %s.", instance_id) except ClientError as err: raise AutoScalerError( f"Couldn't replace instance profile for association {profile_association_id}: {err}" ) def delete_instance_profile(self, profile_name, role_name): """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) else: raise AutoScalerError( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) def create_key_pair(self, key_pair_name): """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. :return: The newly created key pair. """ try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: raise AutoScalerError(f"Couldn't create key pair {key_pair_name}: {err}") def delete_key_pair(self): """ Deletes a key pair. :param key_pair_name: The name of the key pair to delete. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: raise AutoScalerError( f"Couldn't delete key pair {self.key_pair_name}: {err}" ) except FileNotFoundError: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) except PermissionError: log.info( "Inadequate permissions to delete key pair %s.", self.key_pair_name ) except Exception as err: raise AutoScalerError( f"Couldn't delete key pair {self.key_pair_name}: {err}" ) def create_template(self, server_startup_script_file, instance_policy_file): """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. """ template = {} try: self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) with open(server_startup_script_file) as file: start_server_script = file.read() ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( "Created launch template %s for AMI %s on %s.", self.launch_template_name, ami_id, self.inst_type, ) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.AlreadyExistsException" ): log.info( "Launch template %s already exists, nothing to do.", self.launch_template_name, ) else: raise AutoScalerError( f"Couldn't create launch template {self.launch_template_name}: {err}." ) return template def delete_template(self): """ Deletes a launch template. """ try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) else: raise AutoScalerError( f"Couldn't delete launch template {self.launch_template_name}: {err}." ) def get_availability_zones(self): """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] except ClientError as err: raise AutoScalerError(f"Couldn't get availability zones: {err}.") else: return zones def create_group(self, group_size): """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ zones = [] try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( "Created EC2 Auto Scaling group %s with availability zones %s.", self.launch_template_name, zones, ) except ClientError as err: if err.response["Error"]["Code"] == "AlreadyExists": log.info( "EC2 Auto Scaling group %s already exists, nothing to do.", self.group_name, ) else: raise AutoScalerError( f"Couldn't create EC2 Auto Scaling group {self.group_name}: {err}" ) return zones def get_instances(self): """ Gets data about the instances in the EC2 Auto Scaling group. :return: Data about the instances. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] except ClientError as err: raise AutoScalerError( f"Couldn't get instances for Auto Scaling group {self.group_name}: {err}" ) else: return instance_ids def terminate_instance(self, instance_id): """ Terminates and instances in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=False ) log.info("Terminated instance %s.", instance_id) except ClientError as err: raise AutoScalerError(f"Couldn't terminate instance {instance_id}: {err}") def attach_load_balancer_target_group(self, lb_target_group): """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forward requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: raise AutoScalerError( f"Couldn't attach load balancer target group {lb_target_group['TargetGroupName']}\n" f"to auto scaling group {self.group_name}" ) def _try_terminate_instance(self, inst_id): stopping = False log.info(f"Stopping {inst_id}.") while not stopping: try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=inst_id, ShouldDecrementDesiredCapacity=True ) stopping = True except ClientError as err: if err.response["Error"]["Code"] == "ScalingActivityInProgress": log.info("Scaling activity in progress for %s. Waiting...", inst_id) time.sleep(10) else: raise AutoScalerError(f"Couldn't stop instance {inst_id}: {err}.") def _try_delete_group(self): """ Tries to delete the EC2 Auto Scaling group. If the group is in use or in progress, the function waits and retries until the group is successfully deleted. """ stopped = False while not stopped: try: self.autoscaling_client.delete_auto_scaling_group( AutoScalingGroupName=self.group_name ) stopped = True log.info("Deleted EC2 Auto Scaling group %s.", self.group_name) except ClientError as err: if ( err.response["Error"]["Code"] == "ResourceInUse" or err.response["Error"]["Code"] == "ScalingActivityInProgress" ): log.info( "Some instances are still running. Waiting for them to stop..." ) time.sleep(10) else: raise AutoScalerError( f"Couldn't delete group {self.group_name}: {err}." ) def delete_group(self): """ Terminates all instances in the group, deletes the EC2 Auto Scaling group. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=self.group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self._try_terminate_instance(inst_id) self._try_delete_group() else: log.info("No groups found named %s, nothing to do.", self.group_name) except ClientError as err: raise AutoScalerError(f"Couldn't delete group {self.group_name}: {err}.") def get_default_vpc(self): """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: raise AutoScalerError(f"Couldn't get default VPC: {err}") else: return response["Vpcs"][0] def verify_inbound_port(self, vpc, port, ip_address): """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specific VPC, and a value that indicates whether the specified port is open. """ try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info("Found default security group %s.", sec_group["GroupId"]) for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info("Found inbound rule: %s", ip_perm) for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( "The inbound rule does not appear to be open to either this computer's IP\n" "address of %s, to all IP addresses (0.0.0.0/0), or to a prefix list ID.", ip_address, ) else: break except ClientError as err: raise AutoScalerError( f"Couldn't verify inbound rule for port {port} for VPC {vpc['VpcId']}: {err}" ) else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id, port, ip_address): """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: raise AutoScalerError( f"Couldn't authorize ingress to {sec_group_id} on port {port} from {ip_address}: {err}" ) def get_subnets(self, vpc_id, zones): """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. """ try: response = self.ec2_client.describe_subnets( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = response["Subnets"] log.info("Found %s subnets for the specified zones.", len(subnets)) except ClientError as err: raise AutoScalerError(f"Couldn't get subnets: {err}") else: return subnets

Elastic Load Balancing のアクションをラップするクラスを作成します。

class LoadBalancer: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, target_group_name, load_balancer_name, elb_client): """ :param target_group_name: The name of the target group associated with the load balancer. :param load_balancer_name: The name of the load balancer. :param elb_client: A Boto3 Elastic Load Balancing client. """ self.target_group_name = target_group_name self.load_balancer_name = load_balancer_name self.elb_client = elb_client self._endpoint = None @classmethod def from_client(cls, resource_prefix): """ Creates this class from a Boto3 client. :param resource_prefix: The prefix to give to AWS resources created by this class. """ elb_client = boto3.client("elbv2") return cls(f"{resource_prefix}-tg", f"{resource_prefix}-lb", elb_client) def endpoint(self): """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ if self._endpoint is None: try: response = self.elb_client.describe_load_balancers( Names=[self.load_balancer_name] ) self._endpoint = response["LoadBalancers"][0]["DNSName"] except ClientError as err: raise LoadBalancerError( f"Couldn't get the endpoint for load balancer {self.load_balancer_name}: {err}" ) return self._endpoint def create_target_group(self, protocol, port, vpc_id): """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forward requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=self.target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info("Created load balancing target group %s.", self.target_group_name) except ClientError as err: raise LoadBalancerError( f"Couldn't create load balancing target group {self.target_group_name}: {err}" ) else: return target_group def delete_target_group(self): """ Deletes the target group. """ done = False while not done: try: response = self.elb_client.describe_target_groups( Names=[self.target_group_name] ) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info( "Deleted load balancing target group %s.", self.target_group_name ) done = True except ClientError as err: if err.response["Error"]["Code"] == "TargetGroupNotFound": log.info( "Load balancer target group %s not found, nothing to do.", self.target_group_name, ) done = True elif err.response["Error"]["Code"] == "ResourceInUse": log.info( "Target group not yet released from load balancer, waiting..." ) time.sleep(10) else: raise LoadBalancerError( f"Couldn't delete load balancing target group {self.target_group_name}: {err}" ) def create_load_balancer(self, subnet_ids, target_group): """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param subnet_ids: A list of subnets to associate with the load balancer. :param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=self.load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info("Created load balancer %s.", self.load_balancer_name) waiter = self.elb_client.get_waiter("load_balancer_available") log.info("Waiting for load balancer to be available...") waiter.wait(Names=[self.load_balancer_name]) log.info("Load balancer is available!") self.elb_client.create_listener( LoadBalancerArn=load_balancer["LoadBalancerArn"], Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( "Created listener to forward traffic from load balancer %s to target group %s.", self.load_balancer_name, target_group["TargetGroupName"], ) except ClientError as err: raise LoadBalancerError( f"Failed to create load balancer {self.load_balancer_name}" f"and add a listener for target group {target_group['TargetGroupName']}: {err}" ) else: self._endpoint = load_balancer["DNSName"] return load_balancer def delete_load_balancer(self): """ Deletes a load balancer. """ try: response = self.elb_client.describe_load_balancers( Names=[self.load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", self.load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[self.load_balancer_name]) except ClientError as err: if err.response["Error"]["Code"] == "LoadBalancerNotFound": log.info( "Load balancer %s does not exist, nothing to do.", self.load_balancer_name, ) else: raise LoadBalancerError( f"Couldn't delete load balancer {self.load_balancer_name}: {err}" ) def verify_load_balancer_endpoint(self): """ Verify this computer can successfully send a GET request to the load balancer endpoint. """ success = False retries = 3 while not success and retries > 0: try: lb_response = requests.get(f"http://{self.endpoint()}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: success = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return success def check_target_health(self): """ Checks the health of the instances in the target group. :return: The health status of the target group. """ try: tg_response = self.elb_client.describe_target_groups( Names=[self.target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: raise LoadBalancerError( f"Couldn't check health of {self.target_group_name} targets: {err}" ) else: return health_response["TargetHealthDescriptions"]

DynamoDB を使用してレコメンデーションサービスをシミュレートするクラスを作成します。

class RecommendationService: """ Encapsulates a DynamoDB table to use as a service that recommends books, movies, and songs. """ def __init__(self, table_name, dynamodb_client): """ :param table_name: The name of the DynamoDB recommendations table. :param dynamodb_client: A Boto3 DynamoDB client. """ self.table_name = table_name self.dynamodb_client = dynamodb_client @classmethod def from_client(cls, table_name): """ Creates this class from a Boto3 client. :param table_name: The name of the DynamoDB recommendations table. """ ddb_client = boto3.client("dynamodb") return cls(table_name, ddb_client) def create(self): """ Creates a DynamoDB table to use a recommendation service. The table has a hash key named 'MediaType' that defines the type of media recommended, such as Book or Movie, and a range key named 'ItemId' that, combined with the MediaType, forms a unique identifier for the recommended item. :return: Data about the newly created table. """ try: response = self.dynamodb_client.create_table( TableName=self.table_name, AttributeDefinitions=[ {"AttributeName": "MediaType", "AttributeType": "S"}, {"AttributeName": "ItemId", "AttributeType": "N"}, ], KeySchema=[ {"AttributeName": "MediaType", "KeyType": "HASH"}, {"AttributeName": "ItemId", "KeyType": "RANGE"}, ], ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}, ) log.info("Creating table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_exists") waiter.wait(TableName=self.table_name) log.info("Table %s created.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceInUseException": log.info("Table %s exists, nothing to be do.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when creating table: {err}." ) else: return response def populate(self, data_file): """ Populates the recommendations table from a JSON file. :param data_file: The path to the data file. """ try: with open(data_file) as data: items = json.load(data) batch = [{"PutRequest": {"Item": item}} for item in items] self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch}) log.info( "Populated table %s with items from %s.", self.table_name, data_file ) except ClientError as err: raise RecommendationServiceError( self.table_name, f"Couldn't populate table from {data_file}: {err}" ) def destroy(self): """ Deletes the recommendations table. """ try: self.dynamodb_client.delete_table(TableName=self.table_name) log.info("Deleting table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_not_exists") waiter.wait(TableName=self.table_name) log.info("Table %s deleted.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": log.info("Table %s does not exist, nothing to do.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when deleting table: {err}." )

Systems Manager のアクションをラップするクラスを作成します。

class ParameterHelper: """ Encapsulates Systems Manager parameters. This example uses these parameters to drive the demonstration of resilient architecture, such as failure of a dependency or how the service responds to a health check. """ table = "doc-example-resilient-architecture-table" failure_response = "doc-example-resilient-architecture-failure-response" health_check = "doc-example-resilient-architecture-health-check" def __init__(self, table_name, ssm_client): """ :param table_name: The name of the DynamoDB table that is used as a recommendation service. :param ssm_client: A Boto3 Systems Manager client. """ self.ssm_client = ssm_client self.table_name = table_name @classmethod def from_client(cls, table_name): ssm_client = boto3.client("ssm") return cls(table_name, ssm_client) def reset(self): """ Resets the Systems Manager parameters to starting values for the demo. These are the name of the DynamoDB recommendation table, no response when a dependency fails, and shallow health checks. """ self.put(self.table, self.table_name) self.put(self.failure_response, "none") self.put(self.health_check, "shallow") def put(self, name, value): """ Sets the value of a named Systems Manager parameter. :param name: The name of the parameter. :param value: The new value of the parameter. """ try: self.ssm_client.put_parameter( Name=name, Value=value, Overwrite=True, Type="String" ) log.info("Setting demo parameter %s to '%s'.", name, value) except ClientError as err: raise ParameterHelperError( f"Couldn't set parameter {name} to {value}: {err}" )

次のコードサンプルは、以下の操作方法を示しています。

  • 起動テンプレートとアベイラビリティーゾーンを使用して、Amazon EC2 Auto Scaling グループを作成し、実行中のインスタンスに関する情報を取得します。

  • Amazon CloudWatch メトリクスの収集を有効にします。

  • グループの希望するキャパシティを更新し、インスタンスが起動するのを待ちます。

  • グループのインスタンスを終了します。

  • ユーザーのリクエストやキャパシティの変更に応じて発生するスケーリングアクティビティを一覧表示します。

  • CloudWatch メトリクスの統計を取得し、リソースをクリーンアップします。

SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

コマンドプロンプトからインタラクティブのシナリオを実行します。

def run_scenario(as_wrapper, svc_helper): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print( "Welcome to the Amazon EC2 Auto Scaling demo for managing groups and instances." ) print("-" * 88) print( "This example requires a launch template that specifies how to create\n" "EC2 instances. You can use an existing template or create a new one." ) template_name = q.ask( "Enter the name of an existing launch template or press Enter to create a new one: " ) template = None if template_name: template = svc_helper.get_template(template_name) if template is None: inst_type = "t1.micro" ami_id = "ami-0ca285d4c2cda3300" print("Let's create a launch template with the following specifications:") print(f"\tInstanceType: {inst_type}") print(f"\tAMI ID: {ami_id}") template_name = q.ask("Enter a name for the template: ", q.non_empty) template = svc_helper.create_template(template_name, inst_type, ami_id) print("-" * 88) print("Let's create an Auto Scaling group.") group_name = q.ask("Enter a name for the group: ", q.non_empty) zones = svc_helper.get_availability_zones() print("EC2 instances can be created in the following Availability Zones:") for index, zone in enumerate(zones): print(f"\t{index+1}. {zone}") print(f"\t{len(zones)+1}. All zones") zone_sel = q.ask( "Which zone do you want to use? ", q.is_int, q.in_range(1, len(zones) + 1) ) group_zones = [zones[zone_sel - 1]] if zone_sel <= len(zones) else zones print(f"Creating group {group_name}...") as_wrapper.create_group(group_name, group_zones, template_name, 1, 1) wait(10) group = as_wrapper.describe_group(group_name) print("Created group:") pp(group) print("Waiting for instance to start...") wait_for_group(group_name, as_wrapper) print("-" * 88) use_metrics = q.ask( "Do you want to collect metrics about Amazon EC2 Auto Scaling during this demo (y/n)? ", q.is_yesno, ) if use_metrics: as_wrapper.enable_metrics( group_name, [ "GroupMinSize", "GroupMaxSize", "GroupDesiredCapacity", "GroupInServiceInstances", "GroupTotalInstances", ], ) print(f"Metrics enabled for {group_name}.") print("-" * 88) print(f"Let's update the maximum number of instances in {group_name} from 1 to 3.") q.ask("Press Enter when you're ready.") as_wrapper.update_group(group_name, MaxSize=3) group = as_wrapper.describe_group(group_name) print("The group still has one running instance, but can have up to three:") print_simplified_group(group) print("-" * 88) print(f"Let's update the desired capacity of {group_name} from 1 to 2.") q.ask("Press Enter when you're ready.") as_wrapper.set_desired_capacity(group_name, 2) wait(10) group = as_wrapper.describe_group(group_name) print("Here's the current state of the group:") print_simplified_group(group) print("-" * 88) print("Waiting for the new instance to start...") instance_ids = wait_for_group(group_name, as_wrapper) print("-" * 88) print(f"Let's terminate one of the instances in {group_name}.") print("Because the desired capacity is 2, another instance will start.") print("The currently running instances are:") for index, inst_id in enumerate(instance_ids): print(f"\t{index+1}. {inst_id}") inst_sel = q.ask( "Which instance do you want to stop? ", q.is_int, q.in_range(1, len(instance_ids) + 1), ) print(f"Stopping {instance_ids[inst_sel-1]}...") as_wrapper.terminate_instance(instance_ids[inst_sel - 1], False) wait(10) group = as_wrapper.describe_group(group_name) print(f"Here's the state of {group_name}:") print_simplified_group(group) print("Waiting for the scaling activities to complete...") wait_for_group(group_name, as_wrapper) print("-" * 88) print(f"Let's get a report of scaling activities for {group_name}.") q.ask("Press Enter when you're ready.") activities = as_wrapper.describe_scaling_activities(group_name) print( f"Found {len(activities)} activities.\n" f"Activities are ordered with the most recent one first:" ) for act in activities: pp(act) print("-" * 88) if use_metrics: print("Let's look at CloudWatch metrics.") metric_namespace = "AWS/AutoScaling" metric_dimensions = [{"Name": "AutoScalingGroupName", "Value": group_name}] print(f"The following metrics are enabled for {group_name}:") done = False while not done: metrics = svc_helper.get_metrics(metric_namespace, metric_dimensions) for index, metric in enumerate(metrics): print(f"\t{index+1}. {metric.name}") print(f"\t{len(metrics)+1}. None") metric_sel = q.ask( "Which metric do you want to see? ", q.is_int, q.in_range(1, len(metrics) + 1), ) if metric_sel < len(metrics) + 1: span = 5 metric = metrics[metric_sel - 1] print(f"Over the last {span} minutes, {metric.name} recorded:") # CloudWatch metric times are in the UTC+0 time zone. now = datetime.now(timezone.utc) metric_data = svc_helper.get_metric_statistics( metric_dimensions, metric, now - timedelta(minutes=span), now ) pp(metric_data) if not q.ask("Do you want to see another metric (y/n)? ", q.is_yesno): done = True else: done = True print(f"Let's clean up.") q.ask("Press Enter when you're ready.") if use_metrics: print(f"Stopping metrics collection for {group_name}.") as_wrapper.disable_metrics(group_name) print( "You must terminate all instances in the group before you can delete the group." ) print("Set minimum size to 0.") as_wrapper.update_group(group_name, MinSize=0) group = as_wrapper.describe_group(group_name) instance_ids = [inst["InstanceId"] for inst in group["Instances"]] for inst_id in instance_ids: print(f"Stopping {inst_id}.") as_wrapper.terminate_instance(inst_id, True) print("Waiting for instances to stop...") wait_for_instances(instance_ids, as_wrapper) print(f"Deleting {group_name}.") as_wrapper.delete_group(group_name) print("-" * 88) if template is not None: if q.ask( f"Do you want to delete launch template {template_name} used in this demo (y/n)? " ): svc_helper.delete_template(template_name) print("Template deleted.") print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": try: wrapper = AutoScalingWrapper(boto3.client("autoscaling")) helper = ServiceHelper(boto3.client("ec2"), boto3.resource("cloudwatch")) run_scenario(wrapper, helper) except Exception: logging.exception("Something went wrong with the demo!")

起動テンプレートとメトリクスを管理するためにシナリオによって呼び出される関数を定義します。これらの関数は Amazon EC2 および CloudWatch アクションをラップします。

class ServiceHelper: """Encapsulates Amazon EC2 and CloudWatch actions for the example.""" def __init__(self, ec2_client, cloudwatch_resource): """ :param ec2_client: A Boto3 Amazon EC2 client. :param cloudwatch_resource: A Boto3 CloudWatch resource. """ self.ec2_client = ec2_client self.cloudwatch_resource = cloudwatch_resource def get_template(self, template_name): """ Gets a launch template. Launch templates specify configuration for instances that are launched by Amazon EC2 Auto Scaling. :param template_name: The name of the template to look up. :return: The template, if it exists. """ try: response = self.ec2_client.describe_launch_templates( LaunchTemplateNames=[template_name] ) template = response["LaunchTemplates"][0] except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): logger.warning("Launch template %s does not exist.", template_name) else: logger.error( "Couldn't verify launch template %s. Here's why: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return template def create_template(self, template_name, inst_type, ami_id): """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. :param template_name: The name to give to the template. :param inst_type: The type of the instance, such as t1.micro. :param ami_id: The ID of the Amazon Machine Image (AMI) to use when creating an instance. :return: Information about the newly created template. """ try: response = self.ec2_client.create_launch_template( LaunchTemplateName=template_name, LaunchTemplateData={"InstanceType": inst_type, "ImageId": ami_id}, ) template = response["LaunchTemplate"] except ClientError as err: logger.error( "Couldn't create launch template %s. Here's why: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return template def delete_template(self, template_name): """ Deletes a launch template. :param template_name: The name of the template to delete. """ try: self.ec2_client.delete_launch_template(LaunchTemplateName=template_name) except ClientError as err: logger.error( "Couldn't delete launch template %s. Here's why: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_availability_zones(self): """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] except ClientError as err: logger.error( "Couldn't get availability zones. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return zones def get_metrics(self, namespace, dimensions): """ Gets a list of CloudWatch metrics filtered by namespace and dimensions. :param namespace: The namespace of the metrics to look up. :param dimensions: The dimensions of the metrics to look up. :return: The list of metrics. """ try: metrics = list( self.cloudwatch_resource.metrics.filter( Namespace=namespace, Dimensions=dimensions ) ) except ClientError as err: logger.error( "Couldn't get metrics for %s, %s. Here's why: %s: %s", namespace, dimensions, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return metrics @staticmethod def get_metric_statistics(dimensions, metric, start, end): """ Gets statistics for a CloudWatch metric within a specified time span. :param dimensions: The dimensions of the metric. :param metric: The metric to look up. :param start: The start of the time span for retrieved metrics. :param end: The end of the time span for retrieved metrics. :return: The list of data points found for the specified metric. """ try: response = metric.get_statistics( Dimensions=dimensions, StartTime=start, EndTime=end, Period=60, Statistics=["Sum"], ) data = response["Datapoints"] except ClientError as err: logger.error( "Couldn't get statistics for metric %s. Here's why: %s: %s", metric.name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return data def print_simplified_group(group): """ Prints a subset of data for an Auto Scaling group. """ print(group["AutoScalingGroupName"]) print(f"\tLaunch template: {group['LaunchTemplate']['LaunchTemplateName']}") print( f"\tMin: {group['MinSize']}, Max: {group['MaxSize']}, Desired: {group['DesiredCapacity']}" ) if group["Instances"]: print(f"\tInstances:") for inst in group["Instances"]: print(f"\t\t{inst['InstanceId']}: {inst['LifecycleState']}") def wait_for_group(group_name, as_wrapper): """ Waits for instances to start or stop in an Auto Scaling group. Prints the data for each instance after scaling activities are complete. """ group = as_wrapper.describe_group(group_name) instance_ids = [i["InstanceId"] for i in group["Instances"]] return wait_for_instances(instance_ids, as_wrapper) def wait_for_instances(instance_ids, as_wrapper): """ Waits for instances to start or stop in an Auto Scaling group. Prints the data for each instance after scaling activities are complete. """ ready = False instances = [] while not ready: instances = as_wrapper.describe_instances(instance_ids) if instance_ids else [] if all([x["LifecycleState"] in ["Terminated", "InService"] for x in instances]): ready = True else: wait(10) if instances: print( f"Here are the details of the instance{'s' if len(instances) > 1 else ''}:" ) for instance in instances: pp(instance) return instance_ids