1. 程式人生 > 其它 >Python3+PCAN-USB基於PCAN-Basic二次開發實現CAN匯流排訊息讀寫

Python3+PCAN-USB基於PCAN-Basic二次開發實現CAN匯流排訊息讀寫

一、環境搭建

1.概述

本文主要是通過Python3與CAN匯流排工具PCAN-USB基於官方PCAN-Basic庫實現CAN匯流排訊息讀寫功能。

2.PCANBasic.py和PCANBasic.dll下載地址

https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

3.Python安裝

下載地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

二、專案展示

1.檔案目錄

2.CAN訊息讀取截圖

3.CAN訊息傳送截圖

三、完整程式碼

#!/usr/bin/python
# _*_ coding:utf-8 _*_ from PCANBasic import * from queue import * import threading, time, os class PcanOperate(PCANBasic, threading.Thread): def __init__(self): super().__init__() # 繼承父類的init方法 result = self.Initialize(PCAN_USBBUS1, PCAN_BAUD_500K) # 匯流排初始化 if result != PCAN_ERROR_OK:
print(self.GetErrorText(result)) # 發生錯誤,獲取錯誤資訊 else: print("PCAN-USB 已初始化") def ProcessMessage(self, msg, timestamp): """CAN訊息處理方法""" msg_dict = {} msg_id = hex(msg.ID) if len(msg_id[2:]) == 7: msg_dict["CANID"] = '0' + msg_id[2:].upper()
else: msg_dict["CANID"] = msg_id[2:].upper() msg_dict["MSGTYPE"] = msg.MSGTYPE msg_dict["LEN"] = msg.LEN data = '' for i in range(8): if len(hex(msg.DATA[i])[2:]) == 1: d = ' ' + '0' + hex(msg.DATA[i])[2:].upper() else: d = ' ' + hex(msg.DATA[i])[2:].upper() data += d msg_dict["DATA"] = data[1:] timestamp_dict = {} timestamp_dict['millis'] = timestamp.millis timestamp_dict['millis_overflow'] = timestamp.millis_overflow timestamp_dict['micros'] = timestamp.micros time_sum = timestamp.micros + 1000 * timestamp.millis + 0x100000000 * 1000 * timestamp.millis_overflow return msg_dict def PutQueue(self): """從匯流排中讀取CAN訊息及其時間戳,並放入佇列。""" while True: """檢查接收佇列是否有新訊息""" readResult = self.GetStatus(PCAN_USBBUS1) if readResult != PCAN_ERROR_OK: result = self.GetErrorText(readResult) print(result[1]) event2.set() else: readResult = self.Read(PCAN_USBBUS1) # 接收匯流排報文 time.sleep(0.01) if readResult[0] != PCAN_ERROR_QRCVEMPTY: print("訊息入隊") q.put(self.ProcessMessage(readResult[1], readResult[2])) # 訊息入隊 event1.set() else: result = self.GetErrorText(readResult[0]) print(result[1]) event2.set() break def GetQueue(self): """從佇列中獲取資訊""" while True: if event2.is_set(): break if not q.empty(): event1.wait() print(q.get()) # 訊息出隊 def GetXmtMsg(self): """獲取xmt檔案中需要傳送的訊息""" with open(os.path.join(os.getcwd(), 'zuidazhi.xmt'), 'r+') as f: fo = f.readlines() allcandata = [] for i, j in enumerate(fo, start=1): onecandata = {} if i > 14: f1 = j.strip().split() if len(f1) == 14: # 獲取長度為8的data onecandata['cycle'] = (int(f1[3])) msg = TPCANMsg() msg.ID = int(f1[1][:-1], 16) msg.MSGTYPE = PCAN_MESSAGE_EXTENDED msg.LEN = int(f1[4]) msg.DATA[0] = int(f1[6][:-1], 16) msg.DATA[1] = int(f1[7][:-1], 16) msg.DATA[2] = int(f1[8][:-1], 16) msg.DATA[3] = int(f1[9][:-1], 16) msg.DATA[4] = int(f1[10][:-1], 16) msg.DATA[5] = int(f1[11][:-1], 16) msg.DATA[6] = int(f1[12][:-1], 16) msg.DATA[7] = int(f1[13][:-1], 16) onecandata['msg'] = msg allcandata.append(onecandata) return allcandata def CanWrite(self): """傳送CAN訊息""" allcandata = self.GetXmtMsg() print(allcandata) if len(allcandata) > 0: for i in allcandata: result = self.Write(PCAN_USBBUS1, i['msg']) currenttime = int(round(time.time() * 1000)) i['currenttime'] = currenttime if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result) else: print("時間戳記錄成功") time.sleep(1) while True: currenttime = int(round(time.time() * 1000)) for i in allcandata: interval = currenttime - i['currenttime'] if interval >= i['cycle']: result = self.Write(PCAN_USBBUS1, i['msg']) # if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result) else: print("訊息傳送成功") i['currenttime'] = currenttime def __del__(self): result = self.Uninitialize(PCAN_USBBUS1) # 匯流排釋放 if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result[1]) else: print("PCAN-USB 已釋放") if __name__ == '__main__': q = Queue() event1 = threading.Event() event2 = threading.Event() pcan = PcanOperate() # s1 = threading.Thread(target=pcan.CanWrite, name="傳送CAN訊息執行緒") # s1.start() s2 = threading.Thread(target=pcan.PutQueue, name="讀取CAN訊息並放入佇列執行緒") s2.start() s3 = threading.Thread(target=pcan.GetQueue, name="從佇列中讀取CAN訊息執行緒") s3.start()
—————————————————————————————— 選擇正確的事、再把事做正確 ——————————————————————————————