基於PySpark的網路服務異常檢測系統 (四) MySQL與SparkSQL對接同步資料 K-means演算法計算預測異常
阿新 • • 發佈:2018-12-15
def get_current_timestamp():
    """
    Return the current Unix time expressed in milliseconds.

    The clock reading is truncated to whole seconds before it is scaled,
    so the result is always a multiple of 1000.

    :return: int, milliseconds since the epoch (one-second resolution)
    """
    whole_seconds = int(time.time())
    return whole_seconds * 1000
7
8
def convert_datetime_to_timestamp(dtime):
    """
    Convert a datetime to a Unix timestamp in milliseconds.

    Naive values are interpreted in the machine's local time zone (via
    ``time.mktime``). Timezone-aware values are converted using their own
    UTC offset rather than being reinterpreted as local wall-clock time.
    Sub-second precision is discarded, so the result is always a multiple
    of 1000.

    :param dtime: ``datetime.datetime`` (naive or aware); any object that
        only offers ``timetuple()`` (e.g. ``datetime.date``) is handled
        like a naive value
    :return: int, milliseconds since the epoch
    """
    tzinfo = getattr(dtime, 'tzinfo', None)
    offset = dtime.utcoffset() if tzinfo is not None else None

    if offset is not None:
        # timetuple() drops the offset, so mktime() would misread an aware
        # value as local time; timestamp() honours the offset. Microseconds
        # are cleared first to match the truncation of the naive path.
        seconds = dtime.replace(microsecond=0).timestamp()
    else:
        seconds = time.mktime(dtime.timetuple())

    return int(seconds) * 1000
17
18
def get_cache_cat_data(start_time, end_time, force=False):
    """
    Return the cat data for a time window, going through the cache.

    The cache is consulted first. A truthy cached value is returned as-is
    unless ``force`` is set; otherwise the data is fetched again with
    ``get_cat_res_data`` and, when the fetch yields something truthy,
    written back to the cache under the same key.

    :param start_time: window start, embedded in the cache key and passed
        to ``get_cat_res_data``
    :param end_time: window end, embedded in the cache key and passed to
        ``get_cat_res_data``
    :param force: when truthy, ignore any cached value and fetch again
    :return: the cached or freshly fetched content (may be falsy)
    """
    cache_key = 'GET_CAT_RES_DATA_{0}_TO_{1}'.format(start_time, end_time)

    cached = cache.get(cache_key)
    if not force and cached:
        return cached

    fetched = get_cat_res_data(start_time, end_time)
    if fetched:
        # Only successful, non-empty fetches are worth caching.
        cache.set(cache_key, fetched, timeout=CACHE_TIMEOUT_DEFAULT)
    return fetched
36
37
def add_normal_cat_data(data):
    """
    Build ``CatNormalResource`` models from raw cat records, in batches.

    A record is kept as "normal" when its ``response_time`` is below 1.2
    and its failure ratio (``fail_count / request_count``) is below 0.2.
    Every kept record gets a fresh UUID string as its ``id``. Records
    without a ``response_time`` cannot be judged and are skipped.

    :param data: iterable of dicts whose keys are passed as keyword
        arguments to ``CatNormalResource``; ``None`` is treated as empty.
        The input dicts are not modified.
    :return: generator yielding non-empty lists of at most 1000 unsaved
        ``CatNormalResource`` objects
    """
    batch_size = 1000
    batch = []

    for cat_data in data or ():
        response_time = cat_data.get('response_time')
        if response_time is None:
            # Comparing None with a number raises TypeError on Python 3.
            continue

        # A missing/zero request count falls back to 1 so the ratio below
        # never divides by zero.
        request_count = cat_data.get('request_count') or 1
        # A missing/zero failure count means "no failures"; defaulting it
        # to 1 would wrongly reject healthy low-traffic records.
        fail_count = cat_data.get('fail_count') or 0

        if response_time < 1.2 and (fail_count / request_count) < 0.2:
            # Work on a copy so the caller's records are left untouched.
            fields = dict(cat_data, id=str(uuid4()))
            batch.append(CatNormalResource(**fields))

            if len(batch) >= batch_size:
                yield batch
                batch = []

    # Flush the remainder, but never hand an empty batch to the consumer.
    if batch:
        yield batch
62
63
@celery_app.task
def insert_normal_cat_data(data):
    """
    Celery task: persist the normal cat records contained in ``data``.

    The records are turned into model batches by ``add_normal_cat_data``
    and each batch is written with a single ``bulk_create`` call.

    :param data: raw cat records, forwarded to ``add_normal_cat_data``
    :return: None
    :raises RsError: if building or inserting any batch fails
    """
    try:
        for batch in add_normal_cat_data(data):
            CatNormalResource.objects.bulk_create(batch)
    except Exception:
        # Record the full traceback through the module logger (stdout
        # output from a worker is easy to lose) before signalling the
        # failure to the caller with the project's error type.
        logger.exception('insert_normal_cat_data failed')
        raise RsError('插入資料庫失敗')
77
78
def insert_normal_cat_job():
    """
    Scheduled job: queue the import of the last 24 hours of cat data.

    The window runs from exactly one day before now up to the current
    time. The data is fetched through ``get_cache_cat_data`` and handed to
    the ``insert_normal_cat_data`` Celery task. Nothing is queued when the
    fetch returns no data.

    :return: None
    """
    logger.info('insert_normal_cat_job ....')

    window_start = datetime.datetime.now() - datetime.timedelta(days=1)
    start_time = convert_datetime_to_timestamp(window_start)
    end_time = get_current_timestamp()

    data = get_cache_cat_data(start_time, end_time)
    if not data:
        # An empty or missing payload would only produce a task that fails
        # or does nothing, so skip the enqueue.
        logger.info(
            'insert_normal_cat_job: no cat data between %s and %s',
            start_time, end_time,
        )
        return

    insert_normal_cat_data.delay(data)