基於無線傳輸的溫溼度採集系統上位機開發
阿新 • • 發佈:2018-11-19
執行環境
Linux系統,Python語言
實現功能
從微控制器串列埠接收採集到的溫度和溼度,將資料存到資料庫中,並實時顯示在折線圖上。
使用軟體
Pycharm
使用python庫
資料庫層:pymysql
資料視覺化層:matplotlib
串列埠通訊層:pyserial
實現流程
串列埠通訊模組
串列埠所在位置:"/dev/ttyUSB0"
波特率:9600
超時時間:timeout=None,由於是實時監測,所以將timeout設定為None,只有資料傳輸過來的時候才執行後面程式。
傳輸位元組:bytesize=8
import random import serial import time from Humiture1.WriteData import Mysqlclass class Serailset: def __init__(self): self.mc=Mysqlclass() self.ser=serial.Serial("/dev/ttyUSB0",9600,timeout=None,bytesize=8)//初始化配置 #關閉串列埠 def close(self): self.ser.close() #讀取串列埠資料,每次讀取一行 def read(self): return self.ser.readline() if __name__ == '__main__': ser=Serailset() print(ser.insertMysql()) ser.close()
資料庫層
使用pymysql模組與資料庫進行互動,將資料庫操作封裝成一個類,方便操作。
import pymysql as mysql import time import random #連線資料庫 db_config = { 'host':'localhost', 'user':'root', 'passwd':'', 'db':'Data', 'port':3306, 'charset':'utf8' } class Mysqlclass: #初始化,連線資料庫,建立遊標 def __init__(self): try: self.db_config=db_config self.conn=mysql.connect(**self.db_config) self.cur=self.conn.cursor() except Exception as e: print("連線資料庫失敗") else: print("連線資料庫成功") #關閉連線,關閉遊標 def close(self): self.cur.close() self.conn.close() #往A資料表存入資料 def insertsql_A(self,list): li=[] li.append(tuple(list)) insert_sql='INSERT INTO humitures_A(measuring_time,temperature,humidity) VALUES (%s,%s,%s)' self.cur.executemany(insert_sql,li) last_id_A=int(self.cur.lastrowid) self.conn.commit() return last_id_A #往B資料表存入資料 def insertsql_B(self, list): li = [] li.append(tuple(list)) insert_sql = 'INSERT INTO humitures_B(measuring_time,temperature,humidity) VALUES (%s,%s,%s)' self.cur.executemany(insert_sql, li) last_id_B=int(self.cur.lastrowid) self.conn.commit() return last_id_B #讀取資料表中最大行,方便訪問最新插入的資料 def read_maxcount(self,site): self.cur.execute('select count(*) from humitures_%s' %(site)) return self.cur.fetchall()[0][0] #讀取資料 def readData_A(self,after): time.sleep(0.5) self.cur.execute('select * from humitures_A where id between %s and %s' %(after-10,after)) self.conn.commit() return self.cur.fetchall() def readData_B(self,after): time.sleep(0.5) self.cur.execute('select * from humitures_B where id between %s and %s' %(after-10,after)) self.conn.commit() return self.cur.fetchall()
資料視覺化
import matplotlib.pyplot as plt from Humiture1.WriteData import Mysqlclass import numpy as np import time class PictureShow: def __init__(self): self.mc = Mysqlclass() self.fig = plt.figure() plt.ion() def showA(self,last_id_A): ax1=plt.subplot(211) #實現多幅圖建立 plt.grid(True) after=last_id_A tuple=self.mc.readData_A(after) x1_datelist = [str(i[1])[11:] for i in tuple] y1_tmplist=[i[2] for i in tuple] y2_humlist=[i[3] for i in tuple] ax1.plot(x1_datelist,y1_tmplist,c='red') ax1.set_title("Site_A_Temperature") ax1.set_ylabel('Temperature(C)', fontsize=16) ax2=ax1.twinx() #繫結ax1和ax2實現雙Y軸折線圖 ax2.plot(x1_datelist, y2_humlist, c='blue') ax2.set_xlabel('Time(s)', fontsize=16) ax2.set_ylabel('humiture(%)', fontsize=16) def showB(self,last_id_B): ax3=plt.subplot(212) plt.grid(True) after = last_id_B tuple=self.mc.readData_B(after) x2_datelist = [str(i[1])[11:] for i in tuple] y3_tmplist = [i[2] for i in tuple] y4_humlist = [i[3] for i in tuple] ax3.plot(x2_datelist, y3_tmplist, c='red') ax3.set_title("Site_B_Temperature") ax3.set_ylabel('Temperature(C)', fontsize=16) ax4 = ax3.twinx() ax4.plot(x2_datelist, y4_humlist, c='blue') ax4.set_xlabel('Time(s)', fontsize=16) ax4.set_ylabel('humiture(%)', fontsize=16) self.fig.autofmt_xdate() def showWindow(self,last_id_A,last_id_B): plt.clf() # 清除畫布 self.showA(last_id_A) self.showB(last_id_B) plt.pause(0.001)
主操作
從串列埠中讀取資料,判斷A、B兩地的資料,A地的資訊存入資料庫中的A資料表,B地的資訊存入資料庫中B地的資料表。每插入一條資訊折線圖模組從資料庫中讀取一條最新插入的資料,在折線圖上進行顯式。
import time
from Humiture1.WriteData import Mysqlclass
from Humiture1.showpicture import PictureShow
from Humiture1.Serailset import Serailset
class Manager:
def __init__(self):
self.mc=Mysqlclass()
self.ps=PictureShow()
self.ser=Serailset()
def insertMysqlandshow(self):
while True:
infotuple = []
info = str(self.ser.read())
date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
infotuple.append(date)
infotuple.append(info[8:10])
infotuple.append(info[15:17])
last_id_A = int(self.mc.read_maxcount("A"))
last_id_B = int(self.mc.read_maxcount("B"))
if info[2] == "A":
self.mc.insertsql_A(infotuple)
self.ps.showWindow(last_id_A, last_id_B)
print(info)
elif info[2] == "B":
self.mc.insertsql_B(infotuple)
self.ps.showWindow(last_id_A, last_id_B)
print(info)
if __name__ == '__main__':
mg=Manager()
mg.insertMysqlandshow()