附录 B – 卡方计算示例 - 高级多可用区弹性模式

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

附录 B – 卡方计算示例

以下是收集错误指标并对数据执行卡方检验的示例。该代码尚未实现生产就绪,也不会执行必要的错误处理,但提供了逻辑工作原理的概念验证。您应该更新此示例以满足您的需求。

首先,Amazon EventBridge 计划的事件每分钟调用一次 Lambda 函数。该事件的内容配置有以下数据:

{ "timestamp": "2023-03-15T15:26:37.527Z", "namespace": "multi-az/frontend", "metricName": "5xx", "dimensions": [ { "Name": "Region", "Value": "us-east-1" }, { "Name": "Controller", "Value": "Home" }, { "Name": "Action", "Value": "Index" } ], "period": 60, "stat": "Sum", "unit": "Count", "chiSquareMetricName": "multi-az/chi-squared", "azs": [ "use1-az2", "use1-az4", "use1-az6" ] }

这些数据用于指定检索相应的 CloudWatch 指标(如命名空间、指标名称和维度)所需的常见数据,然后发布每个可用区的卡方检验结果。使用 Python 3.9,Lambda 函数中的代码如下所示。简而言之,它收集前一分钟指定的 CloudWatch 指标,对该数据运行卡方检验,然后发布有关每个指定可用区的检验结果的 CloudWatch 指标。

import os import boto3 import datetime import copy import json from datetime import timedelta from scipy.stats import chisquare from aws_embedded_metrics import metric_scope cw_client = boto3.client("cloudwatch", os.environ.get("AWS_REGION", "us-east-1")) @metric_scope def handler(event, context, metrics): metrics.set_property("Event", json.loads(json.dumps(event, default = str))) time = datetime.datetime.strptime(event["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") # Round down to the previous minute end: datetime = roundTime(time) # Subtract a minute for the start start: datetime = end - timedelta(minutes = 1) # Get all the metrics that match the query results = get_all_metrics(event, start, end, metrics) metrics.set_property("MetricCounts", results) # Calculate the chi squared result chi_sq_result = chisquare(list(results.values())) expected = sum(list(results.values())) / len(results.values()) metrics.set_property("ChiSquaredResult", chi_sq_result) # Put the chi square metrics into CloudWatch put_all_metrics(event, results, chi_sq_result[1], expected, start, metrics) def get_all_metrics(detail: dict, start: datetime, end: datetime, metrics): """ Gets all of the error metrics for each AZ specified """ metric_query = { "MetricDataQueries": [ ], "StartTime": start, "EndTime": end } for az in detail["azs"]: dim = copy.deepcopy(detail["dimensions"]) dim.append({"Name": "AZ-ID", "Value": az}) query = { "Id": az.replace("-", "_"), "MetricStat": { "Metric": { "Namespace": detail["namespace"], "MetricName": detail["metricName"], "Dimensions": dim }, "Period": int(detail["period"]), "Stat": detail["stat"], "Unit": detail["unit"] }, "Label": az, "ReturnData": True } metric_query["MetricDataQueries"].append(query) metrics.set_property("GetMetricRequest", json.loads(json.dumps(metric_query, default=str))) next_token: str = None results = {} while True: if next_token is not None: metric_query["NextToken"] = next_token data = cw_client.get_metric_data(**metric_query) if next_token is not None: metrics.set_property("GetMetricResult::" + next_token, json.loads(json.dumps(data, default = str))) else: metrics.set_property("GetMetricResult", json.loads(json.dumps(data, default = str))) for item in data["MetricDataResults"]: key = item["Id"].replace("_", "-") if key not in results: results[key] = 0 results[key] += sum(item["Values"]) if "NextToken" in data: next_token = data["NextToken"] if next_token is None: break return results def put_all_metrics(detail: dict, results: dict, chi_sq_value: float, expected: float, timestamp: datetime, metrics): """ Adds the chi squared metric for all AZs to CloudWatch """ farthest_from_expected = None if len(results) > 0: keys = list(results.keys()) farthest_from_expected = keys[0] for key in keys: if abs(results[key] - expected) > abs(results[farthest_from_expected] - expected): farthest_from_expected = key metric_query = { "Namespace": detail["namespace"], "MetricData": [] } for az in detail["azs"]: dim = copy.deepcopy(detail["dimensions"]) dim.append({"Name": "AZ-ID", "Value": az}) query = { "MetricName": detail["chiSquareMetricName"], "Dimensions": dim, "Timestamp": timestamp, } if chi_sq_value <= 0.05 and az == farthest_from_expected: query["Value"] = 1 else: query["Value"] = 0 metric_query["MetricData"].append(query) metrics.set_property("PutMetricRequest", json.loads(json.dumps(metric_query, default = str))) cw_client.put_metric_data(**metric_query) def roundTime(dt=None, roundTo=60): """Round a datetime object to any time lapse in seconds dt : datetime.datetime object, default now. roundTo : Closest number of seconds to round to, default 1 minute. """ if dt == None : dt = datetime.datetime.now() seconds = (dt.replace(tzinfo=None) - dt.min).seconds rounding = (seconds+roundTo/2) // roundTo * roundTo return dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)

然后,您可以为每个可用区创建警报。以下示例针对 use1-az2,介绍三个连续一分钟数据点的警报,这些数据点的最大值等于 1(1 是卡方检验确定错误率存在统计学显著偏差时发布的指标)。

{ "Type": "AWS::CloudWatch::Alarm", "Properties": { "AlarmName": "use1-az2-chi-squared", "ActionsEnabled": true, "OKActions": [], "AlarmActions": [], "InsufficientDataActions": [], "MetricName": "multi-az/chi-squared", "Namespace": "multi-az/frontend", "Statistic": "Maximum", "Dimensions": [ { "Name": "AZ-ID", "Value": "use1-az2" }, { "Name": "Action", "Value": "Index" }, { "Name": "Region", "Value": "us-east-1" }, { "Name": "Controller", "Value": "Home" } ], "Period": 60, "EvaluationPeriods": 3, "DatapointsToAlarm": 3, "Threshold": 1, "ComparisonOperator": "GreaterThanOrEqualToThreshold", "TreatMissingData": "missing" } }

您还可以创建 M(最大为 N)警报,并将这两个警报与复合警报合并在一起。您还需要为每个可用区中的每个控制器/操作组合或微服务创建相同的警报。最后,您可以将卡方复合警报添加到每个控制器/操作组合的可用区特定警报中,如 使用异常值检测进行故障检测 所示。