QT迴圈佇列實時處理資料(二)
阿新 • • 發佈:2019-01-30
上一篇多執行緒介紹的是,QT多執行緒處理機制,這篇,將對接收資料,實時處理進行分析。
QT通過socket通訊,從接收緩衝區中讀取資料,交給執行緒進行處理,那麼問題來了,如果執行緒還沒有處理完資料,則執行緒就沒有辦法繼續從緩衝區中取數,那麼當資料量過大的時候,緩衝區會滿,繼而被覆蓋,從而造成資料的丟失。那麼如何將資料儲存在某個特定的空間內,並且讓其他執行緒進行讀取。這個是執行緒通訊的問題,這個問題有多種方式,一種是作業系統課本上,通過執行緒同步、互斥、訊號量這三種機制實現執行緒通訊,或者就是通過迴圈佇列的方式,完成執行緒通訊。
這篇主要介紹的是第二種方式,即迴圈佇列的方式,進行通訊。
迴圈佇列的實現方式,通過全域性變數定義一個大的陣列,同時,定義兩個讀寫指標,這個指標不是語言中指標這個型別,可以理解成,兩個標誌位,記錄讀和寫的位置,通過這種方式,可以實現一個迴圈佇列的基本模型。如下圖:
write表示寫指標,read表示讀指標。我們將從socket緩衝區接收到的資料,快取到佇列中,將寫指標向後移動,另外一個執行緒,操作讀指標,不斷跟隨寫指標,將資料取出,處理。下面把程式碼貼上來供大家參考:
- MyThread.h
- #ifndef MYTHREAD_H
- #define MYTHREAD_H
-
#include <QThread>
- #include <QDebug>
- #include "basetype.h"
- #include "mythreadstore.h"
- /**
- * @brief The MyThreadRecv class
- * 該類負責接收從tcp讀取到的資料。並將資料儲存到緩衝區中
- */
- class MyThreadRecv
- {
- public:
- MyThreadRecv();
- ~MyThreadRecv(){};
-
void RecvMsg(constBYTE *data, int len); ///< 接收資料,存入迴圈佇列
- };
- /**
- * @brief The MyThread class
- * 處理資料的執行緒
- */
- class MyThread: public QThread
- {
- public:
- public:
- MyThread();
- ~MyThread(){};
- void init();
- void run(); ///< 任務執行
- private:
- volatilebool stopped;
- int Flag;
- MythreadStore *mythreadstore;
- };
- #endif // MYTHREAD_H
- MyThread.cpp
- #include "mythread.h"
- BYTE Queue1[(1024 * 500)] = {0}; ///< 迴圈訊息佇列
- int wReadPoint = 0; ///< 讀指標
- int wWritePoint = 0; ///< 寫指標
- MyThreadRecv::MyThreadRecv()
- {
- }
- void MyThreadRecv::RecvMsg(constBYTE *data, int len)
- {
- qDebug()<<"I will gointo for";
- for(int iNum = 0; iNum < len; iNum++) {
- /**
- * @brief tempWReadPoint
- * 儲存,到程式執行到此處的時候,wReadPoint的值,因為執行緒一直在執行,很有可能執行到這步的時候,wReadPoint的值被改變。
- */
- int tempWReadPoint = wReadPoint;
- if((wWritePoint + 1) % (1024 * 500) == tempWReadPoint) {
- /**
- * 佇列已滿
- */
- continue;
- }
- /**
- * 處理佇列不滿的情況
- */
- Queue1[wWritePoint % (1024 * 500)] = data[iNum];
- wWritePoint = (wWritePoint +1) % (1024 * 500);
- }
- qDebug()<<"After for";
- }
- void MyThread::init()
- {
- start();
- }
- MyThread::MyThread()
- {
- stopped = false;
- Flag = 0;
- mythreadstore = new MythreadStore();
- }
- void MyThread::run()
- {
- qDebug()<<"In run";
- int iFlag = 0; ///< 標誌位
- int iNum = 0;
- BYTE NeedDealdata[200] = {0};
- while(!stopped) {
- /**
- * @brief itempWritePoint
- * 儲存,到程式執行到此處的時候,wWritePoint的值,因為執行緒一直在執行,很有可能執行到這步的時候,wWritePoint的值被改變。
- */
- int itempWritePoint = wWritePoint;
- if((wReadPoint) % (1024 * 500) != itempWritePoint) {
- /**
- * 佇列不空
- */
- if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0x54 == Queue1[(wReadPoint) % (1024 * 500)])) {
- /**
- * 找幀頭
- */
- iNum = 0;
- NeedDealdata[iNum++] = Queue1[(wReadPoint -1) % (1024 * 500)];
- NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
- wReadPoint = (wReadPoint + 1) % (1024 * 500);
- iFlag = 1;
- }
- if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0xFE == Queue1[(wReadPoint) % (1024 * 500)]) && (1 == iFlag)) {
- NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
- wReadPoint = (wReadPoint + 1) % (1024 * 500);
- qDebug()<<"I will store msg || iNum = " + QString::number(iNum);
- /**
- * 找到需要處理的資料,處理資料
- */
- mythreadstore->StoreMsgDeal(NeedDealdata, iNum);
- memset(NeedDealdata, '\0', sizeof(NeedDealdata));
- iFlag = 0;
- iNum = 0;
- }
- if(1 == iFlag) {
- NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
- wReadPoint = (wReadPoint + 1) % (1024 * 500);
- } else {
- wReadPoint = (wReadPoint + 1) % (1024 * 500);
- }
- }
- usleep(10);
- }
- }
對於迴圈佇列的基本內容,這裡就不做詳細的描述,因為比較簡單,可以自行查詢資料結構相關的內容。