使用 Qt 獲取 UDP 數據並顯示成圖片
一個項目,要接收 UDP 數據包,解析並獲取其中的數據,主要根據解析出來的行號和序號將數據拼接起來,然後將拼接起來的數據(最重要的數據是 R、G、B 三個通道的像素值)顯示在窗口中。考慮到每秒鐘要接收的數據包的數量較大,Python 的處理速度可能沒有那麽快,而且之前對 Qt 也比較熟悉了,所以用Qt 作為客戶端接收處理數據包,用近期學習的 Python 模擬發送數據包。
數據格式
在 TCP/IP 協議中,UDP 數據包的大小是由限制的,因此用 UDP 傳輸數據時,還要在 UDP 層上再封裝一層自定義的協議。這個自定義的協議比較簡單,每個 UDP 包的大小為 1432 個字節,分為幾個部分:
部分 | 起始字節 | 字節長度 | 說明 |
---|---|---|---|
Start | 0 | 4 | 包頭部的 Magic Number,設為 0x53746172 |
PartialCnt | 4 | 1 | 分包總數,一個字節(0-255)以內 |
PartialIdx | 5 | 1 | 分包序號 |
SampleLine | 6 | 1 | 采樣率 |
RGB | 7 | 1 | rgb 通道標識符 |
LineIdx | 8 | 4 | 行號,每一行可以包含 RGB 三個通道的數據,每個通道由多個分包組成 |
ValidDataLen | 12 | 4 | 數據部分有效字節數 |
LineBytes | 16 | 4 | 每行數據包含的字節總數 |
Reserve | 20 | 128 | 保留部分 |
Data | 148 | 1280 | 數據部分 |
end | 1428 | 4 | 包尾部的 Magic Number,設為 0x54456e64 |
上述表格描述的就是一個完整的 UDP 包。這裏的一個 UDP 數據包包含的是 RGB 某個通道的某一部分的數據。換種說法:
- 一行數據
- R 通道數據(若幹個分包組成)
- G 通道數據(若幹個分包組成)
- B 通道數據(若幹個分包組成)
所以要生成/解析 UDP 包,最重要的是 PartialCnt、PartialIdx、RGB、LineIdx、Data 這幾個部分。清楚了自定義協議就可以開始編寫模擬包的生成和相應的接收邏輯了。
使用 Python 模擬 UDP 發包
由於本地開發的時候缺少必要的硬件環境,為了方便開發,用 Python 編寫一個簡單的 UDPServer,發送模擬生成的數據包。根據上述協議,可以寫出如下的 CameraData 類來表示 UDP 數據包:
# -*- coding: utf-8 -*-
DATA_START_MAGIC = bytearray(4)
DATA_START_MAGIC[0] = 0x53 # S
DATA_START_MAGIC[1] = 0x74 # t
DATA_START_MAGIC[2] = 0x61 # a
DATA_START_MAGIC[3] = 0x72 # r
DATA_END_MAGIC = bytearray(4)
DATA_END_MAGIC[0] = 0x54 # T
DATA_END_MAGIC[1] = 0x45 # E
DATA_END_MAGIC[2] = 0x6e # n
DATA_END_MAGIC[3] = 0x64 # d
slice_start_magic = slice(0, 4)
slice_partial_cnt = 4
slice_partial_idx = 5
slice_sample_line = 6
slice_rgb_extern = 7
slice_line_idx = slice(8, 12)
slice_valid_data_len = slice(12, 16)
slice_line_bytes = slice(16, 20)
slice_resv = slice(20, 148)
slice_data = slice(148, 1428)
slice_end_magic = slice(1428, 1432)
import numpy as np
class CameraData(object):
def __init__(self):
# self.new()
# self.rawdata = rawdata
self.dataLow = 10
self.dataHigh = 20
self.new()
def genRandomByte(self, by=4):
r = bytearray(by)
for i in range(by):
r[i] = np.random.randint(0, 255)
def setPackageIdx(self, i = 0):
self.rawdata[slice_partial_idx] = i
def setRGB(self, c = 1):
self.rawdata[slice_rgb_extern] = c
def setLineIdx(self, line):
start = slice_line_idx.start
self.rawdata[start+3] = 0x000000ff & line
self.rawdata[start+2] = (0x0000ff00 & line) >> 8
self.rawdata[start+1] = (0x00ff0000 & line) >> 16
self.rawdata[start+0] = (0xff000000 & line) >> 24
def setValidDataLen(self, len):
start = slice_valid_data_len.start
self.rawdata[start+3] = 0x000000ff & len
self.rawdata[start+2] = (0x0000ff00 & len) >> 8
self.rawdata[start+1] = (0x00ff0000 & len) >> 16
self.rawdata[start+0] = (0xff000000 & len) >> 24
def setLineBytes(self, len):
start = slice_line_bytes.start
self.rawdata[start+3] = 0x000000ff & len
self.rawdata[start+2] = (0x0000ff00 & len) >> 8
self.rawdata[start+1] = (0x00ff0000 & len) >> 16
self.rawdata[start+0] = (0xff000000 & len) >> 24
def randomData(self):
size = slice_data.stop - slice_data.start
arr = np.random.randint(self.dataLow, self.dataHigh, size, dtype=np.uint8)
self.rawdata[slice_data] = bytearray(arr)
def new(self):
"""構造新的數據對象
"""
self.rawdata = bytearray(1432)
self.rawdata[slice_start_magic] = DATA_START_MAGIC
self.rawdata[slice_partial_cnt] = 0x02
self.rawdata[slice_partial_idx] = 0x00
self.rawdata[slice_sample_line] = 0x03
self.rawdata[slice_rgb_extern] = 0x01
self.setLineIdx(0x00)
self.setValidDataLen(1280)
self.setLineBytes(1432)
self.randomData()
self.rawdata[slice_end_magic] = DATA_END_MAGIC
def hex(self):
return self.rawdata.hex()
def __repr__(self):
return '<CameraData@{} hex len: {}>'.format(hex(id(self)), len(self.rawdata))
CameraData 中的 rawdata 是一個 bytearray 對象,它將會被 UdpServer 通過網絡接口發送出去。設置 4 個字節大小的整數時(如寫 LineIdx 行號),不能直接將數值賦到 rawdata 中,要將其中的 4 個字節分別賦值到對應的地址上才行。
CameraData 中的 randomData 方法是模擬隨機數據,更好的做法不是完全隨機給每個像素點賦值,而是有規律的變化,這樣在接收數據出現問題、分析問題的時候可以直觀地看到哪裏有問題。
然後我們需要定義一個 UdpServer,用它來將數據對象中包含的信息發送出去。
import socket
class UdpServer( object ):
"""該類功能是處理底層的 UDP 數據包發送和接收,利用隊列緩存所有數據
"""
def __init__(self, *args, **kwargs):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self._sock.bind( ('', DATA_PORT+11 ) )
self._sock.settimeout( None ) # never timeout
# self._sock.setblocking( 0 ) # none block
def send_msg( self, msg ):
"""發送消息,
@param msg 字典對象,發送 msg 的 rawdata 字段
"""
self._sock.sendto( msg.rawdata, ('192.168.8.1', DATA_PORT))
這個 UdpServer 非常簡單,因為後續會通過這個 UdpServer 不停的發包,但是每次發包必須等待發送端成功將 UDP 包發送出去,這裏不要將 socket 對象設置成非阻塞的,否則程序運行時會出現錯誤提示(盡管可以忽略掉這個錯誤提示,但是沒必要設置成非阻塞的,阻塞模式完全足夠了)。
在 github 中可以找到完整的 Python 文件,裏面定義了其他類,如 DataSender
、RGBSender
。DataSender
是在一個線程裏面發送 RGB 三個通道的值,RGBSender
的一個對象只會發送 RGB 三個通道中的某一個的值。
小結和註意事項
在本地測試的時候,為了方便在任務管理器中看到網絡占用率,最初是在 VirtualBox 的 ubuntu 虛擬機上運行這個 Python 程序的,但是受到虛擬機的資源分配和電腦性能影響,調用 singleMain
函數時每秒鐘最多只能產生 50MB 的數據量。但是在本地非虛擬機環境運行的時候最多可以達到 80MB 的數據量。所以盡可能地使用本地環境運行該 Python 程序可以最大限度的生成數據包。
如果讓 RGB 三個通道分別在三個不同的進程中執行發送過程(註釋掉 singleMain
的調用,換用 multiSend
方法),那麽每秒鐘的數據量可到 200MB,不過 80MB 的數據量已經足夠多了(接近千兆網卡的上限了,網絡利用率過高的話通過網線傳輸時會出現嚴重丟包的情況),不需要使用 multiSend
方法加大數據量。
在 singleMain 方法中,不直接執行 dataSender.serve()
,而是在新進程中執行,可以更好的利用多核優勢,發送數據更快:
# singleMain()
dataSender = DataSender()
# dataSender.serve()
p = Process(target=dataSender.serve)
p.start()
實際開發過程並不是這麽順利,因為一開始並不知道在大量數據發送的時候,發送端能否有效地將數據發送出去,實際上是邊編寫 Python 的模擬發送數據程序,邊編寫 Qt 獲取數據的程序,根據出現的問題逐步解決發送端和接收端的問題的。
編寫 Qt 獲取數據包的代碼及簡單的 GUI
Qt 這邊作為客戶端,只需要將接收到的數據包保存下來,獲取其中的有效數據,再將 RGB 數據賦到 QImage 對應的像素上顯示出來即可。GUI 部分比較簡單,使用 QWidget 中的 label 控件,將 QImage 轉換成 QPixmap,顯示到 label 上就好了。初始化後的窗口如圖:
比較麻煩的是接收數據和拼接。同樣地,為了方便表示和解析每個 UDP 包,我們構造一些類來存儲這些信息(現在想想似乎直接用結構體表示會更簡單)。
定義數據實體
我們在 Qt 中定義 CameraData
類來表示數據包實體:
/**
* @brief The CameraData class
* 對應從下位機接收到的字節數組的類,原始數據包,需要經過處理後變成一行數據
*/
class CameraData : public DataObj
{
Q_OBJECT
public:
enum RGBType {
R = 1,
G = 2,
B = 3,
UNKOWN = 0
};
static const QByteArray DATA_START_MAGIC;
static const QByteArray DATA_END_MAGIC;
static const int PacketSize;
explicit CameraData(QObject *parent = 0);
~CameraData();
bool isPackageValid();
// 獲取保留區域的數據
QByteArray getReserved();
// 設置原始數據
void setRawData(const QByteArray &value);
void setRawData(const char *data);
// 獲取數據區域內的所有數據,默認獲取有效數據
QByteArray getData(bool valid = true);
int getPackageCntInLine();
int getPackageIdxInLine();
int getSampleDiffLine();
int getRGBExtern();
RGBType getRGBType();
int getLineIdx();
int getValidDataLen();
int getLineBytes();
int sliceToInt(int start, int len = 4);
// DataObj interface
void reset();
signals:
public slots:
private:
inline QByteArray slice(int start, int len = -1);
inline QByteArray getStartMagic();
inline QByteArray getEndMagic();
QByteArray data;
int packageCntInLine = -1;
int packegeIdxInLine = -1;
int lineIdx = -1;
int lineBytes = -1;
int rgbType = -1;
};
CameraData
類繼承自 DataObj
類,而 DataObj
類又繼承自 QObject
,這樣方便進行內存管理和對象上的操作。DataObj
是為了方便復用對象而定義的基類,詳細代碼可參考 github 上的完整代碼。
C++ 部分的 CameraData
類與 Python 中定義的 CameraData
類是對應的,不過 C++ 部分的 CameraData
類只需要調用 CameraData::setRawData
傳入一個 QByteArray 對象後就可以自動將其中包含的數據解析出來,並且它只提供獲取數據的接口而不提供修改數據的接口。
另外我們還需要定義一個類 PreProcessData,來表示一行數據:
/**
* @brief The PreProcessData class
* 預處理數據
*/
class PreProcessData: public DataObj
{
Q_OBJECT
public:
static const int PacketSize;
static const int PacketPerLine;
explicit PreProcessData(QObject *parent = 0, int line = -1);
void put(CameraData *cd);
bool isReady();
void reset();
int line() const;
void setLine(int line);
const QByteArrayList &getDataList() const;
QByteArray repr();
private:
/**
* @brief cameraData
* 每 2 個 CameraData 構成一行的單通道數據,有序存放 RGB 通道數據
* 0-1 存放 R,2-3 存放 G, 4-5 存放 B
*/
QByteArrayList dataList;
int m_line;
int m_readyCount = 0;
int m_duplicateCount = 0;
bool *dataPlaced = 0;
};
目前的協議中,每 2 個數據包(對應 2 個 CameraData
對象)構成某一行的單通道數據,所以 PreProcessData
中至少會包含 6 個 CameraData
對象,處理完 CameraData
對象後,只需要存儲 Data 部分即可,所以這裏沒有用 QListQByteArrayList
來存儲數據。當三個通道的數據都準備好後,PreProcessData::isReady
就會返回 true,表示該行數據已經準備好,可以顯示在窗口中。
在子線程中執行接收 UDP 包和處理過程
我們定義一個 Controller
類用來操作數據接收對象和子線程。用 Qt 的事件槽機制和 QObject::moveToThread
實現多線程非常方便,不重寫 QThread 的 run 方法就可以讓對象的方法在子線程中執行。
class Controller : public QObject
{
Q_OBJECT
public:
explicit Controller(QObject *parent = 0);
~Controller();
static const int DataPort;
static const int CONTROL_PORT;
static const QStringList BOARD_IP;
void start();
void stop();
DataProcessor *getDataProcessor() const;
signals:
public slots:
private:
CameraDataReceiver *cdr;
QThread recvThread;
QThread recvProcessThread;
QByteArrayList rawdataList;
DataProcessor *dp = 0;
QTimer *statsTimer;
int statsInterval;
};
其中 CameraDataReceiver
對象會被實例化,在子線程中接收 UDP 數據包(因為發送和接收數據的端口是不同的,操作和數據是分離的)。這裏將 DataProcessor 通過 getDataProcessor
暴露給上層應用,以便上層應用連接信號槽接收圖像。僅到接收數據,就用到了三個線程:分別是 GUI 線程,用於接收 UDP 包的 recvThread 線程和處理 UDP 的 recvProcessThread。
為什麽接收 UDP 包和處理 UDP 包不是放在一個線程中執行呢?因為這裏的數據量實在太多,最開始實現的時候這兩個邏輯代碼確實是在同一個線程中執行,然而由於處理數據的代碼執行起來也要消耗時間,將會導致無法接收其他的 UDP 包,這樣的話就會導致比較嚴重的丟包。為了保證接收端不會丟包,只好將處理邏輯放在其他的線程中執行。
Qt 接收 UDP 包
將接收數據和處理數據放在不同的線程中執行,確實可以解決丟包問題了,但是會出現新的問題:接收到的包如果不能夠及時處理完,並且釋放掉相應的資源,那麽可能會出現程序將數據緩存下來但無法處理,程序占用的內存越來越大,導致程序運行起來越來越慢。
在編寫程序時誤以為是 Qt 的事件循環機制過慢導致程序處理不了那麽多數據(實際上它的速度足夠處理這些數據),因此將程序中使用的 QUdpSocket 對象換成了 [Windows 平臺的 Socket 通信代碼][winsock demo],並將其改寫成類方便調用。實際上是在 QThread 子線程中無限循環地運行 recvfrom(clientSocket, recvedData.data(), recvbuflen, 0, &fromaddr, &addrLen);
這樣的接收數據包函數,跳過了 Qt 事件循環機制,然後當接收到包之後再通過回調函數通知數據處理線程進行處理。
但當我寫這篇博客,重新用正常的代碼進行測試時,發現即便使用 QUdpSocket::readyRead
信號來接收 UDP 數據,只要數據處理進程不堆積數據,就不會出現占用內存越來越多的情況。換句話說,不是 Qt 無法處理實時性的數據,而是自己編寫的代碼裏面有問題。
回想最開始寫的程序,在處理 QByteArray 表示的原始數據時,會為每一個接收到的數據包分配地址,而且分配的地址位於堆中。而實際上在堆 heap 中分配回收內存地址相較於在棧 stack 中是慢得多的。為每個到來的數據用 new 構造一個新的 CameraData 對象,然後在處理完後將這個 CameraData delete 掉其實是很慢的,如果你這樣做了,並且你在 CameraData 的析構函數中加上 qDebug 語句打印 "CameraData is deleting...",你會發現,當發送方(我們的 Python 模擬發送程序)停止發送數據包後很長一段時間內,Qt 程序在一直打印著 "CameraData is deleting"。
而我最開始就是這麽做的,所以發生了 Qt 程序隨著數據接收的變多,占用的內存越來越大的情況。當然,這不排除 qDebug 語句輸出到控制臺上也會占用很多時間。如果每秒鐘要調用上萬次 qDebug() << "CameraData is deleting"
,那麽建議你使用一個計數變量控制 qDebug 的調用次數,因為這條語句的調用也會讓數據處理變得緩慢。
處理接收到的 UDP 包
為了讓接收端不丟包,需要快速的處理接收到的 UDP 包,並且在處理的代碼中不要調用耗時的函數或者 new 操作。為了避免重復調用 new 和 delete 操作符,我們需要構建一個對象池,以便復用池中的對象,減少 new 操作。池的定義比較簡單,封裝一個 QList
容器類就好了,為了簡化和復用池的代碼,我用到了 c++ 的 template 特性,但是這個 DataObjPool
中的容器只能是 DataObj 的子類:
template<class T>
class DataObjPool
{
public:
virtual ~DataObjPool() {
qDeleteAll(pool);
numAvailable = 0;
}
T *getAvailable() {
if( numAvailable == 0 ) {
return 0;
}
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(item->isValid()) {
item->setValid(false);
numAvailable -= 1;
return item;
}
}
return 0;
}
T *get(int id) {
return pool[id];
}
inline bool release(T *dobj) {
dobj->reset();
numAvailable += 1;
return true;
}
int releaseTimeout(int now, int timeout = 100) {
int releaseCount = 0;
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(now > item->getGenerateMs() + timeout) {
item->reset();
numAvailable += 1;
releaseCount += 1;
}
}
return releaseCount;
}
void releaseAll() {
for(int i = 0; i < pool.size(); i++) {
T *item = pool[i];
if(item->isValid()) {
continue;
}
item->reset();
numAvailable += 1;
}
}
int getNumAvailable() const {
return numAvailable;
}
template<class T2> operator DataObjPool<T2>();
protected:
DataObjPool(int size = 100);
private:
QList<T *> pool;
int numAvailable = 0;
};
class RawDataObjPool: public DataObjPool<CameraData>
{
public:
RawDataObjPool(int size = 100);
};
class LineDataPool : public DataObjPool<PreProcessData>
{
public:
LineDataPool(int size = 100);
};
當然你也可以直接編寫兩個類 RawDataObjPool
和 LineDataPool
,把池的操作分別復制到兩個類中,使用模板特化的好處是改動的時候不需要改動兩個類了。前面說過,DataObj
類繼承自 QObject
,就是為了簡化在對象池中進行的操作。DataObjPool
會在構造時在內存中預分配一定數量的對象,以 RawDataObjPool
為例,構造時傳入 size 參數,便會預先在內存中創建 size 個 CameraData,在程序運行過程中,這些對象都會被我們這個 Qt 程序循環利用,直到關閉程序才會釋放掉這些 CameraData(如果操作系統的內存不足,過多的對象占用的內存還是會被釋放)。
對象池的主要接口有兩個:getAvailable
和 release
分別用於獲取可用的對象或釋放掉池中的對象,註意這裏的釋放是讓對象池對該對象進行標記,以便重復使用,而不是釋放掉該對象占用的內存空間或 delete 掉。當對象池中無可用對象時,可以根據需要釋放掉超時的對象或者釋放掉全部對象。
使用對象池減少 new 操作符的使用後,處理數據的子線程的速度明顯加快。正常情況下就可以看到如下的圖片:
這裏數據顯示的部分還有待完善,因為發送端的發送數據大小不夠湊成一行,所以圖片的右側部分是空白的。
數據的復制
這裏說一下數據的復制,從 Socket 接口中傳上來的數據,我們用 QByteArray
對象保存了底層的數據,即便在 UDP 數據包中含有很多個 \x00
這樣的數據,QByteArray 也會正確識別出字符串的結束位置。
在設置 CameraData::setRawData(const QByteArray &value)
函數中,盡量避免手動調用 memcpy(data.data(), value, value.size());
這個底層 API,因為你不知道它會將 QByteArray 對象 CameraData.data
中的 char * data()
指針指向哪個位置。
我在 CameraData.cpp
文件中將它註釋掉了,因為在程序運行和調試時它給我帶來了巨大的困惑:經常出現 invalid address specified to rtlvalidateheap
這種類型的錯誤。經過很長時間的排查後發現註釋掉這行代碼,程序就能一直穩定運行。
總結
- 在 c++ 程序中要使用大量可重用的對象時,盡量避免頻繁地使用 new 操作符新建對象,使用對象池來獲取對象,這樣可以加快程序的運行速度。
- Qt 的事件循環機制實際上運行地足夠快,是可以處理實時性的數據的,在程序出現問題時,還是應該多找找自己編寫的代碼中的問題。
- 對於 memcpy 這類的底層 API,不熟悉的話盡量少用,否則出現問題很難 debug。
完整的項目代碼可以在 github 中找到。
參考
- QMap erase 用法
- QMap 無效指針
- winsock demo
- 正確使用堆棧
使用 Qt 獲取 UDP 數據並顯示成圖片