1. 程式人生 > 其它 >Python實現CAN匯流排J1939報文接收、傳送

Python實現CAN匯流排J1939報文接收、傳送

一、環境搭建

1.概述

本文主要是通過Python3實現CAN總線上J1939報文接收、傳送等功能,通過模擬單幀、多幀實現定期傳送報文等模擬場景。

2.CAN工具

本案例採用的是PCAN-USB工具

PCAN-USB驅動:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

3.Python安裝

下載地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

庫:pip install can-j1939

二、專案演示

1.J1939報文接收

2.J1939報文傳送

三、完整程式碼

#!/usr/bin/python
# _*_ coding:utf-8 _*_

import time, j1939, threading


def on_message(priority, pgn, sa, timestamp, data):
    """接收來自匯流排的訊息"""
    print(priority, pgn, sa, timestamp, [data[i] for i in range(len(data))])  # 列印訊息


def recv_j1939(read_time):
    """訂閱匯流排訊息"""
    ecu.subscribe(on_message)
    time.sleep(read_time)
    ecu.unsubscribe(on_message)  
# 停止訂閱訊息 def cycle_send_j1939(): """傳送j1939報文""" j1939_loog_data = [{'cycle': 1000, "pgn": 65251, "dp": 0, "pf": 254, "ps": 227, "p": 6, "sa": 0, "data": [0, 25, 132, 112, 63, 137, 64, 31, 133, 128, 37, 133, 32, 53, 136, 155, 66, 255, 255,
164, 3, 0, 0,60, 70, 213, 125, 133, 155, 66, 134, 0, 255, 255, 255, 255, 255, 255, 255,255]}] # j1939長幀報文 j1939_data = [{'cycle': 250, "can_id": 0x18FEDF00, 'pgn': 65247, "data": [130, 128, 37, 125, 251, 255, 255, 240]}, {'cycle': 50, "can_id": 0x18F00400, "pgn": 61444, "data": [14, 189, 130, 0, 0, 0, 0, 125]}, {'cycle': 100, "can_id": 0x18FE9200, "pgn": 65170,"data": [255, 255, 254, 255, 255, 255, 0, 0]}] # j1939短幀報文 for i in j1939_loog_data: i['currenttime'] = int(round(time.time() * 1000)) for i in j1939_data: i['currenttime'] = int(round(time.time() * 1000)) while True: currenttime = int(round(time.time() * 1000)) for i in j1939_loog_data: interval = currenttime - i['currenttime'] if interval >= i['cycle']: ecu.send_pgn(data_page=i["dp"], pdu_format=i["pf"], pdu_specific=i["ps"], priority=i["p"], src_address=i["sa"], data=i["data"]) # 傳送長幀報文 print('PGN%d:長幀報文傳送成功' % i['pgn']) i['currenttime'] = currenttime # 更新發送時間 for i in j1939_data: interval = currenttime - i['currenttime'] if interval >= i['cycle']: ecu.send_message(can_id=i["can_id"], data=i["data"]) # 傳送短幀報文 print('PGN%d:短幀報文傳送成功' % i['pgn']) i['currenttime'] = currenttime # 更新發送時間 if __name__ == '__main__': print("初始化匯流排") ecu = j1939.ElectronicControlUnit() # 建立ECU ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000) # 連線CAN匯流排 # s1 = threading.Thread(target=recv_j1939, name="接收J1939訊息執行緒",args=(10,)) # s1.start() s2 = threading.Thread(target=cycle_send_j1939, name="傳送J1939訊息執行緒") s2.start() # print('取消初始化') # ecu.disconnect() #斷開匯流排連線
—————————————————————————————— 選擇正確的事、再把事做正確 ——————————————————————————————