1. 程式人生 > 實用技巧 >Qt同步執行緒(QMutex QMutexLocker QReadWriteLock QSemaphore QWaitCondition )

Qt同步執行緒(QMutex QMutexLocker QReadWriteLock QSemaphore QWaitCondition )

Qt同步執行緒

我們知道,多執行緒有的時候是很有用的,但是在訪問一些公共的資源或者資料時,需要進行同步,否則會使資料遭到破壞或者獲取的值不正確。Qt提供了一些類來實現執行緒的同步,如QMutexQMutexLockerQReadWriteLockQReadLockerQWriteLockerQSemaphoreQWaitCondition。下面我們分別來看它們的用法:

QMutex

首先,簡單的瞭解一下QMutex提供的函式。

建構函式:QMutex(RecursionModemode= NonRecursive )。

需要注意的是建構函式的引數,RecursionMode遞迴模式。列舉型別

RecursionMode有兩個值:

QMutex::Recursive,在這個模式下,一個執行緒可以多次鎖同一個互斥量。需要注意的是,呼叫lock()多少次鎖,就必須相應的呼叫unlock()一樣次數解鎖。

QMutex::NonRecursive(預設),在這個模式下,一個執行緒只能鎖互斥量一次

voidQMutex::lock()

該函式用來鎖住一個互斥量。如果另外的執行緒已經鎖住了互斥量,函式將被阻塞等待另外的執行緒解鎖互斥量。

如果是一個可遞迴的互斥量,則可以從同一個執行緒多次呼叫這個函式,如果是非遞迴的互斥量多次呼叫這個函式將會引發死鎖。我們來看看原始碼是怎麼實現的。

void QMutex::lock()
{
  QMutexPrivate *d = static_cast<QMutexPrivate*>(this->d);

  Qt::HANDLE self;

  if(d->recursive) {

    self = QThread::currentThreadId();

    if(d->owner == self) {

    ++d->count;             //同一個執行緒多次lock時,僅僅自增count

    //當然遞迴次數太多也會導致棧溢位

    Q_ASSERT_X(d->count != 0, "QMutex::lock", "Overflowin recursion counter");

    return;

  }

  bool isLocked = d->contenders.testAndSetAcquire(0, 1); //嘗試加鎖
  if(!isLocked) {     // didn't get the lock, wait for it     isLocked = d->wait();     Q_ASSERT_X(isLocked, "QMutex::lock",     "Internalerror, infinite wait has timed out.");   }   d->owner = self; //遞迴模式時,owner記錄擁有互斥量的執行緒   ++d->count; //記錄lock的次數   Q_ASSERT_X(d->count != 0, "QMutex::lock", "Overflowin recursion counter");   return; } //非遞迴模式時,   bool isLocked = d->contenders.testAndSetAcquire(0, 1); //嘗試加鎖
  if(!isLocked) {     lockInternal(); //加鎖失敗則在lockInternal()中一直等到別的執行緒解鎖。   } } 看看lockInternal的實現 void QMutex::lockInternal() { 。。。 do { 。。。。//其他程式碼太複雜,感覺最重要的就是這個while迴圈了, //一直迴圈檢測,試圖加鎖。這我們就好理解,非遞迴模式的//互斥量,不要在同一個執行緒裡,多次呼叫lock了。因為第二次呼叫的時候會在這裡死迴圈了 } while(d->contenders != 0 || !d->contenders.testAndSetAcquire(0, 1));//嘗試加鎖 。。。。。。。 }

boolQMutex::tryLock()

該函式試圖鎖一個互斥量,如果成功則返回true。如果另外的執行緒已經鎖住了互斥量,函式直接返回false。

boolQMutex::tryLock(inttimeout)

該函式跟上面的trylock()相似。不同的是,如果互斥量在別的執行緒鎖住的情況下,函式會等待timeout毫秒。需要注意的是,如果傳入的timeout為負數,函式將無限期等待,跟呼叫lock()一樣的效果。這個函式跟上面的差不多,所以只看該函式的原始碼實現就好了。

bool QMutex::tryLock(inttimeout)
{
    QMutexPrivate *d = static_cast<QMutexPrivate*>(this->d);

    Qt::HANDLE self;

    if(d->recursive) 
{ self = QThread::currentThreadId(); if(d->owner == self) {   ++d->count;    Q_ASSERT_X(d->count != 0, "QMutex::tryLock", "Overflow in recursion counter");     return true;     } bool isLocked = d->contenders.testAndSetAcquire(0, 1); if(!isLocked) { // didn'tget the lock, wait for it isLocked = d->wait(timeout); //嘗試加鎖失敗則等待 if(!isLocked) return false; } d->owner = self; ++d->count; Q_ASSERT_X(d->count != 0, "QMutex::tryLock", "Overflow in recursion counter"); return true; } //嘗試加鎖失敗,(d->contenders.testAndSetAcquire(0,1)返回false,所以繼續執行d->wait(timeout); return (d->contenders.testAndSetAcquire(0, 1) ||d->wait(timeout)); } //在win下,wait函式實際上是用事件物件實現的 bool QMutexPrivate::wait(inttimeout) { if(contenders.fetchAndAddAcquire(1) == 0) { // lockacquired without waiting return true; } // 當timeout 小於0,則等待時間為INFINITE,這也就是為什麼傳負數引數時跟lock一樣會無限期等待了 boolreturnValue = (WaitForSingleObject(event,timeout < 0 ? INFINITE : timeout) == WAIT_OBJECT_0); contenders.deref(); returnreturnValue; }

voidQMutex::unlock()

該函式對互斥量進行解鎖。如果在另外的執行緒加鎖,嘗試在別的執行緒進行解鎖則會引發錯誤。試圖對沒有加鎖的互斥量解鎖結果是未定義的。

QMutexLocker

QmutexLocker只是為了簡化我們對互斥量的加鎖和解鎖操作。就像智慧指標方便我們使用普通指標一樣。

QMutexLocker(QMutex *mutex)。

建構函式必須傳入一個互斥量指標,然後在建構函式裡mutex直接呼叫lock()

inline explicitQMutexLocker(QMutex *m)

{

Q_ASSERT_X((reinterpret_cast<quintptr>(m)& quintptr(1u)) == quintptr(0),

"QMutexLocker","QMutex pointer is misaligned");

if (m){

m->lockInline();    // mutex呼叫lock()加鎖

val = reinterpret_cast<quintptr>(m)| quintptr(1u);

} else{

val = 0;

}

}

inline ~QMutexLocker() { unlock(); }

inline void unlock()

{

if((val & quintptr(1u)) == quintptr(1u)) {

val &= ~quintptr(1u);

mutex()->unlockInline();   //析構時呼叫unlock,確保mutex在離開呼叫執行緒時被解鎖。

}

}

下面來看看具體的用法:

假設有個函式有很多return 語句,那麼我們就必須記得在每個return語句前unlock互斥量,否則互斥量將無法得到解鎖,導致其他等待的執行緒無法繼續執行。

int complexFunction(intflag)

{

  mutex.lock();

  int retVal = 0;

  switch (flag) {

    case 0:

    case1:

    retVal = moreComplexFunction(flag);

    break;

  case 2:

  {

    int status = anotherFunction();

    if (status < 0) {

      mutex.unlock();

      return -2;

    }

    retVal = status + flag;

  }

  break;

  default:

  if (flag > 10) {

    mutex.unlock();

    return -1;

  }

  break;

}

mutex.unlock();

return retVal;

}

這樣的程式碼顯得很冗餘又容易出錯。如果我們用QMutexLocker

intcomplexFunction(int flag)

{

QMutexLocker locker(&mutex);

int retVal = 0;

switch (flag) {

case 0:

case 1:

return moreComplexFunction(flag);

case 2:

{

int status = anotherFunction();

if (status < 0)

return -2;

retVal = status + flag;

}

break;

default:

if (flag > 10)

return -1;

break;

}

return retVal;

}

由於locker 是區域性變數,在離開函式作用域時,mutex肯定會被解鎖

QReadWriteLock

QReadWriteLock是一個讀寫鎖,主要用來同步保護需要讀寫的資源。當你想多個讀執行緒可以同時讀取資源,但是只能有一個寫執行緒操作資源,而其他執行緒必須等待寫執行緒完成時,這時候用這個讀寫鎖就很有用了。QreadWriteLock也有遞迴和非遞迴模式之分。

我們主要來看看最重要的兩個函式是如何實現讀寫操作的同步的。

voidQReadWriteLock::lockForRead()

該函式lock接了讀操作的鎖。如果有別的執行緒已經對lock接了寫操作的鎖,則函式會阻塞等待。

void QReadWriteLock::lockForRead()

{

  QMutexLocker lock(&d->mutex);

  Qt::HANDLE self = 0;

  if(d->recursive) {

    self = QThread::currentThreadId();

    QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);

    if (it!= d->currentReaders.end()) {

      ++it.value();

      ++d->accessCount;

      Q_ASSERT_X(d->accessCount >0, "QReadWriteLock::lockForRead()",

      "Overflowin lock counter");

  return;

  }

}

// accessCount 小於0說明有寫執行緒在操作資源,則阻塞

while(d->accessCount < 0 || d->waitingWriters) {

++d->waitingReaders;             //自增等待的讀執行緒數

d->readerWait.wait(&d->mutex);

--d->waitingReaders;

}

if(d->recursive)

d->currentReaders.insert(self, 1);

++d->accessCount;    //自增,記錄有多少個執行緒訪問了資源

Q_ASSERT_X(d->accessCount > 0, "QReadWriteLock::lockForRead()", "Overflow in lock counter");

}

void QReadWriteLock::lockForWrite ()

該函式給lock加了寫操作的鎖,如果別的執行緒已經加了讀或者寫的鎖,則函式會被阻塞。

void QReadWriteLock::lockForWrite()

{

QMutexLocker lock(&d->mutex);

Qt::HANDLE self = 0;

if(d->recursive) {

self = QThread::currentThreadId();

if(d->currentWriter == self) {

--d->accessCount;

Q_ASSERT_X(d->accessCount <0, "QReadWriteLock::lockForWrite()",

"Overflowin lock counter");

return;

}

}

// accessCount不等於0,說明有執行緒在操作資源,則函式阻塞等待。

// accessCount大於0說明有讀執行緒在讀取資源,

// accessCount小於0說明有寫執行緒在寫資料

while(d->accessCount != 0) {

++d->waitingWriters;        //自增等待的寫執行緒數

d->writerWait.wait(&d->mutex);

--d->waitingWriters;

}

if(d->recursive)

d->currentWriter = self;

--d->accessCount;

Q_ASSERT_X(d->accessCount < 0, "QReadWriteLock::lockForWrite()", "Overflow in lock counter");

}

voidQReadWriteLock::unlock()

解鎖函式,下面我們看看原始碼是如何實現,讓等待的寫執行緒優先於讀執行緒獲得互斥量的鎖的。

void QReadWriteLock::unlock()

{

QMutexLocker lock(&d->mutex);

Q_ASSERT_X(d->accessCount != 0, "QReadWriteLock::unlock()", "Cannot unlock an unlocked lock");

boolunlocked = false;

if(d->accessCount > 0) {

// releasinga read lock

if(d->recursive) {

Qt::HANDLE self =QThread::currentThreadId();

QHash<Qt::HANDLE, int>::iterator it =d->currentReaders.find(self);

if(it != d->currentReaders.end()) {

if(--it.value() <= 0)

d->currentReaders.erase(it);

}

}

// d->accessCount  說明沒有執行緒在操作資源了unlocked為true

unlocked = --d->accessCount == 0;

} else if (d->accessCount < 0 &&++d->accessCount == 0)

{

// d->accessCount <0 說明有寫執行緒在操作。則解鎖unlocked = true;

// released awrite lock

unlocked = true;

d->currentWriter = 0;

}

//最重要的就是這裡

if(unlocked) {

if(d->waitingWriters) {

//如果有寫執行緒在等待,則wake一個寫執行緒。前面我們已經知道,寫執行緒是隻能有一個對資源進行操作的,所以就wakeone了。

d->writerWait.wakeOne();

} else if (d->waitingReaders) {

//如果沒有等待的寫執行緒,則wake全部的讀執行緒。因為讀執行緒是可以多個對資源進行操作的。

d->readerWait.wakeAll();

}

}

}

下面是我自己簡單的實現用例:

class Lock:publicQObject

{

Q_OBJECT

public:

Lock();

~Lock();

void Start();

void Read();

void Write();

void ReadThread1();

void ReadThread2();

void WriteThread1();

void WriteThread2();

protected:

private:

string strResource;

QReadWriteLock lock;    

};

Lock::Lock()

{

strResource = "Hellworld ......";

}

Lock::~Lock()

{

}

void Lock::Read()

{

cout<<"Readdata :"<<strResource<<endl;

QEventLoop loop;

QTimer::singleShot(2000,&loop,SLOT(quit()));   

loop.exec();

}

void Lock::Write()

{

strResource = "writelock ";

cout<<"Writedata :"<<strResource<<endl;

QEventLoop loop;

QTimer::singleShot(2000,&loop,SLOT(quit()));  

loop.exec();

}

void Lock::ReadThread1()

{

lock.lockForRead();

cout<<"ReadThread1  lockForRead"<<endl;

Read();

cout<<"ReadThread1  unlock"<<endl;

lock.unlock();

}

void Lock::ReadThread2()

{

lock.lockForRead();

cout<<"ReadThread2  lockForRead"<<endl;

Read();

cout<<"ReadThread2  unlock"<<endl;

lock.unlock();

}

void Lock::WriteThread1()

{

lock.lockForWrite();

cout<<"WriteThread1  lockForWrite"<<endl;

Write();

cout<<"WriteThread1  unlock"<<endl;

lock.unlock();

}

void Lock::WriteThread2()

{

lock.lockForWrite();

cout<<"WriteThread2  lockForWrite"<<endl;

Write();

cout<<"WriteThread2  unlock"<<endl;

lock.unlock();

}

void Lock::Start()

{

QtConcurrent::run(this,&Lock::ReadThread1);

QtConcurrent::run(this,&Lock::ReadThread2);

QtConcurrent::run(this,&Lock::WriteThread1);

QtConcurrent::run(this,&Lock::WriteThread2);

}

這裡我先啟動兩個讀執行緒,再啟動寫執行緒,執行結果如下。我們發現先讀執行緒1先加了鎖,讀執行緒1還沒解鎖的時候,讀執行緒2已經加了鎖,驗證了讀執行緒是可以同時進入的。

如果我改一下程式碼:

voidLock::Start()

{

QtConcurrent::run(this,&Lock::WriteThread1);

QtConcurrent::run(this,&Lock::ReadThread1);

QtConcurrent::run(this,&Lock::ReadThread2);

QtConcurrent::run(this,&Lock::WriteThread2);

}

我先啟動WriteThread1,然後啟動兩個讀執行緒,最後啟動WriteThread2。執行結果如下,我們發現,WriteThread1執行完之後,先執行WriteThread2,最後才是兩個讀執行緒。驗證了寫執行緒比讀執行緒先獲得鎖。

QSemaphore

QSemaphore是提供一個計數的訊號量。訊號量是泛化的互斥量。一個訊號量只能鎖一次,但是我們可以多次獲得訊號量。訊號量可以用來同步保護一定數量的資源。

訊號量支援兩個基本是函式,acquire()和release():

acquire(n):嘗試獲取n個資源。 如果獲取不到足夠的資源,這個會一直鎖住直到可以獲取足夠的資源

release(n):釋放n個資源。

它們的原始碼實現也很簡單:

void QSemaphore::acquire(intn)

{

Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative");

QMutexLocker locker(&d->mutex);

while (n> d->avail)  //申請的資源n 大於可用資源avail則進入等待。

d->cond.wait(locker.mutex());

d->avail -= n;

}

void QSemaphore::release(intn)

{

Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");

QMutexLocker locker(&d->mutex);

d->avail += n;

d->cond.wakeAll();

}

由於avail變數,實際就是一個int的計數變數 。所以我們在呼叫release()傳入的引數n大於訊號量初始值也沒關係,只是說明可用資源增加了。

例如以下程式碼:

int main(int argc, char *argv[])

{

QCoreApplication a(argc, argv);

QSemaphore sem(5);

sem.acquire(5);

cout<<"acquire(5);  "<<"remaindresource :"<<sem.available()<<endl;

sem.release(5);

cout<<"release(5)  "<<"remaindresource :"<<sem.available()<<endl;

sem.release(10);

cout<<"release(10)  "<<"remaindresource :"<<sem.available()<<endl;

sem.acquire(15);

cout<<"acquire(15);  "<<"remaindresource :"<<sem.available()<<endl;

returna.exec();

}

訊號量最著名的就是生產者與消費者的例子,以後再研究了。

QWaitCondition

QWaitCondition類提供了一個條件變數,它允許我們通知其他執行緒,等待的某些條件已經滿足。等待QWaitCondition變數的可以是一個或多個執行緒。當我們用wakeOne()通知其他執行緒時,系統會隨機的選中一個等待執行緒進行喚醒,讓它繼續執行。其實前面的訊號量和讀寫鎖內部實現都有用到QWaitCondition的。

下面我們來看這個類重要的幾個函式:

boolQWaitCondition::wait(QMutex*mutex,unsignedlongtime=ULONG_MAX )

該函式對mutex解鎖,然後等待。在呼叫這個函式之前,mutex必須是加鎖狀態。如果mutex沒有加鎖,則函式直接返回。如果mutex是可遞迴的,函式也直接返回。該函式對mutex解鎖,然後等待,直到以下條件之一滿足:

1.另外的執行緒呼叫wakeOne()或wakeAll(),則該函式會返回true。

2.時間過了Time毫秒。如果time為ULONG_MAX(預設),則將會一直等待不會超時。如果超時則返回false。

boolQWaitCondition::wait(QReadWriteLock*readWriteLock,unsignedlongtime=ULONG_MAX )

函式對readWriteLock解鎖並等待條件變數。在呼叫這個函式之前,readWriteLock必須是加鎖狀態的。如果不是加鎖狀態,則函式立即返回。readWriteLock必須不能是遞迴加鎖的,否則將不能正確的解鎖。返回的滿足條件跟上面的函式一樣。

轉自:https://www.cnblogs.com/xiangtingshen/p/11267523.html