1. 程式人生 > 實用技巧 >檢測異常點並過濾

檢測異常點並過濾

1、檢測由區域性相關跟蹤(local correlation tracking)方法測得的異常;不同規則(gauss、threshold、proportion)對應不同的閾值。

 1  def detect_anomaly_lcs(self, lcs_scores):
 2         """
 3         It detects the anomalies which are measured by local correlation tracking method.
 4         - gauss: threshold = 0.0 + self.sigma * std
 5         - threshold: the given threshold variable
 6         - proportion: threshold = sort_scores[threshold_index]
7 :param lcs_scores: list<float> | the list of local correlation scores 8 :return: 9 """ 10 if self.rule == "gauss": 11 mean = 0.0 12 std = np.std(lcs_scores) 13 threshold = mean + self.sigma * std 14 change_labels = []
15 for lcs in range(len(lcs_scores)): 16 if lcs > threshold: 17 change_labels.append(True) 18 else: 19 change_labels.append(False) 20 return change_labels, lcs_scores 21 if self.rule == "threshold
": 22 threshold = self.threshold 23 change_labels = [] 24 for lcs in range(len(lcs_scores)): 25 if lcs > threshold: 26 change_labels.append(True) 27 else: 28 change_labels.append(False) 29 return change_labels, lcs_scores 30 if self.rule == "proportion": 31 sort_scores = sorted(np.array(lcs_scores)) 32 threshold_index = int(len(lcs_scores) * (1.0 - self.proportion)) 33 threshold = sort_scores[threshold_index] 34 change_labels = [] 35 for lcs in range(len(lcs_scores)): 36 if lcs > threshold: 37 change_labels.append(True) 38 else: 39 change_labels.append(False) 40 return change_labels, lcs_scores

2、通過比較預測值與實際值,計算每個點的變化率(drop ratio);接著執行 filter_anomaly() 函式,依引數「rule」過濾出異常。

 1     def detect_anomaly_regression(self, predicted_series1, practical_series1, predicted_series2, practical_series2):
 2         """
 3         It calculates the drop ratio of each point by comparing the predicted value and practical value.
 4         Then it runs filter_anomaly() function to filter out the anomalies by the parameter "rule".
 5         :param predicted_series1: list<float> | the predicted values of the KPI series 1.
 6         :param practical_series1: list<float> | the practical values of the KPI series 1.
 7         :param predicted_series2: list<float> | the predicted values of the KPI series 2.
 8         :param practical_series2: list<float> | the practical values of the KPI series 2.
 9         :return:
10         """
11         change_ratios1 = []
12         change_ratios2 = []
13         change_scores = []
14         for i in range(len(practical_series1)):
15             c1 = (practical_series1[i] - predicted_series1[i]) / (predicted_series1[i] + 1e-7)
16             c2 = (practical_series2[i] - predicted_series2[i]) / (predicted_series2[i] + 1e-7)
17             change_ratios1.append(c1)
18             change_ratios2.append(c2)
19             s = (abs(c1) + abs(c2)) / 2.0
20             change_scores.append(s)
21 
22         change_labels = self.filter_anomaly(change_ratios1, change_ratios2, change_scores)
23         return change_ratios1, change_ratios2, change_labels, change_scores

3、檢測由迴歸方法測得的異常

 1     def filter_anomaly(self, change_ratios1, change_ratios2, change_scores):
 2         """
 3         It detects the anomalies which are measured by regression method.
 4         - gauss: threshold1 = mean - self.sigma * std, threshold2 = mean + self.sigma * std
 5         - threshold: the given threshold variable
 6         - proportion: threshold = sort_scores[threshold_index]
 7         :param change_ratios1: list<float> | the change ratios of the KPI1.
 8         :param change_ratios2: list<float> | the change ratios of the KPI2.
 9         :param change_scores: list<float> | the average of the change anomaly degree of the two change ratios.
10         :return: list<bool> | the list of the labels where "True" stands for an anomaly.
11         """
12         if self.rule == 'gauss':
13             mean = np.mean(change_ratios1)
14             std = np.std(change_ratios1)
15             threshold1 = mean - self.sigma * std
16             threshold2 = mean + self.sigma * std
17             change_labels1 = self.filter_by_threshold(change_ratios1, threshold1, threshold2)
18             mean = np.mean(change_ratios2)
19             std = np.std(change_ratios2)
20             threshold1 = mean - self.sigma * std
21             threshold2 = mean + self.sigma * std
22             change_labels2 = self.filter_by_threshold(change_ratios2, threshold1, threshold2)
23             change_labels = list(np.array(change_labels1) + np.array(change_labels2))
24             return change_labels
25 
26         if self.rule == "threshold":
27             threshold = self.threshold
28             change_labels1 = self.filter_by_threshold(change_ratios1, -threshold, threshold)
29             change_labels2 = self.filter_by_threshold(change_ratios2, -threshold, threshold)
30             change_labels = list(np.array(change_labels1) + np.array(change_labels2))
31             return change_labels
32 
33         if self.rule == "proportion":
34             sort_scores = sorted(np.array(change_scores))
35             threshold_index = int(len(change_scores) * (1.0 - self.proportion))
36             threshold = sort_scores[threshold_index]
37             change_labels = []
38             for i in range(len(change_scores)):
39                 if change_scores[i] > threshold:
40                     change_labels.append(True)
41                 else:
42                     change_labels.append(False)
43             return change_labels

4、將過於偏離的點過濾為異常。

 1     def filter_by_threshold(self, change_ratios, threshold1, threshold2):
 2         """
 3         It filter out the too deviated points as anomalies.
 4         :param change_ratios: list<float> | the change ratios.
 5         :param threshold1: float | the negative threshold standing for a drop deviation.
 6         :param threshold2: float | the positive threshold standing for a rise deviation.
 7         :return: list<bool> | the list of the labels where "True" stands for an anomaly.
 8         """
 9         change_labels = []
10         for r in change_ratios:
11             if r < threshold1 or r > threshold2:
12                 change_labels.append(True)
13             else:
14                 change_labels.append(False)
15         return change_labels