1. 程式人生 > >QT多執行緒詳解

QT多執行緒詳解

一、Qt執行緒類

Qt 提供了一些執行緒相關的類:
QThread 提供了開始一個新執行緒的方法
QThreadStorage 提供逐執行緒資料儲存
QMutex   提供相互排斥的鎖,或互斥量
QMutexLocker 是一個便利類,它可以自動對QMutex 加鎖與解鎖
QReadWriterLock 提供了一個可以同時讀操作的鎖
QReadLocker 與QWriteLocker 是便利類,它自動對QReadWriteLock 加鎖與解鎖
QSemaphore 提供了一個整型訊號量,是互斥量的泛化
QWaitCondition 提供了一種方法,使得執行緒可以在被另外執行緒喚醒之前一直休眠。

二、Qt執行緒的建立

   Qt執行緒中有一個公共的抽象類,所有的執行緒都是從這個QThread抽象類中派生的,要實現QThread中的純虛擬函式run(),run()函式是通過start()函式來實現呼叫的。

class MyThread : public QThread 
{

public :
	virtual   void  run();

	void  MyThread::run()
	{
		for (  int  count  =   0 ; count  <   20 ; count ++  ) {
			sleep(  1  );
			qDebug(  " Ping! "  );
		}
	}
}

int  main()
{
	MyThread a;
	MyThread b;

	a.start(); // 自動呼叫run(),否則即使該執行緒建立,也是一開始就掛起 
	b.start();
	// 要等待執行緒a,b都退出 
	a.wait();
	b.wait();

}

三、Qt執行緒同步

1. QMutex

QMutex ( bool recursive = FALSE )

void lock ()   //試圖鎖定互斥量。如果另一個執行緒已經鎖定這個互斥量,那麼這次呼叫將阻塞 直到那個執行緒把它解鎖。

bool tryLock () //如果另一個程序已經鎖定了這個互斥量,這個函式返回假,而不是一直等到這個鎖可用為止,比如,它不是阻塞的。

QMutex mutex;
void someMethod()
{
	mutex. lock ();
	qDebug( " Hello " );
	qDebug( " World " );
	mutex.unlock();
}

// 用Java的術語,這段程式碼應該是: 
void someMethod()
{
	synchronized {
		qDebug( " Hello " );
		qDebug( " World " );
	}

}

    不過在Qt中我們可用通過另一個類來簡化這種應用,因為如果使用QMutex.lock()而沒有對應的使用QMutex.unlcok()的話

就會造成死鎖,別的執行緒永遠也得不到接觸該mutex鎖住的共享資源的機會。儘管可以不使用lock()而使用tryLock(timeout)

來避免因為死等而造成的死鎖( tryLock(負值)==lock()),但是還是很有可能造成錯誤。

   對於上述的情況MFC中用CSingleLock 或 MultiLock,Boost中用boost::mutex::scoped_lock來進行解決,而在Qt中用QMutexLocker來進行解決。下面是沒有采用 QMutexLocker的例子和採用 QMutexLocker的方案。

2. QMutexLocker

 this complex function locks a QMutex upon entering the function and unlocks the mutex at all the exit points

int  complexFunction( int  flag)
{
	mutex. lock ();

	int  retVal  =   0 ;

	switch  (flag)
	{
	case   0 :
	case   1 :
		mutex.unlock();
		return  moreComplexFunction(flag);
	case   2 :
		{
			int  status  =  anotherFunction();
			if  (status  <   0 ) {
				mutex.unlock();
				return   - 2 ;
			}
			retVal  =  status  +  flag;
		}
		break ;
	default :
		if  (flag  >   10 ) {
			mutex.unlock();
			return   - 1 ;
			break ;
		}

		mutex.unlock();
		return  retVal;
	} 
}

This example increases the likelihood that errors will occur.Using QMutexLocker greatly simplifies the code, and makes it more readable:
int  complexFunction( int  flag)
{
	QMutexLocker locker( & mutex);

	int  retVal  =   0 ;

	switch  (flag) {
	case   0 :
	case   1 :
		return  moreComplexFunction(flag);
	case   2 :
		{
			int  status  =  anotherFunction();
			if  (status  <   0 )
				return   - 2 ;
			retVal  =  status  +  flag;
		}
		break ;
	default :
		if  (flag  >   10 )
			return   - 1 ;
		break ;
	}

	return  retVal;
} 
Now, the mutex will always be unlocked when the QMutexLocker object is destroyed (when the function returns since locker is an auto variable) . 

3. QReadWriteLock

      用mutex進行執行緒同步有一個問題就是mutex只允許某個時刻只允許一個執行緒對共享資源進行訪問,如果同時有多個執行緒對共享資源進行讀訪問,而只有一個寫操作執行緒,那麼在這種情況下如果採用mutex就成為程式執行效能的瓶頸了。在這種情況下Qt下采用QReadWriteLock來實現多個執行緒讀,一個執行緒寫。寫執行緒執行的時候會阻塞所有的讀執行緒,而讀執行緒之間的執行不需要進行同步。

MyData data;
QReadWriteLock  lock;

void  ReaderThread::run()
{

	lock .lockForRead();
	access_data_without_modifying_it( & data);
	lock .unlock();
}

void  WriterThread::run()
{

	lock .lockForWrite();
	modify_data( & data);
	lock .unlock();
}
QReadWriterLock 與QMutex相似,除了它對 "read","write"訪問進行區別對待。它使得多個讀者可以共時訪問資料。使用QReadWriteLock而不是QMutex,可以使得多執行緒程式更具有併發性。

4. QReadLocker和QWriteLocker

 對於QMutex有QMutexLocker來簡化使用,而對於QReadWriteLock有QReadLocker  QWriteLocker。

Here's an example that uses QReadLocker to lock and unlock a read-write lock for reading: 

QReadWriteLock lock ;
QByteArray readData()
{
	QReadLocker locker(&lock); 

	return  data;
} 

QReadWriteLock  lock ;
QByteArray readData()
{
	lock.lockForRead(); 

	lock.unlock(); 
	return  data;
} 

  QSemaphore 是QMutex 的一般化,它可以保護一定數量的相同資源,與此相對,一個mutex只保護一個資源。下面例子中,使用QSemaphore 來控制對環狀緩衝區 的訪問,此緩衝區被生產者執行緒和消費者執行緒共享。生產者不斷向緩衝寫入資料直到緩衝末端,消費者從緩衝不斷從緩衝頭部 讀取資料。

    訊號量比互斥量有更好的併發性 ,假如我們用互斥量來控制對緩衝的訪問,那麼生產者,消費者不能同時訪問緩衝 。然而,我們知道在同一時刻,不同執行緒訪問緩衝的不同部分並沒有什麼危害。

      QSemaphore semaphore(1); |     QMutex mutex;

      Qsemaphore.acquire();         |     Qmutex.lock();

      Qsemaphore.release();         |     Qmutex.unlock();

     Public Functions

 Semaphores support two fundamental operations, 

  • acquire(n) tries to acquire n resources. If there aren't that many resources available, the call will block until this is the case.
  • release(n ) releases n resources.
  • tryAcquire () returns immediately if it cannot acquire the resources
  • available () returns the number of available resources at any time.

 Example:

 QSemaphore sem( 5 );       //  sem.available() == 5
 sem.acquire(
 3 );          //  sem.available() == 2  sem.acquire( 2 );          //  sem.available() == 0  sem.release( 5 );          //  sem.available() == 5  sem.release( 5 );          //  sem.available() == 10
 sem.tryAcquire(
 1 );       //  sem.available() == 9, returns true  sem.tryAcquire( 250 );     //  sem.available() == 9, returns false 

     生產者執行緒寫資料到buffer直到緩衝末端,然後重新從buffer的頭部開始寫。

     顯然producer執行緒和consumer執行緒是需要進行同步的,If the producer generates the data too fast, it will overwrite data that the consumer hasn't yet read; if the consumer reads the data too fast, it will pass the producer and read garbage.   

 A crude way to solve this problem is to have the producer fill the buffer, then wait until the consumer has read the entire buffer, and so on. 顯然這樣做效率是比較低的。

const   int  DataSize  =   100000 ;
const   int  BufferSize  =   8192 ;
char  buffer[BufferSize];

// When the application starts, the reader thread will start  
// acquiring "free" bytes and convert them into "used" bytes
QSemaphore freeBytes(BufferSize);    // producer執行緒在此區域寫入資料 ,初始資源數量為BufferSize
QSemaphore usedBytes;                  // consumer執行緒讀取此區域的資料,初始資源數量為0

// For this example, each byte counts as one resource.
// In a real-world application, we would probably operate on larger
// units (for example, 64 or 256 bytes at a time) 

class  Producer :  public  QThread
{
public :
	void  run();
};

//生產者每acquire一次就,使用掉Buffer個資源中的一個,而寫入的字元存入到buffer陣列中
//從而消費者可用讀取字元,從而消費者獲取一個資

void  Producer::run()
{
	//qsrand(QTime(0,0,0).secsTo(QTime::currentTime())); 
	for  ( int  i  =   0 ; i  <  DataSize;  ++ i) {
		freeBytes.acquire ();
		buffer[i  %  BufferSize]  =   " ACGT " [( int )qrand()  %   4 ];
		usedBytes.release ();
	}
}

class  Consumer :  public  QThread
{
public :
	void  run();
};

void  Consumer::run()
{
	for  ( int  i  =   0 ; i  <  DataSize;  ++ i) {
		usedBytes.acquire ();
		fprintf(stderr,  " %c " , buffer[i  %  BufferSize]);
		freeBytes.release ();
	}
	fprintf(stderr,  " /n " );
}

// Finally, in main(), we start the producer and consumer threads. 
// What happens then is that the producer converts some "free" space
// into "used" space, and the consumer can then convert it back to  // "free" space.

int  main( int  argc,  char   * argv[])
{
	QCoreApplication app(argc, argv);
	Producer producer;
	Consumer consumer;
	producer.start();
	consumer.start();
	producer.wait();
	consumer.wait();
	return   0 ;
}

  producer的run函式:

   當producer執行緒執行run函式,如果buffer中已經滿了,而沒有consumer執行緒沒有讀,這樣producer就不能再往buffer

中寫字元。此時在 freeBytes.acquire 處就阻塞直到 consumer執行緒讀(consume)資料。一旦producer獲取到一個位元組(資源)

就寫如一個隨機的字元,並呼叫 usedBytes.release 從而 consumer執行緒獲取一個資源可以讀一個位元組的資料了。

  consumer的run函式:

    當consumer執行緒執行run函式,如果buffer中沒有資料,就是資源=0,則consumer執行緒在此處阻塞。直到producer執行緒執行

寫操作,寫入一個位元組,並執行usedBytes.release 從而使得consumer執行緒的可用資源數=1。則consumer執行緒從阻塞狀態中退出,

並將 usedBytes 資源數-1,當前資源數=0。

5. QWaitCondition

  • bool wait ( QMutex * mutex, unsigned long time = ULONG_MAX )
  • Public function:

    bool QWaitCondition::wait ( QMutex  * mutex, unsigned long time = ULONG_MAX )

     1) 釋放鎖定的mutex

     2) 線上程物件上等待

    mutex必須由呼叫執行緒進行初鎖定 。注意呼叫wait的話,會自動呼叫unlock解鎖之前鎖住的資源,不然會造成死鎖。

    執行緒1等待執行緒2來改變共享資源,從而達到一定的條件然後發出訊號,使得執行緒1從wait中的阻塞狀態中被喚醒。

    但是執行緒2想改變資源,卻無法辦到,因為執行緒1呼叫lock之後就在wait中blocking,了但是沒有及時的unlock,那麼這就

    構成了死鎖的條件。所以說wait函式除了使呼叫執行緒切換到核心態之外,還自動unlock(&mutex)

    mutex 將被解鎖,並且呼叫執行緒將會阻塞,直到下列條件之一滿足時才醒來:

    • 另一個執行緒使用wakeOne ()或wakeAll ()傳輸訊號給它。在這種情況下,這個函式將返回真。
    • time 毫秒過去了。如果time 為ULONG_MAX(預設值),那麼這個等待將永遠不會超時(這個事件必須被傳輸)。如果等待的事件超時,這個函式將會返回假互斥量將以同樣的鎖定狀態返回。這個函式提供的是允許從鎖定狀態到等待狀態的原子轉換。

     void QWaitCondition::wakeAll ()

    這將會喚醒所有等待QWaitCondition的執行緒。這些執行緒被喚醒的順序依賴於操組系統的排程策略,並且不能被控制或預知。

    void  QWaitCondition::wakeOne ()

    這將會喚醒所有等待QWaitCondition的執行緒中的一個執行緒。這個被喚醒的執行緒依賴於操組系統的排程策略,並且不能被控制或預知。

    假定每次使用者按下一個鍵,我們有三個任務要同時執行,每個任務都可以放到一個執行緒中,每個執行緒的run()都應該是這樣:

QWaitCondition key_pressed;

  
 for  (;;) {
     key_pressed.wait(); 
 //  這是一個QWaitCondition全域性變數
     
 //  鍵被按下,做一些有趣的事      do_something();
  }

 或是這樣:

forever {
     mutex.
 lock ();
     keyPressed.wait(
 & mutex);
     do_something();
     mutex.unlock ();
 } 

第四個執行緒回去讀鍵按下並且每當它接收到一個的時候喚醒其它三個執行緒,就像這樣:

 QWaitCondition key_pressed;

  
 for  (;;) {
     getchar();
     
 //  在key_pressed中導致引起任何一個執行緒。wait()將會從這個方法中返回並繼續執行      key_pressed.wakeAll();
  }

    注意這三個執行緒被喚醒的順序是未定義的,並且當鍵被按下時,這些執行緒中的一個或多個還在do_something(),它們將不會被喚醒(因為它們現在沒有等待條件變數) 並且這個任務也就不會針對這次按鍵執行操作。這種情況是可以避免得,比如,就像下面這樣做:

QMutex mymutex;
QWaitCondition key_pressed;
int  mycount = 0 ;

// Worker執行緒程式碼 
for  (;;) {

	key_pressed.wait () ;  //  這是一個QWaitCondition全域性變數

	//keyPressed.wait(&mutex) ;
	mymutex. lock ();
	mycount ++ ;

	mymutex.unlock ();

	do_something ();

	mymutex. lock ();
	mycount -- ;
	mymutex.unlock ();
}

//  讀取按鍵執行緒程式碼 
for  (;;) {
	getchar();
	mymutex. lock ();
	//  睡眠,直到沒有忙碌的工作執行緒才醒來。 count==0說明沒有Worker執行緒在do something
	while ( count  >   0  ) {
		mymutex.unlock ();
		sleep(  1  );
		mymutex. lock ();
	}
	mymutex.unlock ();
	key_pressed.wake All () ;
}

 應用條件變數對前面用訊號量進行保護的環狀緩衝區的例子進行改進:

   下面的例子中:

     1)生產者首先必須檢查緩衝是否已滿(numUsedBytes==BufferSize),如果是,執行緒停下來等待bufferNotFull條件。如果不是,在緩衝中生產資料,增加numUsedBytes,啟用條件 bufferNotEmpty。

     2)使用mutex來保護對numUsedBytes的訪問。

     另外,QWaitCondition::wait ()接收一個mutex作為引數,這個mutex應該被呼叫執行緒初始化為鎖定狀態。線上程進入休眠狀態之前,mutex會被解鎖。而當執行緒被喚醒時,mutex會再次處於鎖定狀態。

     而且,從鎖定狀態到等待狀態的轉換是原子操作,這阻止了競爭條件的產生。當程式開始執行時,只有生產者可以工作。消費者被阻塞等待 bufferNotEmpty條件,一旦生產者在緩衝中放入一個位元組,bufferNotEmpty條件被激發,消費者執行緒於是被喚醒。

const   int  DataSize  =   100000 ;
const   int  BufferSize  =   8192 ;
char  buffer[BufferSize];

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;

QMutex mutex;


int  numUsedBytes  =   0 ;

class  Producer :  public  QThread
{
public :
	void  run();
};

void  Producer::run()
{
	qsrand(QTime( 0 , 0 , 0 ).secsTo(QTime::currentTime()));

	for  ( int  i  =   0 ; i  <  DataSize;  ++ i) {

		mutex. lock ();

		//producer執行緒首先檢查緩衝區是否已滿

		if  (numUsedBytes  ==  BufferSize)//緩衝區已滿,等待consumer來減少numUsedBytes

			// bufferNotFull.wait(&mutex)先呼叫 mutex.unlock ()然後收到訊號時呼叫 mutex. lock ()
			bufferNotFull.wait( & mutex);//緩衝區已滿等待bufferNotFull的條件變數成立變為有訊號 
		mutex.unlock ();

		buffer[i  %  BufferSize]  =   " ACGT " [( int )qrand()  %   4 ];

		mutex. lock ();
		++ numUsedBytes; //producer用掉一個Bytes,表示producer寫入buffer中的位元組數 
		bufferNotEmpty.wakeAll();
		mutex.unlock ();
	}
}

class  Consumer :  public  QThread
{
public :
	void  run();
};

void  Consumer::run()
{
	for  ( int  i  =   0 ; i  <  DataSize;  ++ i) {
		mutex. lock ();
		if  (numUsedBytes  ==   0 )
			bufferNotEmpty.wait( & mutex);
		mutex.unlock ();

		fprintf(stderr,  " %c " , buffer[i  %  BufferSize]);

		mutex. lock ();
		-- numUsedBytes;
		bufferNotFull.wakeAll();
		mutex.unlock ();
	}
	fprintf(stderr,  " /n " );
}

int  main( int  argc,  char   * argv[])
{
	QCoreApplication app(argc, argv);
	Producer producer;
	Consumer consumer;
	producer.start();
	consumer.start();
	producer.wait();
	consumer.wait();
	return   0 ;
} 
另外一個例子:
#include  <qapplication.h> 
#include  <qpushbutton.h> 

//  全域性條件變數 
QWaitCondition mycond;

//  Worker類實現 
class  Worker :  public  QPushButton,  public  QThread
{
	Q_OBJECT

public :
	Worker(QWidget  * parent  =   0 ,  const   char   * name  =   0 )
		: QPushButton(parent, name)
	{
		setText( " Start Working " );

		//  連線從QPushButton繼承來的訊號和我們的slotClicked()方法 
		connect( this , SIGNAL(clicked()), SLOT(slotClicked()));

		//  呼叫從QThread繼承來的start()方法……這將立即開始執行緒的執行 
		QThread::start();
	}

	public  slots:
		void  slotClicked()
		{
			//  喚醒等待這個條件變數的一個執行緒 
			mycond.wakeOne ();
		}

protected :
	void  run()
	{
		//  這個方法將被新建立的執行緒呼叫…… 

		while  ( TRUE ) {
			//  鎖定應用程式互斥鎖,並且設定視窗標題來表明我們正在等待開始工作 
			qApp -> lock ();
			setCaption(  " Waiting "  );
			qApp -> unlock ();

			//  等待直到我們被告知可以繼續 
			mycond.wait ();

			//  如果我們到了這裡,我們已經被另一個執行緒喚醒……讓我們來設定標題來表明我們正在工作 
			qApp -> lock ();
			setCaption(  " Working! "  );
			qApp -> unlock ();

			//  這可能會佔用一些時間,幾秒、幾分鐘或者幾小時等等,因為這個一個和GUI執行緒分開的執行緒,在處理事件時,GUI執行緒不會停下來…… 
			do_complicated_thing();
		}
	}
};

//  主執行緒——所有的GUI事件都由這個執行緒處理。 
int  main(  int  argc,  char   ** argv )
{
	QApplication app( argc, argv );

	//  建立一個worker……當我們這樣做的時候,這個worker將在一個執行緒中執行 
	Worker firstworker(  0 ,  " worker "  );

	app.setMainWidget(  & worker );
	worker.show();

	return  app.exec();
}

6. 執行緒安全

非執行緒安全類

       這個類不是執行緒安全的,因為假如多個執行緒都試圖修改資料成員 n,結果未定義。這是因為c++中的++和--操作符不是原子操作。實際上,它們會被擴充套件為三個機器指令:
1. 把變數值裝入暫存器
2. 增加或減少暫存器中的值
3. 把暫存器中的值寫回記憶體 

     假如執行緒A與B同時裝載變數的舊值,在暫存器中增值,回寫。他們寫操作重疊了,導致變數值僅增加了一次。很明顯,訪問應該序列化:A執行123步驟時不應被打斷。

class  Counter
{
public :
	Counter() {n = 0 ;}
	void  increment() { ++ n;}
	void  decrement() { -- n;}
	int  value()  const  { return  n;}
private :
	int  n;
};
	 

執行緒安全類

class  Counter
{
public :
	Counter() { n  =   0 ; }

	void  increment() { QMutexLocker locker( & mutex);  ++ n; }
	void  decrement() { QMutexLocker locker( & mutex);  -- n; }
	int  value()  const  { QMutexLocker locker( & mutex);  return  n; }

private :
	mutable  QMutex mutex;
	int  n;
};

   QMutexLocker類在建構函式中自動對mutex進行加鎖,在解構函式中進行解鎖。隨便一提的是,mutex使用了mutable 關鍵字來修飾,因為我們在value()函式中對mutex進行加鎖與解鎖操作,而value ()是一個const 函式。