1. 程式人生 > 其它 >Python 生成周期性波動的資料 可指定數值範圍3

Python 生成周期性波動的資料 可指定數值範圍3

import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd
import pymssql
from random import choice
import json
import time
import os




class MSSQL:
    # 類的建構函式,初始化資料庫連線ip或者域名,以及使用者名稱,密碼,要連線的資料庫名稱
    def __init__(self,host,user,pwd,db): 
        self.host=host
        self.user=user
        self.pwd=pwd
        self.db=db
    
    # 得到資料庫連線資訊函式,返回: conn.cursor()
    def __GetConnect(self):
        self.conn=pymssql.connect(host=self.host,
                                  user=self.user,
                                  password=self.pwd,
                                  database=self.db,
                                  charset='utf8')
        cur=self.conn.cursor()  #將資料庫連線資訊,賦值給cur。
        if not cur:
            raise(NameError,"連線資料庫失敗")
        else:
            return cur

        
    #執行查詢語句,返回的是一個包含tuple的list,list的元素是記錄行,tuple的元素是每行記錄的欄位
    def ExecQuery(self,sql):  
        cur = self.__GetConnect() #獲得資料庫連線資訊
        cur.execute(sql)          #執行Sql語句
        resList = cur.fetchall()  #獲得所有的查詢結果
        self.conn.close()         #查詢完畢後必須關閉連線
        return resList            #返回查詢結果
    
    
    #執行Sql語句函式,無返回結果的,方向修改的
    def ExecNonQuery(self,sql):
        cur = self.__GetConnect()
        cur.execute(sql)
        self.conn.commit()
        self.conn.close()





# new一個物件
# mssql = MSSQL('192.168.2.51', 'sa', 'Sql123456', 'AEHMS20201216')
# mssql = MSSQL('.', 'sa', 'sa', 'AEHMS20201216')
mssql = None








def randomSplit(total):# 隨機分組資料
    
    foo = [60,80,150,200,300,600]
    count = 0
    arr= []
    bl = True
    
    while (bl):
        num = choice(foo)# 隨機取值
        if count + num >= total:# 最後隨機取的那個數 超出當前範圍 break
            break
        arr.append(num)
        count+=num

    if total != count:# 追加最後一個元素
        arr.append(total-count)

    print(count)
    print(arr)

    
    seg = []# 值如:[(0,50),(50,200)]
    
    print('--------------')
    curCount=0
    for num in arr:
        start = curCount
        end = curCount + num
        seg.append((start, end))
        print(start, end)
        curCount=end

    print(seg)
    return seg








def createData(pointNum, avgValue): # 生成周期性資料

    long=pointNum # 400個步長,x軸的總長度
    base=avgValue # 均值
    ybase = np.zeros((1,long))[0] + base # 所有資料
    

    period_multiply = 0.1 # 越大,幅值越大,調整波峰
    period_frequency = 500 # 越大,週期越大
    
    all_period_multiply = [0.1, 0,2]# 預設多個幅值 [0.1, 0,2, 0.3, 0.4, 0.5]
    all_period_frequency = [50, 150, 200, 300, 400, 600, 800, 1000, 1300]# 預設多個週期值

    
    seg = randomSplit(pointNum)# 原始: seg = [(0, pointNum)]

    for (i,j) in seg: # 一組一組資料的遍歷
        print(i, j)
        
        period_multiply = choice(all_period_multiply)# 隨機取值
        period_frequency = choice(all_period_frequency)# 隨機取值
        
        n = j-i # n=40,40 50
        x = np.arange(n)
        season1 = 0.2 * np.array([math.sin(i*0.2/period_frequency*math.pi) for i in x])
        season2 = 0.5 * np.array([math.sin(i*0.5/period_frequency*math.pi) for i in x])
        noise = np.random.normal(0, 0.2, len(x))
        y = season1 + season2 + noise # 可以疊加多尺度週期和噪聲
        # y = season1+season2
        
        for idx in range(i, j): # 遍歷具體的點
            # print(idx, period_multiply)
            value1 = ybase[idx] + y[idx-i] * period_multiply
            value2 = round(value1, 3) # 保留三位小數
            ybase[idx] = value2
     

    # plt.figure(figsize=(15, 3.5))
    # plt.plot(ybase)
    # plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=2.0)
    # plt.show()
    
    return ybase


# # 測試
# points = createData(200, 1.2)
# print(points)






def getIdsByCode(code): # 獲取 ids by code
    sql = "SELECT ID FROM tb_SensorRecord WHERE Code='{}' AND GetTime>='2021-01-01' AND GetTime<='2021-07-01' ORDER BY ID ASC".format(code)
    results = mssql.ExecQuery(sql)
    arr_ids = []
    for row in results:
        arr_ids.append(row[0])        
    return arr_ids


# # 測試
# ids = getIdsByCode('080906')
# print(len(ids))



# # 測試 執行sql語句
# mssql.ExecNonQuery("UPDATE tb_SensorRecord SET NewValue1='1' WHERE ID='4178839'")
# print('ok')






def getSensor(): # 獲取所有的感測器,從Excel中讀取感測器
    arr_sensors = []
    df = pd.read_excel('1.xlsx', dtype={'編碼': np.str_}) # dtype指定列的資料型別
    for index, row in df.iterrows():
        arr_sensors.append({"code":row["編碼"], "avgValue":row["均值"]})
    return arr_sensors


# # 測試
# sensors = getSensor()
# print(sensors)








# 主邏輯

startTime = time.perf_counter()
curCode = ''

try:
    dbConfig = {
        'host' : '',
        'user' : '',
        'pwd' : '',
        'db' : ''
    }
    with open('dbConfig.json', 'r') as f:# 從檔案中讀取配置資訊
        dbConfig = eval(json.load(f))
    print(dbConfig)

    mssql = MSSQL(dbConfig["host"], dbConfig["user"], dbConfig["pwd"], dbConfig["db"])

    sensors = getSensor()
    for item in sensors: # 遍歷感測器
        print(item)
        code = item["code"]
        avgValue = item["avgValue"]
        curCode = code
        
        ids = getIdsByCode(code)
        points = createData(len(ids), avgValue)
        
        sql = ""
        for index, value in enumerate(ids):
            print(index, value, points[index])            
            sql += "UPDATE tb_SensorRecord SET NewValue1='{0}' WHERE ID='{1}';".format(points[index], value)            
            if (index % 8000 == 0):# 間隔寫入到資料庫
                print("正在寫入到資料庫1")                
                mssql.ExecNonQuery(sql)
                sql = ""
            
        if sql.strip() != '':
            print("正在寫入到資料庫2")
            mssql.ExecNonQuery(sql)
            sql = ""
      
    print('處理完成') 


except Exception as e:
    print('Error:', e)
    with open('err.txt', 'w') as f:
        json.dump(str(e)+",當前感測器:"+curCode, f)


finally:
    endTime = time.perf_counter()
    print("The function run time is : %.03f seconds" %(endTime-startTime))
    os.system('pause')# 避免執行完後自動關閉控制檯


需要的外部檔案

json配置檔案,用於放資料庫連結

"{\"host\": \".\", \"user\": \"sa\", \"pwd\": \"sa\", \"db\": \"AEHMS20210629\"}"

Excel檔案,用於指定需要修改的感測器和均值

修改後的資料預覽