Python 生成周期性波動的資料 可指定數值範圍3
阿新 • • 發佈:2021-06-29
import numpy as np import math import matplotlib.pyplot as plt import pandas as pd import pymssql from random import choice import json import time import os class MSSQL: # 類的建構函式,初始化資料庫連線ip或者域名,以及使用者名稱,密碼,要連線的資料庫名稱 def __init__(self,host,user,pwd,db): self.host=host self.user=user self.pwd=pwd self.db=db # 得到資料庫連線資訊函式,返回: conn.cursor() def __GetConnect(self): self.conn=pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8') cur=self.conn.cursor() #將資料庫連線資訊,賦值給cur。 if not cur: raise(NameError,"連線資料庫失敗") else: return cur #執行查詢語句,返回的是一個包含tuple的list,list的元素是記錄行,tuple的元素是每行記錄的欄位 def ExecQuery(self,sql): cur = self.__GetConnect() #獲得資料庫連線資訊 cur.execute(sql) #執行Sql語句 resList = cur.fetchall() #獲得所有的查詢結果 self.conn.close() #查詢完畢後必須關閉連線 return resList #返回查詢結果 #執行Sql語句函式,無返回結果的,方向修改的 def ExecNonQuery(self,sql): cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() # new一個物件 # mssql = MSSQL('192.168.2.51', 'sa', 'Sql123456', 'AEHMS20201216') # mssql = MSSQL('.', 'sa', 'sa', 'AEHMS20201216') mssql = None def randomSplit(total):# 隨機分組資料 foo = [60,80,150,200,300,600] count = 0 arr= [] bl = True while (bl): num = choice(foo)# 隨機取值 if count + num >= total:# 最後隨機取的那個數 超出當前範圍 break break arr.append(num) count+=num if total != count:# 追加最後一個元素 arr.append(total-count) print(count) print(arr) seg = []# 值如:[(0,50),(50,200)] print('--------------') curCount=0 for num in arr: start = curCount end = curCount + num seg.append((start, end)) print(start, end) curCount=end print(seg) return seg def createData(pointNum, avgValue): # 生成周期性資料 long=pointNum # 400個步長,x軸的總長度 base=avgValue # 均值 ybase = np.zeros((1,long))[0] + base # 所有資料 period_multiply = 0.1 # 越大,幅值越大,調整波峰 period_frequency = 500 # 越大,週期越大 all_period_multiply = [0.1, 0,2]# 預設多個幅值 [0.1, 0,2, 0.3, 0.4, 0.5] all_period_frequency = [50, 150, 200, 300, 400, 600, 800, 1000, 1300]# 預設多個週期值 seg = randomSplit(pointNum)# 原始: seg = [(0, pointNum)] for (i,j) in seg: # 一組一組資料的遍歷 print(i, j) period_multiply = choice(all_period_multiply)# 隨機取值 period_frequency = choice(all_period_frequency)# 隨機取值 n = j-i # n=40,40 50 x = np.arange(n) season1 = 0.2 * np.array([math.sin(i*0.2/period_frequency*math.pi) for i in x]) season2 = 0.5 * np.array([math.sin(i*0.5/period_frequency*math.pi) for i in x]) noise = np.random.normal(0, 0.2, len(x)) y = season1 + season2 + noise # 可以疊加多尺度週期和噪聲 # y = season1+season2 for idx in range(i, j): # 遍歷具體的點 # print(idx, period_multiply) value1 = ybase[idx] + y[idx-i] * period_multiply value2 = round(value1, 3) # 保留三位小數 ybase[idx] = value2 # plt.figure(figsize=(15, 3.5)) # plt.plot(ybase) # plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=2.0) # plt.show() return ybase # # 測試 # points = createData(200, 1.2) # print(points) def getIdsByCode(code): # 獲取 ids by code sql = "SELECT ID FROM tb_SensorRecord WHERE Code='{}' AND GetTime>='2021-01-01' AND GetTime<='2021-07-01' ORDER BY ID ASC".format(code) results = mssql.ExecQuery(sql) arr_ids = [] for row in results: arr_ids.append(row[0]) return arr_ids # # 測試 # ids = getIdsByCode('080906') # print(len(ids)) # # 測試 執行sql語句 # mssql.ExecNonQuery("UPDATE tb_SensorRecord SET NewValue1='1' WHERE ID='4178839'") # print('ok') def getSensor(): # 獲取所有的感測器,從Excel中讀取感測器 arr_sensors = [] df = pd.read_excel('1.xlsx', dtype={'編碼': np.str_}) # dtype指定列的資料型別 for index, row in df.iterrows(): arr_sensors.append({"code":row["編碼"], "avgValue":row["均值"]}) return arr_sensors # # 測試 # sensors = getSensor() # print(sensors) # 主邏輯 startTime = time.perf_counter() curCode = '' try: dbConfig = { 'host' : '', 'user' : '', 'pwd' : '', 'db' : '' } with open('dbConfig.json', 'r') as f:# 從檔案中讀取配置資訊 dbConfig = eval(json.load(f)) print(dbConfig) mssql = MSSQL(dbConfig["host"], dbConfig["user"], dbConfig["pwd"], dbConfig["db"]) sensors = getSensor() for item in sensors: # 遍歷感測器 print(item) code = item["code"] avgValue = item["avgValue"] curCode = code ids = getIdsByCode(code) points = createData(len(ids), avgValue) sql = "" for index, value in enumerate(ids): print(index, value, points[index]) sql += "UPDATE tb_SensorRecord SET NewValue1='{0}' WHERE ID='{1}';".format(points[index], value) if (index % 8000 == 0):# 間隔寫入到資料庫 print("正在寫入到資料庫1") mssql.ExecNonQuery(sql) sql = "" if sql.strip() != '': print("正在寫入到資料庫2") mssql.ExecNonQuery(sql) sql = "" print('處理完成') except Exception as e: print('Error:', e) with open('err.txt', 'w') as f: json.dump(str(e)+",當前感測器:"+curCode, f) finally: endTime = time.perf_counter() print("The function run time is : %.03f seconds" %(endTime-startTime)) os.system('pause')# 避免執行完後自動關閉控制檯
需要的外部檔案
json配置檔案,用於放資料庫連結
"{\"host\": \".\", \"user\": \"sa\", \"pwd\": \"sa\", \"db\": \"AEHMS20210629\"}"
Excel檔案,用於指定需要修改的感測器和均值