python溫溼度資料遠端接收處理入庫程式
阿新 • • 發佈:2018-11-10
程式結構
Brige---+
- ----dbconnect.py (資料庫連線和配置資訊)
- ----crc16.py (CRC16校驗程式)
- ---- cdb.py (對資料庫cdb表的操作)
- ----cdsjb.py (對資料庫cdsjb表的操作)
- ----gcxmb.py (對資料庫gcxmb表的操作)
- ----last_data.txt (用於儲存上一次讀取的溫溼度,用於篩選波動太大的資料)
- ----mian.py (第一版 未對資料進行處理的主函式)
- ----maincopy.py (第二版 對資料進行處理後的主函式)
- ----+
程式內容
dbconnect.py
#以字典形式建立資料庫配置資訊,方便修改 PY_MYSQL_CONN_DICT = { "host": '10.103.102.96', "port": 3306, "user": 'bridge', "passwd": '123456', "db": '現場終端資料', "charset": 'utf8' } import pymysql class dbConnect: def __init__(self): self.conn = None self.cursor = None self.__conn_dict = PY_MYSQL_CONN_DICT def connect(self, cursor=pymysql.cursors.DictCursor): try: self.conn = pymysql.connect(**self.__conn_dict) self.cursor = self.conn.cursor(cursor=cursor) #資料庫連線成功並返回遊標 return self.cursor except Exception as e: print("連線失敗:%s"%e) def close(self): self.conn.commit() self.cursor.close() self.conn.close()
crc16.py
def crc16(x):
u'''
@summary: 計算CRC16值
@param x: bytes
@return: 返回2位元組值,類似:b'\x7B\x2A'。
'''
if not isinstance(x, bytes):
raise ValueError('Parameter must be a bytes type')
b = 0xA001
a = 0xFFFF
for byte in x:
a = a ^ byte
for _ in range(8):
last = a % 2
a = a >> 1
if last == 1:
a = a ^ b
aa = '0' * (6 - len(hex(a))) + hex(a)[2:]
ll, hh = int(aa[:2], 16), int(aa[2:], 16)
rtn = '%x' % (hh * 256 + ll & 0xffff)
while len(rtn) < 4:
rtn = '0' + rtn
rtn = hextobytes(rtn)
return rtn
def hextobytes(x):
if not isinstance(x, str):
x = str(x, 'ascii')
return bytes.fromhex(x)
def bytestohex(x):
if not isinstance(x, bytes):
x = bytes(x, 'ascii')
return ''.join(["%02x" % i for i in x]).strip()
cdb.py
from Brige import dbconnect
def fetchone():
conn = dbconnect.dbConnect()
cousor = conn.connect()
sql ="""SELECT gcxmbm cdbm, cdmc, cgqbm, jldw, sfqy, sjlx, sflb, lbfs, lbcs, bz FROM cdb WHERE 1 """
cousor.execute(sql)
result = cousor.fetchone() #獲取第一一條資料並返回結果
conn.close()
return result
def insert(**kwargs):
conn = dbconnect.dbConnect()
cursor = conn.connect()
sql = """insert into cdb(%s) values(%s)"""
key_list = []
value_list = []
for k, v in kwargs.items():
key_list.append(k)
value_list.append('%%(%s)s' % k)
sql = sql % (','.join(key_list), ','.join(value_list))
cursor.execute(sql, kwargs)
conn.close()
cdsjb.py
from Brige import dbconnect
def fetch():
conn = dbconnect.dbConnect()
cursor = conn.connect()
cursor.execute("select * from cdsjb")
result = cursor.fetchall()
conn.close()
print("success")
return result
def insert(**kwargs):
conn = dbconnect.dbConnect()
cursor =conn.connect()
sql = """insert into cdsjb(%s) values(%s)"""
key_list = []
value_list = []
for k, v in kwargs.items():
key_list.append(k)
value_list.append('%%(%s)s' % k)
sql = sql % (','.join(key_list), ','.join(value_list))
cursor.execute(sql, kwargs)
conn.close()
gcxmb.py
from Brige import dbconnect
def insert(gcxmbm,gcxmmc):
conn = dbconnect.dbConnect()
cursor = conn.connect()
sql = """insert into gcxmb(gcxmbm,gcxmmc) values(%s,%s)"""
cursor.execute(sql,(gcxmbm,gcxmmc))
conn.close()
def fetch():
conn = dbconnect.dbConnect()
cursor = conn.connect()
sql =""" select * from gcxmb """
cursor.execute(sql)
result = cursor.fetchone()
conn.close()
return result
main.py
import socket
import codecs
import time
import datetime
from Brige import crc16
import struct
from Brige import cdsjb,cdb,gcxmb
"""
client
connect()
recv()
send()
sendall()
"""
def main():
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(sk)
address = ('10.1.156.82', 8001)
sk.connect(address)
inp = '030300000002c5e9'
gcxmbm, gcxmmc = "631507030103彭陽宴", "溫溼度處理"
cdbm1, cdbm2 = "shidu", "wendu"
cdmc1, cdmc2 = "溼度", "溫度"
while True:
sk.send(codecs.decode(inp, 'hex'))
try:
result = sk.recv(1024)
except socket.timeout:
print('timeout')
crc=crc16.crc16(result[:-2])
if crc==result[-2:]:
print('CRC16成功')
shidu,wendu=struct.unpack('>hh',result[3:7])
shidu/=100
wendu/=100
#比較上一次的溫溼度的波動
with open("last_data",'r') as r:
last_data = r.read()
if not last_data:
continue
last_wendu,last_shidu = last_data.split("%")
last_wendu,last_shidu = float(last_wendu),float(last_shidu)
if(abs(shidu-last_shidu)>5 or abs(wendu-last_wendu>5)):
print("數字波動太大,未寫入資料庫")
continue
else:
#將當前溫度溼度寫入文件作為記錄
data = str(wendu) + "%"+str(shidu)
with open("last_data",'w') as f:
f.write(data)
cur_time = datetime.datetime.now()
print("溼度:%s,溫度:%s" % (shidu, wendu),datetime.datetime.now())
print("開始向資料庫傳入資料...")
gcxmb_data = gcxmb.fetch()
if not gcxmb_data:
try:
gcxmb.insert(gcxmbm,gcxmmc)
except Exception:
print("工程專案表插入失敗")
cdb_data = cdb.fetchone()
if not cdb_data:
try:
cdb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,cdmc=cdmc1,cgqbm="03",jldw="RH",sflb=1)
cdb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, cdmc=cdmc2, cgqbm="03", jldw="攝氏度", sflb=1)
except Exception:
print("測點表插入失敗 ")
try:
cdsjb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,clsj=cur_time,inst=shidu)
cdsjb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, clsj=cur_time, inst=wendu)
print("傳入成功")
except:
print("測點資料表插入失敗")
time.sleep(5)
sk.close()
main()
maincopy.py
import socket
import codecs
import time
import datetime
from Brige import crc16
import struct
from Brige import cdsjb,cdb,gcxmb
"""
client
connect()
recv()
send()
sendall()
"""
def main():
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(sk)
address = ('10.1.156.82', 8001)
sk.connect(address)
inp = '030300000002c5e9' #遠端需要接受的引數
gcxmbm, gcxmmc = "631507030103彭陽宴", "溫溼度處理" #這個為自己設定的工程專案編碼和名稱,用於後面對gcxmb表的操作
#提前定義工程編碼和名稱,因為需要對溫度和溼度進行操作,所以定義兩個,也是用於後面對cdb表的操作
cdbm1, cdbm2 = "shidu", "wendu"
cdmc1, cdmc2 = "溼度", "溫度"
while True:
total_wendu,total_shidu=0,0
list_wendu = []
list_shidu = []
global inst_wendu #瞬時溫度
global inst_shidu #瞬時溼度
global avg_wendu #平均溫度
global avg_shidu #平均溼度
global max_wendu #最大溫度
global max_shidu #最大溼度
global min_wendu #最小溫度
global min_shidu #最小溼度
n =0
begin_time = time.time() #開始讀取是的時間戳
while True:
sk.send(codecs.decode(inp, 'hex'))
try:
result = sk.recv(1024)
except socket.timeout:
print('timeout')
crc=crc16.crc16(result[:-2]) #進行CRC16-2校驗
if crc==result[-2:]:
print('CRC16成功')
shidu,wendu=struct.unpack('>hh',result[3:7]) #用struct方法解析溼度和溫度,看不懂可以自行百度瞭解struct.unpack的用法
shidu/=100
wendu/=100
#比較上一次的溫溼度的波動
with open("last_data",'r') as r:
last_data = r.read()
if not last_data: #如果last_data.txt中沒有資料,就把當前溫室度作為上一次溫溼度進行比較
last_data = str(wendu)+'%'+str(shidu) #字串拼接
last_wendu,last_shidu = last_data.split("%") #字串以%拆分,得到溫溼度
last_wendu,last_shidu = float(last_wendu),float(last_shidu) #轉化為float型數值
if(abs(shidu-last_shidu)>5 or abs(wendu-last_wendu>5)): #比較當前溫溼度與上一次溫溼度差異,如果波動大於5就過濾掉
print(abs(shidu-last_shidu),abs(wendu-last_wendu))
print("數字波動太大,未記錄")
continue
else:
# 將當前溫度溼度寫入文件作為記錄
data = str(wendu) + "%" + str(shidu)
with open("last_data", 'w') as f:
f.write(data)
n += 1
list_shidu.append(shidu)
list_wendu.append(wendu)
total_wendu += wendu #對每一次溫度進行累加
total_shidu +=shidu #對每一次溼度進行累加
end_time = time.time() #獲取結束時的時間戳
if(end_time-begin_time>5): #如果如果兩次間隔大於五秒,停止讀取資料,開始對資料進行處理
inst_wendu = list_wendu[-1] #陣列最後一個就是當時的瞬時值
inst_shidu = list_shidu[-1]
list_wendu.sort() #對陣列進行排序 從小到大
list_shidu.sort()
avg_wendu = total_wendu/n #求平均值
avg_shidu = total_shidu/n
max_wendu = list_wendu[-1] #此時陣列最後一個對最大值
max_shidu = list_shidu[-1]
min_wendu = list_wendu[0] #陣列第一個為最小值
min_shidu = list_shidu[0]
time.sleep(0.1)
break
else: #時間沒有大於五秒則繼續讀取資料
continue
cur_time = datetime.datetime.now() #記錄處理完成的時間
print("最大溫度%s,最小溫度%s,平均溫度%s,瞬時溫度%s" %(max_wendu,min_wendu,avg_wendu,inst_wendu),cur_time)
print("最大溼度%s,最小溼度%s,平均溼度%s,瞬時溼度%s" % (max_shidu, min_shidu, avg_shidu, inst_shidu),cur_time)
print("開始向資料庫傳入資料...")
gcxmb_data = gcxmb.fetch() #先對gcxmb表進行查詢,如果存在資料,則進行下一步,如果不存在就插入資料
if not gcxmb_data:
try:
gcxmb.insert(gcxmbm,gcxmmc)
except Exception:
print("工程專案表插入失敗")
cdb_data = cdb.fetchone() #對cdb表進行查詢,存在資料則跳過,不存在則插入資料
if not cdb_data:
try:
cdb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,cdmc=cdmc1,cgqbm="03",jldw="RH",sflb=1)
cdb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, cdmc=cdmc2, cgqbm="03", jldw="攝氏度", sflb=1)
except Exception:
print("測點表插入失敗 ")
try: #插入處理後的資料
cdsjb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,clsj=cur_time,avg=avg_shidu,max=max_shidu,min=min_shidu,inst=inst_shidu)
cdsjb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, clsj=cur_time,avg=avg_wendu,max=max_wendu,min=min_wendu,inst=inst_wendu)
print("傳入成功")
except:
print("測點資料表插入失敗")
sk.close()
main()
執行截圖:
注意:
1.main.py和maincopy.py只需要執行其中一個,last_data.txt最開始為空。
2.執行時注意修改配置資訊
3.程式有時候可能有bug,多執行幾次就好了。
4.所有py檔案建立在一個名為Brige的包下面,也可以自己修改