1. 程式人生 > >Boost執行緒庫學習筆記

Boost執行緒庫學習筆記

一、建立一個執行緒

建立執行緒

boost::thread myThread(threadFun);

需要注意的是:引數可以是函式物件或者函式指標。並且這個函式無引數,並返回void型別。

當一個thread執行完成時,這個子執行緒就會消失。注意這個執行緒物件不會消失,它仍然是一個還處在它的生存期的C++物件。同理,當對一個堆上的執行緒物件的指標呼叫delete時候,執行緒物件被銷燬,作業系統的執行緒並不能保證就消失。

放棄時間片

 boost::thread::yield();
 當前執行緒放棄餘下的時間片

等待一個執行緒

 myThread.join();

呼叫這個方法的執行緒進入wait狀態,直到myThread代表的執行緒完成為止。如果它不結束的話,join方法就不會返回。join是一個等待子執行緒結束的最好的方法。如果主程式不呼叫join方法而直接結束,它的子執行緒有可能沒有執行完成,但是所有的子執行緒也隨之退出。不呼叫join方法,主執行緒就不會等待它的子執行緒。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/xtime.hpp>
 
struct MyThreadFunc {
   void operator( )( ) {
      // Do something long-running...
   }
} threadFun;
 
int main( ) {
 
   boost::thread myThread(threadFun); // Create a thread that starts
                                      // running threadFun
 
   boost::thread::yield( );           // Give up the main thread's timeslice
                                      // so the child thread can get some work
                                      // done.
 
   // Go do some other work...
 
   myThread.join( );                  // The current (i.e., main) thread will wait
                                      // for myThread to finish before it returns
 
}

執行緒組

如果你需要建立幾個執行緒,考慮使用一個執行緒組物件thread_group來組織它們。一個thread_group物件可以使用多種方法管理執行緒。首先,可以使用一個指向動態建立的執行緒物件的指標作為引數來呼叫add_thread方法,將這個執行緒加入執行緒組。也可以直接使用執行緒組類的create_thread方法,可不先建立執行緒而直接把執行緒加入到執行緒組中。

當執行緒組物件的解構函式被呼叫時,它將刪除(delete)所有這些通過add_thread方法加入的執行緒指標。所以,只能將堆上的執行緒物件指標通過add_thread方法加入執行緒組。remove_thread方法從執行緒組刪除某個執行緒的指標,但是我們仍需負責把執行緒本身記憶體釋放掉。

執行緒組物件的成員方法join_all方法等待執行緒組中所有執行緒結束,才返回

boost::thread_group grp;
boost::thread *p = new boost::thread(threadFun);
grp.add_thread(p);
//do something...
grp.remove_thread(p);
 
grp.create_thread(threadFun);
grp.create_thread(threadFun);   //Now there are two threads in grp
 
grp.join_all();                 //Wait for all threads to finish
 

二、使資源是執行緒安全的

保證同一時刻多個執行緒不會同時修改同一個共享資源,那麼這個程式是執行緒安全的,或者是序列化訪問資源的。可以使用mutex類來控制執行緒的併發問題。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <string>
 
// A simple queue class; don't do this, use std::queue
template<typename T>
class Queue {
public:
   Queue( ) {}
  ~Queue( ) {}
 
   void enqueue(const T& x) {
      // Lock the mutex for this queue
      boost::mutex::scoped_lock lock(mutex_);
      list_.push_back(x);
      // A scoped_lock is automatically destroyed (and thus unlocked)
      // when it goes out of scope
   } 
 
   T dequeue( ) {
      boost::mutex::scoped_lock lock(mutex_);
 
      if (list_.empty( ))
         throw "empty!";     // This leaves the current scope, so the
      T tmp = list_.front( ); // lock is released
      list_.pop_front( );
      return(tmp);
   } // Again: when scope ends, mutex_ is unlocked
 
private:
   std::list<T> list_;
   boost::mutex mutex_;
};
 
Queue<std::string> queueOfStrings;
 
void sendSomething( ) {
   std::string s;
   for (int i = 0; i < 10; ++i) {
      queueOfStrings.enqueue("Cyrus");
   }
}
 
void recvSomething( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      try {s = queueOfStrings.dequeue( );}
      catch(...) {}
   }
}
 
int main( ) {
   boost::thread thr1(sendSomething);
   boost::thread thr2(recvSomething);
 
   thr1.join( );
   thr2.join( );
}

mutex物件本身並不知道它代表什麼,它僅僅是被多個消費者執行緒使用的資源訪問的鎖定解鎖標誌。在某個時刻,只有一個執行緒可以鎖定這個mutex物件,這就阻止了同一時刻有多個執行緒併發訪問共享資源。一個mutex就是一個簡單的訊號機制。

給mutex加解鎖有多種策略,最簡單的是使用scoped_lock類,它使用一個mutex引數來構造,並一直鎖定這個mutex直到物件被銷燬。如果這個正在被構造的mutex已經被別的執行緒鎖定的話,當前執行緒就會進入wait狀態,直到這個鎖被解開。

三、讀寫鎖

mutex有一個美中不足,它不區分讀和寫。執行緒如果只是進行讀操作,mutex強制執行緒序列化訪問資源,效率低。而且這種操作不需要排他性訪問。基於這個原因,Boost執行緒庫提供了read_write_mutex。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/read_write_mutex.hpp>
#include <string>
 
template<typename T>
class Queue {
public:
   Queue( ) :  // Use a read/write mutex and give writers priority
      rwMutex_(boost::read_write_scheduling_policy::writer_priority){}
  ~Queue( ) {}
 
   void enqueue(const T& x) {
      // Use a r/w lock since enqueue updates the state
      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);
      list_.push_back(x);
   } 
 
   T dequeue( ) {
      // Again, use a write lock
      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);
 
      if (list_.empty( ))
         throw "empty!";
      T tmp = list_.front( );
      list_.pop_front( );
      return(tmp);
   }
 
   T getFront( ) {
      // This is a read-only operation, so you only need a read lock
      boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);
      if (list_.empty( ))
         throw "empty!";
      return(list_.front( ));
   }
 
private:
   std::list<T> list_;
   boost::read_write_mutex rwMutex_;
};
 
Queue<std::string> queueOfStrings;
 
void sendSomething( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      queueOfStrings.enqueue("Cyrus");
   }
}
 
void checkTheFront( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      try {s = queueOfStrings.getFront( );}
      catch(...) {}
   }
}
 
int main( ) {
 
   boost::thread thr1(sendSomething);
   boost::thread_group grp;
 
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
 
   thr1.join( );
   grp.join_all( );
}

注意Queue的建構函式中隊讀寫鎖rwMutex的初始化。同一時刻,可能有多個讀寫執行緒要鎖定一個read_write_mutex,而這些鎖的排程策略依賴於構造這個mutex時選定的排程策略。Boost庫中提供了四種排程策略:

1)reader_priority:等待讀鎖的執行緒優先於等待寫鎖的執行緒

2)writer_priority:等待寫鎖的執行緒優先於等待讀鎖的執行緒

3)alternating_single_read:在讀鎖和寫鎖之間交替

4)alternating_many_reads:在讀鎖和寫鎖之間交替,這個策略將在兩個寫鎖之間使得所有的在這個queue上掛起的讀鎖都被允許。

選擇使用哪種策略要慎重,因為使用前兩種的話可能會導致某些鎖始終不能成功,出現餓死的現象。

死鎖、餓死和競態條件

1)死鎖,是涉及至少2個執行緒和2個資源的情況。執行緒A和B,資源X和Y。A鎖定了X,而B鎖定了Y。此時A和B有彼此想要對方的資源,死鎖就出現了。
死鎖的預防有兩種方法。一種是,通過小心的按照一定的順序對不同的mutex來加鎖。另一種是,使用Boost提供的try_mutex互斥量和scoped_try_lock。或者使用時間鎖。scoped_try_lock對try_mutex加鎖時,可能成功,也可能失敗,但不會阻塞。時間鎖則有一個超時時間。

bool dequeue(T& x)
{
    boost::try_mutex::scope_try_lock lock(tryMutex_);
    if(!lock.locked())
        return false;
    else{
        if (list_.empty())
            throw "empty!";
        x = list_.front();
        list_.pop_front();
        return true;
    }
}
private:
    boost::try_mutex tryMutex_;

2)餓死,如果你正在使用write_priority策略,並且你有很多建立寫鎖的執行緒,那麼讀鎖的執行緒就可能餓死。

3)競態條件,

if(q.getFront() == "Cyrus"){
   str = q.dequeue();
   //....
}

這個程式碼在單執行緒環境中工作很好,因為q在第一行和第二行程式碼之間不會被修改。多執行緒環境中則會出現問題。此為競態條件。解決的方法是為Queue新增一個成員函式dequeueIfEquals,在函式執行過程中始終鎖定互斥量。

四、從一個執行緒中給另一個執行緒傳送通知

當需要執行緒等待某個事物時,可以建立一個condition物件,然後通過這個物件來通知那些等待的執行緒。

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <list>
#include <string>
 
class Request { /*...*/ };


#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <list>
#include <string>
 
class Request { /*...*/ };
 
// A simple job queue class; don't do this, use std::queue
template<typename T>
class JobQueue {
public:
   JobQueue( ) {}
  ~JobQueue( ) {}
 
   void submitJob(const T& x) {
      boost::mutex::scoped_lock lock(mutex_);
      list_.push_back(x);
      workToBeDone_.notify_one( );
   } 
 
   T getJob( ) {
      boost::mutex::scoped_lock lock(mutex_);
 
      workToBeDone_.wait(lock); // Wait until this condition is
                                // satisfied, then lock the mutex
      T tmp = list_.front( );
      list_.pop_front( );
      return(tmp);
   }
 
private:
   std::list<T> list_;
   boost::mutex mutex_;
   boost::condition workToBeDone_;
};

JobQueue<Request> myJobQueue;
 
void boss( ) {
   for (;;) {
      // Get the request from somewhere
      Request req;
      myJobQueue.submitJob(req);
   }
}
 
void worker( ) {
   for (;;) {
      Request r(myJobQueue.getJob( ));
      // Do something with the job...
   }
}
 int main( ) {
   boost::thread thr1(boss);
   boost::thread thr2(worker);
   boost::thread thr3(worker);
 
   thr1.join( );
   thr2.join( );
   thr3.join( );
}

boost::mutex::scoped_lock lock(mutex_);

workToBeDone_.wait(lock);

這兩行程式碼,第一行鎖定這個mutex物件。第二行程式碼解開這個mutex上的鎖,然後進行等待或者休眠,直到它的條件得到了滿足。這個mutex互斥物件的解鎖讓其他的執行緒能夠使用這個mutex物件,它們中的某個需要設定這個等待條件,之後通知另外的執行緒。

notify_all函式,通知那些所有正在等待某個條件變為真的執行緒,那些執行緒隨後進入執行狀態。wait方法做兩件事情:它一直等待直到有人在它正等待的condition上呼叫notify_one或notify_all,然後它就試圖鎖定相關的mutex。當呼叫的是notify_all時,儘管多個等待的執行緒都儘量去獲得下一個鎖,但誰將獲得依賴於這個mutex的型別和使用的優先策略。

一個condition物件能讓消費者執行緒休眠,因此在還沒有碰到一個condition時處理器可以去處理別的事情。例如一個web伺服器使用一個工作執行緒池來處理進來的請求。當沒有需求進來時,讓這些子執行緒處於等待狀態比讓它們迴圈的查詢或者睡眠然後偶爾喚醒來檢查這個佇列,要好很多。

五、只初始化一次共享資源