1. 程式人生 > 其它 >Python異常檢測以及報警實現

Python異常檢測以及報警實現

# -*- encoding: utf-8 -*- ''' @File : launcher.py @Time : 2021/11/16 17:55:00 @Author : peng.wu @Version : 1.0 @Desc : 3sigma異常值檢測 '''
# here put the import lib import numpy as np import pandas as pd from matplotlib import pyplot as plt import seaborn as sns import requests from DingDingBot import DDBOT
def load_data(cols:list, filter: str): """ 載入資料 """ df = pd.read_csv("data\detail.csv", encoding="utf-8") p_col = ["stat_day", "stat_hours", "app_channel", "pv", "ids_uv", "users_uv"] df.columns = p_col df = df[df["app_channel"] == filter] return df[cols]
def sigma_stat(N: int): """ N is k sigma """
# 預處理資料 cols = ["stat_day", "stat_hours", "pv"] init_data = load_data(cols, "靈錫") init_data["stat_hours"] = init_data["stat_hours"].map(str) init_data["date"] = init_data["stat_day"].str.cat(init_data["stat_hours"], sep=" ") # 模型初始化 data_y = init_data["pv"] data_x = init_data["date"]
ymean = np.mean(data_y) ystd = np.std(data_y) threshold1 = ymean - N * ystd threshold2 = ymean + N * ystd print("pv平均值:{0}, 標準差:{1}, sigma-min:{2}, sigma-max:{3}".format(ymean, ystd, threshold1, threshold2))
# 將異常值儲存 outlier = [] outlier_x = [] for i in range(0, len(data_y)): if (data_y.iloc[i] < threshold1 or data_y.iloc[i] > threshold2): outlier.append(data_y.iloc[i]) outlier_x.append(data_x.iloc[i]) else: continue
result = zip(outlier_x, outlier) print("\n異常資料如下:\n") for item in result: print(item)
return data_y
def quantile_stat(data_x, data_y): """ 分位數異常檢測 """
# sns.boxplot(data_y) 箱線圖 # 計算上四分位數和下四分位數 q1 = data_y.quantile(0.25) q3 = data_y.quantile(0.75)
low_outlier = q1 - 1.5 * (q3 - q1) high_outlier = q3 + 1.5 * (q3 - q1)
# 檢測異常點 q_outlier = [] q_outlier_x = [] for i in range(0, len(data_y)): if (data_y.iloc[i] < lower_outlier or data_y.iloc[i] > high_outlier): q_outlier.append(data_y.iloc[i]) q_outlier_x.append(data_x.iloc[i]) else: continue
def send_msg(): """ 釘釘異常檢測報警 """ pass
if __name__ == '__main__': pass