1. 程式人生 > >QT迴圈佇列實時處理資料(二)

QT迴圈佇列實時處理資料(二)

  上一篇多執行緒介紹的是,QT多執行緒處理機制,這篇,將對接收資料,實時處理進行分析。

         QT通過socket通訊,從接收緩衝區中讀取資料,交給執行緒進行處理,那麼問題來了,如果執行緒還沒有處理完資料,則執行緒就沒有辦法繼續從緩衝區中取數,那麼當資料量過大的時候,緩衝區會滿,繼而被覆蓋,從而造成資料的丟失。那麼如何將資料儲存在某個特定的空間內,並且讓其他執行緒進行讀取。這個是執行緒通訊的問題,這個問題有多種方式,一種是作業系統課本上,通過執行緒同步、互斥、訊號量這三種機制實現執行緒通訊,或者就是通過迴圈佇列的方式,完成執行緒通訊。

         這篇主要介紹的是第二種方式,即迴圈佇列的方式,進行通訊。

         迴圈佇列的實現方式,通過全域性變數定義一個大的陣列,同時,定義兩個讀寫指標,這個指標不是語言中指標這個型別,可以理解成,兩個標誌位,記錄讀和寫的位置,通過這種方式,可以實現一個迴圈佇列的基本模型。如下圖:


         write表示寫指標,read表示讀指標。我們將從socket緩衝區接收到的資料,快取到佇列中,將寫指標向後移動,另外一個執行緒,操作讀指標,不斷跟隨寫指標,將資料取出,處理。下面把程式碼貼上來供大家參考:

  1. MyThread.h  
  2. #ifndef MYTHREAD_H
  3. #define MYTHREAD_H
  4. #include <QThread>
  5. #include <QDebug>
  6. #include "basetype.h"
  7. #include "mythreadstore.h"
  8. /** 
  9.  * @brief The MyThreadRecv class 
  10.  * 該類負責接收從tcp讀取到的資料。並將資料儲存到緩衝區中 
  11.  */
  12. class MyThreadRecv  
  13. {  
  14. public:  
  15.     MyThreadRecv();  
  16.     ~MyThreadRecv(){};  
  17.     void RecvMsg(constBYTE *data, int len);            ///< 接收資料,存入迴圈佇列
  18. };  
  19. /** 
  20.  * @brief The MyThread class 
  21.  * 處理資料的執行緒 
  22.  */
  23. class MyThread: public QThread  
  24. {  
  25. public:  
  26. public:  
  27.     MyThread();  
  28.     ~MyThread(){};  
  29.     void init();  
  30.     void run();                 ///< 任務執行
  31. private:  
  32.     volatilebool stopped;  
  33.     int Flag;  
  34.     MythreadStore *mythreadstore;  
  35. };  
  36. #endif // MYTHREAD_H
  1. MyThread.cpp  
  2. #include "mythread.h"
  3. BYTE Queue1[(1024 * 500)] = {0};            ///< 迴圈訊息佇列
  4. int wReadPoint = 0;                         ///< 讀指標
  5. int wWritePoint = 0;                        ///< 寫指標
  6. MyThreadRecv::MyThreadRecv()  
  7. {  
  8. }  
  9. void MyThreadRecv::RecvMsg(constBYTE *data, int len)  
  10. {  
  11.     qDebug()<<"I will gointo for";  
  12.     for(int iNum = 0; iNum < len; iNum++) {  
  13.         /** 
  14.          * @brief tempWReadPoint 
  15.          * 儲存,到程式執行到此處的時候,wReadPoint的值,因為執行緒一直在執行,很有可能執行到這步的時候,wReadPoint的值被改變。 
  16.          */
  17.         int tempWReadPoint = wReadPoint;  
  18.         if((wWritePoint + 1) % (1024 * 500) == tempWReadPoint) {  
  19.             /** 
  20.              * 佇列已滿 
  21.              */
  22.             continue;  
  23.         }  
  24.         /** 
  25.          * 處理佇列不滿的情況 
  26.          */
  27.         Queue1[wWritePoint % (1024 * 500)] = data[iNum];  
  28.         wWritePoint = (wWritePoint +1) % (1024 * 500);  
  29.     }  
  30.     qDebug()<<"After for";  
  31. }  
  32. void MyThread::init()  
  33. {  
  34.     start();  
  35. }  
  36. MyThread::MyThread()  
  37. {  
  38.     stopped = false;  
  39.     Flag = 0;  
  40.     mythreadstore = new MythreadStore();  
  41. }  
  42. void MyThread::run()  
  43. {  
  44.     qDebug()<<"In run";  
  45.     int iFlag = 0;              ///< 標誌位
  46.     int iNum = 0;  
  47.     BYTE NeedDealdata[200] = {0};  
  48.     while(!stopped) {  
  49.         /** 
  50.          * @brief itempWritePoint 
  51.          * 儲存,到程式執行到此處的時候,wWritePoint的值,因為執行緒一直在執行,很有可能執行到這步的時候,wWritePoint的值被改變。 
  52.          */
  53.         int itempWritePoint = wWritePoint;  
  54.         if((wReadPoint) % (1024 * 500) != itempWritePoint) {  
  55.             /** 
  56.              * 佇列不空 
  57.              */
  58.             if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0x54 == Queue1[(wReadPoint) % (1024 * 500)])) {  
  59.                 /** 
  60.                  * 找幀頭 
  61.                  */
  62.                 iNum = 0;  
  63.                 NeedDealdata[iNum++] = Queue1[(wReadPoint -1) % (1024 * 500)];  
  64.                 NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];  
  65.                 wReadPoint = (wReadPoint + 1) % (1024 * 500);  
  66.                 iFlag = 1;  
  67.             }  
  68.             if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0xFE == Queue1[(wReadPoint) % (1024 * 500)]) && (1 == iFlag)) {  
  69.                 NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];  
  70.                 wReadPoint = (wReadPoint + 1) % (1024 * 500);  
  71.                 qDebug()<<"I will store msg || iNum = " + QString::number(iNum);  
  72.                 /** 
  73.                  * 找到需要處理的資料,處理資料 
  74.                  */
  75.                 mythreadstore->StoreMsgDeal(NeedDealdata, iNum);  
  76.                 memset(NeedDealdata, '\0'sizeof(NeedDealdata));  
  77.                 iFlag = 0;  
  78.                 iNum = 0;  
  79.             }  
  80.             if(1 == iFlag) {  
  81.                 NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];  
  82.                 wReadPoint = (wReadPoint + 1) % (1024 * 500);  
  83.             } else {  
  84.                 wReadPoint = (wReadPoint + 1) % (1024 * 500);  
  85.             }  
  86.         }  
  87.         usleep(10);  
  88.     }  
  89. }  
        上面兩段程式碼中,存在兩個類,MyThreadRecv和MyThread,前者負責儲存從socket中獲取到的資料,後面這個類負責從佇列中讀取資料,並且處理。這樣的機制,能夠有效的處理迴圈佇列中的資料,從而達到實時處理資料,並且保證資料準確性和實時性的問題,但是一個重點是,需要考慮什麼時候佇列滿,什麼時候佇列空,這個問題是設計這個迴圈佇列並且處理的關鍵,需要根據問題仔細的去分析,然後設計。

        對於迴圈佇列的基本內容,這裡就不做詳細的描述,因為比較簡單,可以自行查詢資料結構相關的內容。