Tkinter開發采用串口通信的上位機軟件(3)
阿新 • • 發佈:2019-01-02
main daemon mon 功能 creat 使用 進行 結果 sid
本博客的所有原創文章采用創作公用版協議。要求署名、非商業用途和保持一致。要求署名必須包含我的網名(geokai)以及文章來源(選擇博客首地址或者具體博文地址)。
商業性使用須預先征得本人同意(發送Email到 [email protected])
18年下半年太忙了,直接停止軟件的開發計劃。在18年最後幾天使用python自帶的Tkinter框架簡單的先把軟件功能實現出來了。占時把這一期的標題改成Tkinter開發上位機軟件。
先說一下軟件實現的功能把
1)獲取二氧化碳傳感器探頭的數據,使用到pyserial,crcmod庫
2)使用matplolib進行實時繪圖,使用到matplotlib庫
3)定時將數據回傳到郵箱,使用到email,smtplib庫
這裏只放出最核心部分的代碼
導入關鍵的庫
#導入數值GUI框架 import tkinter as tk from tkinter import scrolledtext #導入繪圖包 from matplotlib.backends.backend_tkagg import ( FigureCanvasTkAgg, NavigationToolbar2Tk) # Implement the default Matplotlib key bindings. #from matplotlib.backend_bases import key_press_handler from matplotlib.figure import Figure import matplotlib.dates as mdates #導入數學計算包 import pandas as pd import numpy as np #導入系統包 import threading import time from datetime import datetime,timedelta import serial.tools.list_ports import crcmod #導入網絡包,郵件發送 import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.header import Header
GUI框架,其中包括繪圖部分
#定義GUI界面及功能 class Application(tk.Tk): def __init__(self): ‘‘‘初始化‘‘‘ self.createWidgets() def createWidgets(self): ‘‘‘設置繪圖區‘‘‘ self.fig = Figure(figsize=(10,5), dpi=100) self.ax_co2 = self.fig.add_subplot(1,1,1) self.canvas = FigureCanvasTkAgg(self.fig, master=self) self.canvas.get_tk_widget() self.canvas._tkcanvas.place(x=0, y=0, width=1024, height=350)#pack(side=tk.TOP, fill=tk.BOTH, expand=1) ‘‘‘設置二氧化碳的數據接收區‘‘‘ self.log_co2 = scrolledtext.ScrolledText(self, font=("Calibri", 8), background=‘#ffffff‘) self.log_co2.place(x=720, y=450, width=300, height=60) self.log_co2.insert(tk.END,‘Strat\r\n‘) self.log_co2_neat = scrolledtext.ScrolledText(self, font=("Calibri", 10), background=‘#ffffff‘) self.log_co2_neat.place(x=720, y=530, width=300, height=100) self.log_co2_neat.insert(tk.END,‘Strat\r\n‘) self.log_co2_col_name=tk.Label(bg=‘gray‘, font=("Calibri", 10), justify=tk.LEFT, anchor=tk.W, text=‘Time\t\tCO2(ppm)‘) self.log_co2_col_name.place(x=720, y=510, width=300, height=20) ‘‘‘設置按鈕區‘‘‘ self.bt_connect_str = tk.StringVar() if self.trans_data_status==False: self.bt_connect_str.set(‘開始傳輸數據‘) else: self.bt_connect_str.set(‘停止傳輸數據‘) self.bt_connect = tk.Button(self, textvariable=self.bt_connect_str, command=self.ActivateTrans) self.bt_connect.pack(side=tk.LEFT , anchor=tk.S) self.Draw() # 繪圖 ‘‘‘設置狀態區‘‘‘ self.lb_port_co2_status=tk.Label(bg=‘red‘, width=10, height=1, text=‘CO2 Status‘) self.lb_port_co2_status.pack(side=tk.RIGHT , anchor=tk.S) def ActivateTrans(self): ‘‘‘ 點擊數據傳輸按鈕後激活數據傳輸 1)激活一個從串口獲取二氧化碳數據的線程 3)激活郵件發送線程 4)運行以上三個線程,並判斷是否正確連接串口,並顯示串口連接狀態 ***註意以上三個線程的功能較為復雜,使用了單獨的Thread類進行了繼承,因此停止線程 采用定義在類裏面的Stop()方法 5)激活文本數據刷新線程 6)激活繪圖區刷新線程 ***以上兩個線程僅有單獨的函數並且封裝在窗體類下,直接采用Threading類進行定義,所 以需要註意停止需采用threading.Event()方法進行停止 ‘‘‘ if self.trans_data_status==False: self.trans_thread_co2 = Thread_CO2(0, "Thread_CO2_1") self.trans_thread_co2.setDaemon(True) self.trans_thread_co2.start() if self.trans_thread_co2.port_available==True: self.lb_port_co2_status.config(bg=‘green‘) print(‘CO2 Port Failed to Connect‘) self.email_thread_event = threading.Event() self.email_thread = threading.Thread(target = SendEmail, args=(EMAIL_RESEND_INTERVAL, self.email_thread_event)) self.email_thread.start() self.refresh_thread_event = threading.Event() self.refresh_thread = threading.Thread(target = self.RefreshThread, args=(1, self.refresh_thread_event)) self.refresh_thread.start() self.redraw_thread_event = threading.Event() self.redraw_thread = threading.Thread(target = self.ReDrawThread, args=(10, self.redraw_thread_event)) self.redraw_thread.start() self.trans_data_status=True self.bt_connect_str.set(‘停止傳輸數據‘) else: self.trans_thread_co2.Stop() self.lb_port_co2_status.config(bg=‘red‘) self.lb_port_hg_status.config(bg=‘red‘) self.trans_data_status=False self.bt_connect_str.set(‘開始傳輸數據‘) self.email_thread_event.set() self.refresh_thread_event.set() self.redraw_thread_event.set() # self.email_thread.join(0) def RefreshThread(self, time_interval, stop_event): ‘‘‘ 原始數據刷新程序 ‘‘‘ while(not stop_event.is_set() ): print(‘refresh‘) self.RefreshDataText() #pinrt(time_interval) time.sleep(time_interval) def RefreshDataText(self): ‘‘‘ 判斷是否有新的數據並顯示在文本框中 ‘‘‘ text = self.log_co2.get(0.0, tk.END).splitlines() # print(raw_trans_data_co2) if len(raw_trans_data_co2)>0 : if raw_trans_data_co2[-1]==text[-2]: pass else: self.log_co2.insert(tk.END, raw_trans_data_co2[-1]+‘\r\n‘) self.log_co2.see(tk.END) #print(np.array(compiled_data_co2.iloc[-1])) if len(compiled_data_co2)>0: one_data = np.array(compiled_data_co2.iloc[-1]) #print(one_data) one_data = str(one_data[0]) + ‘\t\t‘ + str(one_data[1]) text = self.log_co2_neat.get(0.0, tk.END).splitlines() if one_data==text[-2]: print(‘same‘) else: self.log_co2_neat.insert(tk.END, one_data + ‘\r\n‘) self.log_co2_neat.see(tk.END) def AdjustScale(self,_): ‘‘‘ 調整繪圖區坐標軸範圍 ‘‘‘ def ReDrawThread(self, time_interval, stop_event): ‘‘‘ 繪圖區刷新程序 ‘‘‘ while(not stop_event.is_set()): try: self.Draw() except: pass time.sleep(time_interval) def Draw(self): ‘‘‘ 實時繪圖程序 TODO: 1)個人認為使用matplotlib的這種繪圖方式效率有些底下,是否采用諸如Animation的 動態繪圖功能改善繪圖性能有待檢驗 ‘‘‘ #判斷是否有有效數據 if len(compiled_data_co2)>0 or len(compiled_data_hg)>0: #由於二氧化碳數據量太大,選擇最後16000條數據,足夠保證最大3天的顯示量,降低繪圖負擔 #註意原始數據中時間數據最好經過to_datetime函數規整一遍,以免造成數據錯誤 co2_x_data = pd.to_datetime(compiled_data_co2.iloc[-16000:,0]) co2_y_data = compiled_data_co2.iloc[-16000:,1] # co2_xlim_min = datetime.strptime(co2_x_data.iloc[-1], ‘%Y-%m-%d %H:%M:%S‘) - self.fig_xlim_delta co2_xlim_min = co2_x_data.iloc[-1] - self.fig_xlim_delta print( co2_y_data.min()) self.ax_co2.clear() self.ax_co2.xaxis.set_major_formatter(mdates.DateFormatter(‘%m-%d\n%H:%M‘)) self.ax_co2.xaxis.set_major_locator(mdates.AutoDateLocator()) self.ax_co2.scatter(co2_x_data.values,co2_y_data.values, s = 1, c=‘green‘) self.ax_co2.set_xlim(co2_xlim_min,co2_x_data.iloc[-1]+ self.fig_xlim_delta/9) self.ax_co2.set_ylim(co2_y_data.min(), co2_y_data.min()+self.fig_co2_ylim_delta) self.ax_co2.set_ylabel(‘$CO_2(ppm)$‘) self.ax_co2.grid(linestyle=‘--‘) self.fig.savefig(‘D:/figure.png‘) self.canvas.draw() def _quit(self): ‘‘‘退出‘‘‘ if self.trans_data_status==True: self.ActivateTrans() self.quit() # 停止 mainloop self.destroy() # 銷毀所有部件
二氧化碳數據的傳輸
class Thread_CO2 (threading.Thread): ‘‘‘ 接收CO2數據的線程 該CO2探頭的購買鏈接https://m.tb.cn/h.3phrPcr ‘‘‘ def __init__(self, threadID, name): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.port_available = False com_list = serial.tools.list_ports.comports() for port in com_list: print(port.device) #返回端口號如COM3 print(port.description) #返回設備名字 print(port.pid) #返回設備在計算機上的位置 if port.pid==29987: port_num_co2=port.device self.port_available = True self.ser_co2=serial.Serial() if self.port_available==True: self.ser_co2.port=port_num_co2 self.ser_co2.baudrate=19200 self.ser_co2.parity=serial.PARITY_EVEN self.ser_co2.timeout=0.5 self.__runing_flag=True def run(self): print ("開始線程:" + self.name) if self.port_available==False: if DEBUG_MODE==True: while(self.__runing_flag): self.FakeData() time.sleep(1) return 1 if not self.ser_co2.is_open: self.ser_co2.open() print(self.ser_co2) while(self.__runing_flag): self.GetData() time.sleep(CO2_REFRESH_INTERVAL) def GetData(self): ‘‘‘ 獲取CO2數據需要註意進制的轉換,以及最終的CRC16校驗 CRC的校驗使用CRCMOD庫,不同類型的CRC均可以采用此庫進行計算 其中特別要關註poly這個參數,參考http://www.ip33.com/crc.html 在該網站查詢CRC多項式,並在開頭補1 ‘‘‘ retry_time= 10 while(retry_time): request_code_co2=[] #首先配置需要發送的信息,Serial庫接收直接以0-255的int值 #因此需要將16進制字符串轉換為10進制整數 for i in ‘15 04 13 8B 00 01 46 70 ‘.split(): request_code_co2.append(int(i,16)) # print(request_code_co2) if not self.ser_co2.is_open: self.ser_co2.open() self.ser_co2.write(request_code_co2) #獲取的格式為b‘‘,byte型 temp =self.ser_co2.readline() #print(temp) #定義CRC,並計算CRC crc16_func = crcmod.Crc(poly=0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000) crc16_func.update(temp[:-2]) co2_conc=[] #CRC的計算結果為hex型,采用bytes.fromhex()轉換為byte再與傳輸的最後兩位byte對比,註意順序 #如果獲取成功就退出,沒有成功則重復,最多10次 #TODO:如果多詞未獲取成功,未來需要加入一個錯誤信息日誌 if bytes.fromhex(crc16_func.hexdigest()) == temp[-2:][::-1]: print(temp[-4:-2]) co2_conc = (int.from_bytes(temp[-4:-2], byteorder=‘big‘, signed=False)) break retry_time-=1 #raw_trans_data_co2用來顯示文本信息,需要將DateTime和獲取的16進制值轉換成str類型,否則文本框無法顯示 #TODO:這個語句應該可以優化 co2_one_data_raw = str([temp.hex()[x*2:x*2+2] for x in range(len(temp.hex())//2) ]) co2_one_data_time = (datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)) raw_trans_data_co2.append(str(co2_one_data_time) + co2_one_data_raw) try: if co2_conc>=0: compiled_data_co2.loc[len(compiled_data_co2)] = [co2_one_data_time, co2_conc] except: pass with open(‘D:/raw_data_CO2.txt‘, mode=‘a‘) as f: f.write(str(co2_one_data_time)) f.write(‘\t‘) f.write(str(co2_conc)) f.write(‘\r\n‘) #print(compiled_data_co2) def FakeData(self): #產生偽數據 def Stop(self): if self.ser_co2.is_open and self.port_available==True: self.ser_co2.close() print(self.ser_co2) self.__runing_flag=False
Tkinter開發采用串口通信的上位機軟件(3)