Qt之執行緒同步(生產者消費者模式
簡述
生產者將資料寫入緩衝區,直到它到達緩衝區的末尾,此時,它將從開始位置重新啟動,覆蓋現有資料。消費者執行緒讀取資料並將其寫入標準錯誤。
Semaphore(訊號量) 比 mutex(互斥量)有一個更高階的併發性。如果緩衝區的訪問由一個 QMutex 把守,當生產者執行緒訪問緩衝區時,消費者執行緒將無法訪問。然而,有兩個執行緒同一時間訪問不同的緩衝區是沒有害處的。
示例包括兩個類:Producer 和 Consumer,均繼承自 QThread。迴圈緩衝區用於這兩個類之間的溝通,訊號量用於保護全域性變數。
全域性變數
首先,一起來看迴圈緩衝區和相關的訊號量:
const int DataSize = 100000 ;
const int BufferSize = 8192;
char buffer[BufferSize];
QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;
DataSize 是生產者將生成的資料數量,為了讓示例儘可能地簡單,把它定義為一個常數。BufferSize 是迴圈緩衝區的大小,小於 DataSize,這意味著在某一時刻生產者將達到緩衝區的末尾,會從開始位置重新啟動。
要同步生產者和消費者,需要兩個訊號量。freeBytes 訊號量用於控制緩衝區的 "free"
區域(生產者尚未填充資料,或消費者已經讀取的區域)。usedBytes 訊號量用於控制緩衝區的
"used"
總之,這些訊號量確保生產者不會先於消費者超過 BufferSize 的大小,而消費者永遠不會讀取生產者尚未生成的資料。
freeBytes 訊號量用 BufferSize 來初始化,因為最初整個緩衝區是空的。usedBytes 訊號量初始化為 0(預設值,如果未指定)。
Producer
Producer 類的程式碼如下:
class Producer : public QThread
{
public:
void run() Q_DECL_OVERRIDE
{
qsrand(QTime(0,0,0).secsTo(QTime ::currentTime()));
for (int i = 0; i < DataSize; ++i) {
freeBytes.acquire();
buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];
usedBytes.release();
}
}
};
生產者生成 DataSize 位元組的資料。在往迴圈緩衝區寫入一個位元組之前,它必須使用 freeBytes 訊號量來獲得一個 "free"
位元組。如果消費者沒有與生產者保持著同樣的速度,QSemaphore::acquire() 呼叫可能會阻塞。
最後,生產者使用 usedBytes 訊號量釋放一個位元組。該 "free"
位元組已成功轉化為一個 "used"
位元組,準備好供消費者讀取。
Consumer
現在轉向 Consumer 類:
class Consumer : public QThread
{
Q_OBJECT
public:
void run() Q_DECL_OVERRIDE
{
for (int i = 0; i < DataSize; ++i) {
usedBytes.acquire();
fprintf(stderr, "%c", buffer[i % BufferSize]);
freeBytes.release();
}
fprintf(stderr, "\n");
}
signals:
void stringConsumed(const QString &text);
protected:
bool finish;
};
程式碼非常類似於生產者,不同的是,獲得 "used"
位元組並釋放一個 "free"
位元組,而非相反。
main() 函式
在 main() 函式中,我們建立兩個執行緒,並呼叫 QThread::wait(),以確保在退出之前,這兩個執行緒有時間完成。
int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return 0;
}
當執行這個程式時,會發生什麼呢?
最初,生產者是唯一一個可以做任何事情的執行緒,消費者阻塞並等待 usedBytes 訊號量的釋放(available() 初始數是 0)。一旦生產者把一個位元組放入緩衝區,freeBytes.available() 就會變為 BufferSize - 1,並且 usedBytes.available() 變為 1。這時,可能發生兩件事:要麼消費者執行緒接管和讀取位元組,要麼生產者開始生成第二個位元組。
此示例中提出的“生產者 - 消費者”模式,適用於編寫高併發多執行緒應用。在多處理器計算機中,程式可能比基於 mutex 的方案快達兩倍之多,因為兩個執行緒可以同一時間在緩衝區的不同部分處於啟用狀態。
要知道,這些好處並不總能實現,獲取和釋放一個 QSemaphore 是需要成本的。在實踐中,可能需要把緩衝區分為塊,並針對塊操作而非單個位元組。緩衝區的大小也是一個必須仔細選擇的引數,需要基於實驗。