1、检测由局部相关跟踪(local correlation tracking)方法度量出的异常,不同的判定规则(gauss、threshold、proportion)对应不同的阈值计算方式。
def detect_anomaly_lcs(self, lcs_scores):
    """
    Label the anomalies among scores measured by the local correlation tracking method.

    The threshold is chosen by ``self.rule``:
    - gauss: threshold = 0.0 + self.sigma * std(lcs_scores)
    - threshold: threshold = self.threshold
    - proportion: threshold = sort_scores[threshold_index], where
      threshold_index = int(len(lcs_scores) * (1.0 - self.proportion))

    A score is labelled as an anomaly when it is strictly greater than the threshold.

    :param lcs_scores: list<float> | the list of local correlation scores
    :return: (list<bool>, list<float>) | the anomaly labels ("True" stands for an anomaly)
             and the input scores, returned unchanged. None when self.rule is not one of
             the three rules above.
    """
    rule = self.rule
    if rule not in ("gauss", "threshold", "proportion"):
        # An unrecognised rule has always produced no result; keep that contract.
        return None

    count = len(lcs_scores)
    if count == 0:
        # Nothing to label; also avoids np.std([]) and indexing into an empty sort.
        return [], lcs_scores

    if rule == "gauss":
        # The mean is fixed at 0.0, so only the spread of the scores is estimated.
        threshold = 0.0 + self.sigma * np.std(lcs_scores)
    elif rule == "threshold":
        threshold = self.threshold
    else:
        sort_scores = sorted(np.array(lcs_scores))
        threshold_index = int(count * (1.0 - self.proportion))
        # Clamp into range: proportion == 0 would otherwise index one past the end.
        threshold_index = max(0, min(threshold_index, count - 1))
        threshold = sort_scores[threshold_index]

    # Compare the score values themselves (not their positions) with the threshold.
    change_labels = [bool(score > threshold) for score in lcs_scores]
    return change_labels, lcs_scores
2、通过比较预测值和实际值来计算每个点的相对变化率(下降/上升比例),然后调用 filter_anomaly() 函数,根据参数“rule”筛选出异常点。
def detect_anomaly_regression(self, predicted_series1, practical_series1, predicted_series2, practical_series2):
    """
    Compute the relative change of each point against its prediction for two KPI series,
    then let filter_anomaly() label the anomalies according to the parameter "rule".

    The relative change is (practical - predicted) / (predicted + 1e-7); the score of a
    point is the mean of the absolute relative changes of the two series.

    :param predicted_series1: list<float> | the predicted values of the KPI series 1.
    :param practical_series1: list<float> | the practical values of the KPI series 1.
    :param predicted_series2: list<float> | the predicted values of the KPI series 2.
    :param practical_series2: list<float> | the practical values of the KPI series 2.
    :return: (change_ratios1, change_ratios2, change_labels, change_scores) | the two lists
             of relative changes, the labels returned by filter_anomaly(), and the scores.
    """

    def relative_change(practical, predicted):
        # The small constant keeps the division defined when the prediction is 0.
        return (practical - predicted) / (predicted + 1e-7)

    change_ratios1, change_ratios2, change_scores = [], [], []
    # The length of practical_series1 drives the loop; the other series are indexed alike.
    for idx in range(len(practical_series1)):
        ratio1 = relative_change(practical_series1[idx], predicted_series1[idx])
        ratio2 = relative_change(practical_series2[idx], predicted_series2[idx])
        change_ratios1.append(ratio1)
        change_ratios2.append(ratio2)
        change_scores.append((abs(ratio1) + abs(ratio2)) / 2.0)

    change_labels = self.filter_anomaly(change_ratios1, change_ratios2, change_scores)
    return change_ratios1, change_ratios2, change_labels, change_scores
3、检测回归方法的异常
def filter_anomaly(self, change_ratios1, change_ratios2, change_scores):
    """
    Label the anomalies which are measured by the regression method.

    The decision depends on ``self.rule``:
    - gauss: per series, threshold1 = mean - self.sigma * std and
      threshold2 = mean + self.sigma * std; a point is an anomaly when either
      series falls outside its own band.
    - threshold: both series are checked against [-self.threshold, self.threshold];
      a point is an anomaly when either series falls outside.
    - proportion: threshold = sort_scores[threshold_index], where
      threshold_index = int(len(change_scores) * (1.0 - self.proportion));
      a point is an anomaly when its score is strictly greater than the threshold.

    :param change_ratios1: list<float> | the change ratios of the KPI1.
    :param change_ratios2: list<float> | the change ratios of the KPI2.
    :param change_scores: list<float> | the average of the change anomaly degree of the two change ratios.
    :return: list<bool> | the list of the labels where "True" stands for an anomaly.
             None when self.rule is not one of the three rules above.
    """
    rule = self.rule

    if rule == "gauss":
        per_series_labels = []
        for ratios in (change_ratios1, change_ratios2):
            mean = np.mean(ratios)
            std = np.std(ratios)
            per_series_labels.append(
                self.filter_by_threshold(ratios, mean - self.sigma * std, mean + self.sigma * std)
            )
        # Element-wise OR; tolist() yields plain Python bools as documented.
        return np.logical_or(per_series_labels[0], per_series_labels[1]).tolist()

    if rule == "threshold":
        threshold = self.threshold
        change_labels1 = self.filter_by_threshold(change_ratios1, -threshold, threshold)
        change_labels2 = self.filter_by_threshold(change_ratios2, -threshold, threshold)
        return np.logical_or(change_labels1, change_labels2).tolist()

    if rule == "proportion":
        count = len(change_scores)
        if count == 0:
            # No scores means no labels; avoids indexing into an empty sort.
            return []
        sort_scores = sorted(np.array(change_scores))
        threshold_index = int(count * (1.0 - self.proportion))
        # Clamp into range: proportion == 0 would otherwise index one past the end.
        threshold_index = max(0, min(threshold_index, count - 1))
        threshold = sort_scores[threshold_index]
        return [bool(score > threshold) for score in change_scores]

    # An unrecognised rule has always produced no result; keep that contract.
    return None
4、将偏离过大的点(低于下阈值或高于上阈值)标记为异常。
def filter_by_threshold(self, change_ratios, threshold1, threshold2):
    """
    Mark every change ratio that lies strictly outside [threshold1, threshold2] as an anomaly.

    Values equal to either threshold are not anomalies.

    :param change_ratios: list<float> | the change ratios.
    :param threshold1: float | the negative threshold standing for a drop deviation.
    :param threshold2: float | the positive threshold standing for a rise deviation.
    :return: list<bool> | the list of the labels where "True" stands for an anomaly.
    """
    lower, upper = threshold1, threshold2
    return [
        True if (ratio < lower or ratio > upper) else False
        for ratio in change_ratios
    ]
原文:https://www.cnblogs.com/0211ji/p/13294905.html