1. 程式人生 > >Qt之執行緒同步(生產者消費者模式

Qt之執行緒同步(生產者消費者模式



簡述

生產者將資料寫入緩衝區,直到它到達緩衝區的末尾,此時,它將從開始位置重新啟動,覆蓋現有資料。消費者執行緒讀取資料並將其寫入標準錯誤。

Semaphore(訊號量) 比 mutex(互斥量)有一個更高階的併發性。如果緩衝區的訪問由一個 QMutex 把守,當生產者執行緒訪問緩衝區時,消費者執行緒將無法訪問。然而,有兩個執行緒同一時間訪問不同的緩衝區是沒有害處的。

示例包括兩個類:Producer 和 Consumer,均繼承自 QThread。迴圈緩衝區用於這兩個類之間的溝通,訊號量用於保護全域性變數。

全域性變數

首先,一起來看迴圈緩衝區和相關的訊號量:

const int DataSize = 100000
; const int BufferSize = 8192; char buffer[BufferSize]; QSemaphore freeBytes(BufferSize); QSemaphore usedBytes;

DataSize 是生產者將生成的資料數量,為了讓示例儘可能地簡單,把它定義為一個常數。BufferSize 是迴圈緩衝區的大小,小於 DataSize,這意味著在某一時刻生產者將達到緩衝區的末尾,會從開始位置重新啟動。

要同步生產者和消費者,需要兩個訊號量。freeBytes 訊號量用於控制緩衝區的 "free" 區域(生產者尚未填充資料,或消費者已經讀取的區域)。usedBytes 訊號量用於控制緩衝區的 "used"

區域(生產者已經填充資料,但消費者尚未讀取的區域)。

總之,這些訊號量確保生產者不會先於消費者超過 BufferSize 的大小,而消費者永遠不會讀取生產者尚未生成的資料。

freeBytes 訊號量用 BufferSize 來初始化,因為最初整個緩衝區是空的。usedBytes 訊號量初始化為 0(預設值,如果未指定)。

Producer

Producer 類的程式碼如下:

class Producer : public QThread
{
public:
    void run() Q_DECL_OVERRIDE
    {
        qsrand(QTime(0,0,0).secsTo(QTime
::currentTime()
)); for (int i = 0; i < DataSize; ++i) { freeBytes.acquire(); buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4]; usedBytes.release(); } } };

生產者生成 DataSize 位元組的資料。在往迴圈緩衝區寫入一個位元組之前,它必須使用 freeBytes 訊號量來獲得一個 "free" 位元組。如果消費者沒有與生產者保持著同樣的速度,QSemaphore::acquire() 呼叫可能會阻塞。

最後,生產者使用 usedBytes 訊號量釋放一個位元組。該 "free" 位元組已成功轉化為一個 "used" 位元組,準備好供消費者讀取。

Consumer

現在轉向 Consumer 類:

class Consumer : public QThread
{
    Q_OBJECT
public:
    void run() Q_DECL_OVERRIDE
    {
        for (int i = 0; i < DataSize; ++i) {
            usedBytes.acquire();
            fprintf(stderr, "%c", buffer[i % BufferSize]);
            freeBytes.release();
        }
        fprintf(stderr, "\n");
    }

signals:
    void stringConsumed(const QString &text);

protected:
    bool finish;
};

程式碼非常類似於生產者,不同的是,獲得 "used" 位元組並釋放一個 "free" 位元組,而非相反。

main() 函式

在 main() 函式中,我們建立兩個執行緒,並呼叫 QThread::wait(),以確保在退出之前,這兩個執行緒有時間完成。

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

當執行這個程式時,會發生什麼呢?

最初,生產者是唯一一個可以做任何事情的執行緒,消費者阻塞並等待 usedBytes 訊號量的釋放(available() 初始數是 0)。一旦生產者把一個位元組放入緩衝區,freeBytes.available() 就會變為 BufferSize - 1,並且 usedBytes.available() 變為 1。這時,可能發生兩件事:要麼消費者執行緒接管和讀取位元組,要麼生產者開始生成第二個位元組。

此示例中提出的“生產者 - 消費者”模式,適用於編寫高併發多執行緒應用。在多處理器計算機中,程式可能比基於 mutex 的方案快達兩倍之多,因為兩個執行緒可以同一時間在緩衝區的不同部分處於啟用狀態。

要知道,這些好處並不總能實現,獲取和釋放一個 QSemaphore 是需要成本的。在實踐中,可能需要把緩衝區分為塊,並針對塊操作而非單個位元組。緩衝區的大小也是一個必須仔細選擇的引數,需要基於實驗。