python-can庫基於PCAN-USB使用方法
阿新 • • 發佈:2022-03-04
一、概述
1.介紹
python-can庫為Python提供了控制器區域網的支援,為不同的硬體裝置提供了通用的抽象,並提供了一套實用程式,用於在CAN總線上傳送和接收訊息。
支援硬體介面:
Name |
Documentation |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2.環境搭建
Python安裝:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
PCAN-USB驅動: https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
庫:pip install python-can
3.參考文件
https://python-can.readthedocs.io/en/master/#
二、常用方法
1.接收報文
from can.interfaces.pcan.pcan import PcanBus def bus_recv(): """輪詢接收訊息""" try: while True: msg = bus.recv(timeout=100)print(msg) except KeyboardInterrupt: pass if __name__ == '__main__': bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_recv()
2.傳送報文
from can.interfaces.pcan.pcan import PcanBus def bus_send(): """can訊息傳送""" while True: time.sleep(0.02) try: bus.send(msg) print("訊息傳送 {}".format(bus.channel_info)) except can.CanError: print("訊息未傳送") if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 報文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_send()
3.定期傳送報文
def bus_send_periodic(): """週期傳送報文""" print("開始每200毫秒傳送一條訊息。持續時間10s") task = bus.send_periodic(msg, 1.5) # 定期傳送 if not isinstance(task, can.ModifiableCyclicTaskABC): # 斷言task型別 print("此介面似乎不支援") task.stop() return time.sleep(5) # 持續時間 print("傳送完成") print("更改執行任務的資料以99開頭") msg.data[0] = 0x99 task.modify_data(msg) # 修改data首位元組 time.sleep(10) task.stop() print("停止迴圈傳送") print("將停止任務的資料更改為單個 ff 位元組") msg.data = bytearray([0xff]) # 重新定向data msg.dlc = 1 # 定義data長度 task.modify_data(msg) # 修改data time.sleep(10) print("重新開始") task.start() # 重新啟動已停止的週期性任務 time.sleep(10) task.stop() print("完畢") if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 報文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_send_periodic()
4.迴圈收發訊息
from can.interfaces.pcan.pcan import PcanBus def send_cyclic(stop_event): """迴圈傳送訊息""" print("開始每1秒傳送1條訊息") start_time = time.time() while not stop_event.is_set(): msg.timestamp = time.time() - start_time bus.send(msg) print(f"tx: {msg}") time.sleep(1) print("停止傳送訊息") def receive(stop_event): """迴圈接收訊息""" print("開始接收訊息") while not stop_event.is_set(): rx_msg = bus.recv(1) if rx_msg is not None: print(f"rx: {rx_msg}") print("停止接收訊息") def send_and_recv_msg(): """傳送一個訊息並接收一個訊息,需要雙通道CAN""" stop_event = threading.Event() t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,)) t_receive = threading.Thread(target=receive, args=(stop_event,)) t_receive.start() t_send_cyclic.start() try: while True: time.sleep(0) # yield except KeyboardInterrupt: pass # 正常退出 stop_event.set() time.sleep(0.5) if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 報文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) send_and_recv_msg()—————————————————————————————— 選擇正確的事、再把事做正確 ——————————————————————————————