1. 程式人生 > 其它 >python-can庫基於PCAN-USB使用方法

python-can庫基於PCAN-USB使用方法

一、概述

1.介紹

python-can庫為Python提供了控制器區域網的支援,為不同的硬體裝置提供了通用的抽象,並提供了一套實用程式,用於在CAN總線上傳送和接收訊息。

支援硬體介面:

Name

Documentation

"socketcan"

SocketCAN

"kvaser"

Kvaser’s CANLIB

"serial"

CAN over Serial

"slcan"

CAN over Serial / SLCAN

"ixxat"

IXXAT Virtual CAN Interface

"pcan"

PCAN Basic API

"usb2can"

USB2CAN Interface

"nican"

NI-CAN

"iscan"

isCAN

"neovi"

neoVI

"vector"

Vector

"virtual"

Virtual

"canalystii"

CANalyst-II

"systec"

SYSTEC interface

2.環境搭建

Python安裝:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

PCAN-USB驅動:

https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

庫:pip install python-can

3.參考文件

https://python-can.readthedocs.io/en/master/#

二、常用方法

1.接收報文

from can.interfaces.pcan.pcan import PcanBus


def bus_recv():
    """輪詢接收訊息"""
    try:
        while True:
            msg = bus.recv(timeout=100)
            
print(msg) except KeyboardInterrupt: pass if __name__ == '__main__': bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_recv()

2.傳送報文

from can.interfaces.pcan.pcan import PcanBus


def bus_send():
    """can訊息傳送"""
    while True:
        time.sleep(0.02)
        try:
            bus.send(msg)
            print("訊息傳送 {}".format(bus.channel_info))
        except can.CanError:
            print("訊息未傳送")


if __name__ == '__main__':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 報文
    bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000)
    bus_send()

3.定期傳送報文

def bus_send_periodic():
    """週期傳送報文"""
    print("開始每200毫秒傳送一條訊息。持續時間10s")
    task = bus.send_periodic(msg, 1.5)  # 定期傳送
    if not isinstance(task, can.ModifiableCyclicTaskABC):  # 斷言task型別
        print("此介面似乎不支援")
        task.stop()
        return
    time.sleep(5)  # 持續時間
    print("傳送完成")
    print("更改執行任務的資料以99開頭")
    msg.data[0] = 0x99
    task.modify_data(msg)  # 修改data首位元組
    time.sleep(10)

    task.stop()
    print("停止迴圈傳送")
    print("將停止任務的資料更改為單個 ff 位元組")
    msg.data = bytearray([0xff])  # 重新定向data
    msg.dlc = 1  # 定義data長度
    task.modify_data(msg)  # 修改data
    time.sleep(10)
    print("重新開始")
    task.start()  # 重新啟動已停止的週期性任務
    time.sleep(10)
    task.stop()
    print("完畢")

if __name__ == '__main__':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 報文
    bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000)
    bus_send_periodic()

4.迴圈收發訊息

from can.interfaces.pcan.pcan import PcanBus


def send_cyclic(stop_event):
    """迴圈傳送訊息"""
    print("開始每1秒傳送1條訊息")
    start_time = time.time()
    while not stop_event.is_set():
        msg.timestamp = time.time() - start_time
        bus.send(msg)
        print(f"tx: {msg}")
        time.sleep(1)
    print("停止傳送訊息")


def receive(stop_event):
    """迴圈接收訊息"""
    print("開始接收訊息")
    while not stop_event.is_set():
        rx_msg = bus.recv(1)
        if rx_msg is not None:
            print(f"rx: {rx_msg}")
    print("停止接收訊息")


def send_and_recv_msg():
    """傳送一個訊息並接收一個訊息,需要雙通道CAN"""
    stop_event = threading.Event()
    t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,))
    t_receive = threading.Thread(target=receive, args=(stop_event,))
    t_receive.start()
    t_send_cyclic.start()
    try:
        while True:
            time.sleep(0)  # yield
    except KeyboardInterrupt:
        pass  # 正常退出
    stop_event.set()
    time.sleep(0.5)


if __name__ == '__main__':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 報文
    bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000)
    send_and_recv_msg()
—————————————————————————————— 選擇正確的事、再把事做正確 ——————————————————————————————