1. 程式人生 > >python溫溼度資料遠端接收處理入庫程式

python溫溼度資料遠端接收處理入庫程式

程式結構

Brige---+

  •         ----dbconnect.py    (資料庫連線和配置資訊)
  •         ----crc16.py     (CRC16校驗程式)
  •         ---- cdb.py        (對資料庫cdb表的操作)
  •         ----cdsjb.py        (對資料庫cdsjb表的操作)
  •         ----gcxmb.py        (對資料庫gcxmb表的操作)
  •         ----last_data.txt       (用於儲存上一次讀取的溫溼度,用於篩選波動太大的資料)
  •         ----mian.py                (第一版  未對資料進行處理的主函式)
  •         ----maincopy.py    (第二版 對資料進行處理後的主函式)
  •         ----+

程式內容       

dbconnect.py 

#以字典形式建立資料庫配置資訊,方便修改
PY_MYSQL_CONN_DICT = {                
    "host": '10.103.102.96',
    "port": 3306,
    "user": 'bridge',
    "passwd": '123456',
    "db": '現場終端資料',
    "charset": 'utf8'
}
import pymysql
class dbConnect:
    def __init__(self):
        self.conn = None
        self.cursor = None
        self.__conn_dict = PY_MYSQL_CONN_DICT

    def connect(self, cursor=pymysql.cursors.DictCursor):
        try:
            self.conn = pymysql.connect(**self.__conn_dict)
            self.cursor = self.conn.cursor(cursor=cursor)    #資料庫連線成功並返回遊標
            return self.cursor
        except Exception as e:
            print("連線失敗:%s"%e)

    def close(self):
        self.conn.commit()
        self.cursor.close()
        self.conn.close()

crc16.py

def crc16(x):
    u'''
    @summary: 計算CRC16值
    @param x: bytes
    @return: 返回2位元組值,類似:b'\x7B\x2A'。
    '''
    if not isinstance(x, bytes):
        raise ValueError('Parameter must be a bytes type')
    b = 0xA001
    a = 0xFFFF
    for byte in x:
        a = a ^ byte
        for _ in range(8):
            last = a % 2
            a = a >> 1
            if last == 1:
                a = a ^ b
    aa = '0' * (6 - len(hex(a))) + hex(a)[2:]
    ll, hh = int(aa[:2], 16), int(aa[2:], 16)
    rtn = '%x' % (hh * 256 + ll & 0xffff)
    while len(rtn) < 4:
        rtn = '0' + rtn
    rtn = hextobytes(rtn)
    return rtn
def hextobytes(x):
    if not isinstance(x, str):
        x = str(x, 'ascii')
    return bytes.fromhex(x)
def bytestohex(x):
    if not isinstance(x, bytes):
        x = bytes(x, 'ascii')
    return ''.join(["%02x" % i for i in x]).strip()

cdb.py

from Brige import dbconnect
def fetchone():
    conn = dbconnect.dbConnect()
    cousor = conn.connect()
    sql ="""SELECT  gcxmbm cdbm, cdmc, cgqbm, jldw, sfqy, sjlx, sflb, lbfs, lbcs, bz FROM cdb WHERE 1 """
    cousor.execute(sql)
    result = cousor.fetchone()    #獲取第一一條資料並返回結果
    conn.close()
    return result

def insert(**kwargs):
    conn = dbconnect.dbConnect()
    cursor = conn.connect()
    sql = """insert into cdb(%s) values(%s)"""
    key_list = []
    value_list = []
    for k, v in kwargs.items():
        key_list.append(k)
        value_list.append('%%(%s)s' % k)
    sql = sql % (','.join(key_list), ','.join(value_list))
    cursor.execute(sql, kwargs)
    conn.close()

cdsjb.py

from Brige import dbconnect

def fetch():
    conn = dbconnect.dbConnect()
    cursor = conn.connect()
    cursor.execute("select * from cdsjb")
    result = cursor.fetchall()
    conn.close()
    print("success")
    return result
def insert(**kwargs):
    conn = dbconnect.dbConnect()
    cursor =conn.connect()
    sql = """insert into cdsjb(%s) values(%s)"""
    key_list = []
    value_list = []
    for k, v in kwargs.items():
        key_list.append(k)
        value_list.append('%%(%s)s' % k)
    sql = sql % (','.join(key_list), ','.join(value_list))
    cursor.execute(sql, kwargs)
    conn.close()

gcxmb.py

from Brige import dbconnect

def insert(gcxmbm,gcxmmc):
    conn = dbconnect.dbConnect()
    cursor = conn.connect()
    sql = """insert into gcxmb(gcxmbm,gcxmmc) values(%s,%s)"""
    cursor.execute(sql,(gcxmbm,gcxmmc))
    conn.close()

def fetch():
    conn = dbconnect.dbConnect()
    cursor = conn.connect()
    sql =""" select * from gcxmb """
    cursor.execute(sql)
    result = cursor.fetchone()
    conn.close()
    return result

main.py

import socket
import codecs
import time
import datetime
from Brige import crc16
import struct
from Brige import cdsjb,cdb,gcxmb

"""
client
    connect()
    recv()
    send()
    sendall()

"""
def main():
    sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    print(sk)
    address = ('10.1.156.82', 8001)
    sk.connect(address)
    inp = '030300000002c5e9'
    gcxmbm, gcxmmc = "631507030103彭陽宴", "溫溼度處理"
    cdbm1, cdbm2 = "shidu", "wendu"
    cdmc1, cdmc2 = "溼度", "溫度"
    while True:
        sk.send(codecs.decode(inp, 'hex'))
        try:
            result = sk.recv(1024)
        except socket.timeout:
            print('timeout')
        crc=crc16.crc16(result[:-2])
        if crc==result[-2:]:
            print('CRC16成功')
        shidu,wendu=struct.unpack('>hh',result[3:7])
        shidu/=100
        wendu/=100
        #比較上一次的溫溼度的波動
        with open("last_data",'r') as r:
            last_data =  r.read()
        if not last_data:
            continue
        last_wendu,last_shidu = last_data.split("%")
        last_wendu,last_shidu = float(last_wendu),float(last_shidu)
        if(abs(shidu-last_shidu)>5 or abs(wendu-last_wendu>5)):
            print("數字波動太大,未寫入資料庫")
            continue
        else:
            #將當前溫度溼度寫入文件作為記錄
            data = str(wendu) + "%"+str(shidu)
            with open("last_data",'w') as f:
                f.write(data)
            cur_time = datetime.datetime.now()
            print("溼度:%s,溫度:%s" % (shidu, wendu),datetime.datetime.now())
            print("開始向資料庫傳入資料...")
            gcxmb_data = gcxmb.fetch()
            if not gcxmb_data:
                try:
                    gcxmb.insert(gcxmbm,gcxmmc)
                except Exception:
                    print("工程專案表插入失敗")
            cdb_data = cdb.fetchone()
            if not cdb_data:
                try:
                    cdb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,cdmc=cdmc1,cgqbm="03",jldw="RH",sflb=1)
                    cdb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, cdmc=cdmc2, cgqbm="03", jldw="攝氏度", sflb=1)
                except Exception:
                    print("測點表插入失敗 ")
            try:
                cdsjb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,clsj=cur_time,inst=shidu)
                cdsjb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, clsj=cur_time, inst=wendu)
                print("傳入成功")
            except:
                print("測點資料表插入失敗")
            time.sleep(5)
    sk.close()
main()
maincopy.py
import socket
import codecs
import time
import datetime
from Brige import crc16
import struct
from Brige import cdsjb,cdb,gcxmb

"""
client
    connect()
    recv()
    send()
    sendall()

"""
def main():
    sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    print(sk)
    address = ('10.1.156.82', 8001)
    sk.connect(address)
    inp = '030300000002c5e9'   #遠端需要接受的引數
    gcxmbm, gcxmmc = "631507030103彭陽宴", "溫溼度處理"     #這個為自己設定的工程專案編碼和名稱,用於後面對gcxmb表的操作

    #提前定義工程編碼和名稱,因為需要對溫度和溼度進行操作,所以定義兩個,也是用於後面對cdb表的操作
    cdbm1, cdbm2 = "shidu", "wendu"
    cdmc1, cdmc2 = "溼度", "溫度"

    while True:
        total_wendu,total_shidu=0,0
        list_wendu = []
        list_shidu = []
        global inst_wendu   #瞬時溫度
        global inst_shidu   #瞬時溼度 
        global avg_wendu    #平均溫度
        global avg_shidu    #平均溼度
        global max_wendu    #最大溫度
        global max_shidu    #最大溼度
        global min_wendu    #最小溫度
        global min_shidu    #最小溼度
        n =0
        begin_time = time.time()     #開始讀取是的時間戳
        while True:
            sk.send(codecs.decode(inp, 'hex'))
            try:
                result = sk.recv(1024)
            except socket.timeout:
                print('timeout')
            crc=crc16.crc16(result[:-2])     #進行CRC16-2校驗
            if crc==result[-2:]:
                print('CRC16成功')
            shidu,wendu=struct.unpack('>hh',result[3:7])    #用struct方法解析溼度和溫度,看不懂可以自行百度瞭解struct.unpack的用法
            shidu/=100
            wendu/=100
            #比較上一次的溫溼度的波動
            with open("last_data",'r') as r:
                last_data =  r.read()
            if not last_data:  #如果last_data.txt中沒有資料,就把當前溫室度作為上一次溫溼度進行比較
                last_data = str(wendu)+'%'+str(shidu)   #字串拼接
            last_wendu,last_shidu = last_data.split("%")   #字串以%拆分,得到溫溼度
            last_wendu,last_shidu = float(last_wendu),float(last_shidu)   #轉化為float型數值
            if(abs(shidu-last_shidu)>5 or abs(wendu-last_wendu>5)):   #比較當前溫溼度與上一次溫溼度差異,如果波動大於5就過濾掉
                print(abs(shidu-last_shidu),abs(wendu-last_wendu))
                print("數字波動太大,未記錄")
                continue
            else:
                # 將當前溫度溼度寫入文件作為記錄
                data = str(wendu) + "%" + str(shidu)
                with open("last_data", 'w') as f:
                    f.write(data)
                n += 1
                list_shidu.append(shidu)    
                list_wendu.append(wendu)
                total_wendu += wendu    #對每一次溫度進行累加
                total_shidu +=shidu     #對每一次溼度進行累加   
                end_time = time.time()    #獲取結束時的時間戳
                if(end_time-begin_time>5):    #如果如果兩次間隔大於五秒,停止讀取資料,開始對資料進行處理
                    inst_wendu = list_wendu[-1]    #陣列最後一個就是當時的瞬時值
                    inst_shidu = list_shidu[-1]
                    list_wendu.sort()            #對陣列進行排序 從小到大
                    list_shidu.sort()
                    avg_wendu = total_wendu/n    #求平均值
                    avg_shidu = total_shidu/n
                    max_wendu = list_wendu[-1]    #此時陣列最後一個對最大值
                    max_shidu = list_shidu[-1]
                    min_wendu = list_wendu[0]    #陣列第一個為最小值
                    min_shidu = list_shidu[0]
                    time.sleep(0.1)
                    break
                else:  #時間沒有大於五秒則繼續讀取資料
                    continue
        cur_time = datetime.datetime.now()    #記錄處理完成的時間
        print("最大溫度%s,最小溫度%s,平均溫度%s,瞬時溫度%s" %(max_wendu,min_wendu,avg_wendu,inst_wendu),cur_time)
        print("最大溼度%s,最小溼度%s,平均溼度%s,瞬時溼度%s" % (max_shidu, min_shidu, avg_shidu, inst_shidu),cur_time)
        print("開始向資料庫傳入資料...")
        gcxmb_data = gcxmb.fetch()     #先對gcxmb表進行查詢,如果存在資料,則進行下一步,如果不存在就插入資料
        if not gcxmb_data:
            try:
                gcxmb.insert(gcxmbm,gcxmmc)
            except Exception:
                print("工程專案表插入失敗")
        cdb_data = cdb.fetchone()    #對cdb表進行查詢,存在資料則跳過,不存在則插入資料
        if not cdb_data:
            try:
                cdb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,cdmc=cdmc1,cgqbm="03",jldw="RH",sflb=1)
                cdb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, cdmc=cdmc2, cgqbm="03", jldw="攝氏度", sflb=1)
            except Exception:
                print("測點表插入失敗 ")
        try:    #插入處理後的資料
            cdsjb.insert(gcxmbm=gcxmbm,cdbm=cdbm1,clsj=cur_time,avg=avg_shidu,max=max_shidu,min=min_shidu,inst=inst_shidu)
            cdsjb.insert(gcxmbm=gcxmbm, cdbm=cdbm2, clsj=cur_time,avg=avg_wendu,max=max_wendu,min=min_wendu,inst=inst_wendu)
            print("傳入成功")
        except:
            print("測點資料表插入失敗")
    sk.close()
main()

執行截圖:




注意:

    1.main.py和maincopy.py只需要執行其中一個,last_data.txt最開始為空。

    2.執行時注意修改配置資訊

    3.程式有時候可能有bug,多執行幾次就好了。

    4.所有py檔案建立在一個名為Brige的包下面,也可以自己修改