1. 程式人生 > >boost::condition_variable 設計c++ 生產者消費者佇列

boost::condition_variable 設計c++ 生產者消費者佇列

boost::condition_variable 用法:

當執行緒間的共享資料發生變化的時候,可以通過condition_variable來通知其他的執行緒。消費者wait 直到生產者通知其狀態發生改變,Condition_variable是使用方法如下:

·當持有鎖之後,執行緒呼叫wait

·wait解開持有的互斥鎖(mutex),阻塞本執行緒,並將自己加入到喚醒佇列中

·當收到通知(notification),該執行緒從阻塞中恢復,並加入互斥鎖佇列(mutex queue)

 執行緒被喚醒之後繼續持有鎖執行。

以下一個例子轉自:

?
template<typenameData>  class
concurrent_queue  {  private:  std::queue<Data> the_queue;  mutableboost::mutex the_mutex;  boost::condition_variable the_condition_variable;  public:  voidpush(Data const& data)  {  boost::mutex::scoped_lock lock(the_mutex);  the_queue.push(data);  lock.unlock();  the_condition_variable.notify_one();  
}  boolempty() const  {  boost::mutex::scoped_lock lock(the_mutex);  returnthe_queue.empty();  }  booltry_pop(Data& popped_value)  {  boost::mutex::scoped_lock lock(the_mutex);  if(the_queue.empty())  {  returnfalse;  }  popped_value=the_queue.front();  the_queue.pop();  returntrue;  }  voidwait_and_pop(Data& popped_value)  
{  boost::mutex::scoped_lock lock(the_mutex);  while(the_queue.empty())  {  the_condition_variable.wait(lock);  }  popped_value=the_queue.front();  the_queue.pop();  }  };

Boost多執行緒程式設計

背景

今天網際網路應用服務程式普遍使用多執行緒來提高與多客戶連結時的效率;為了達到最大的吞吐量,事務伺服器在單獨的執行緒上執行服務程式;

GUI應用程式將那些費時,複雜的處理以執行緒的形式單獨執行,以此來保證使用者介面能夠及時響應使用者的操作。這樣使用多執行緒的例子還有很多。

跨平臺

建立執行緒

標頭檔案 <boost/thread/thread.hpp>

namespace boost {

 class thread;

 class thread_group;

}

•thread():構造一個表示當前執行執行緒的執行緒物件

•explicit thread(const boost::function0<void>& threadfunc)

注:boost::function0<void>可以簡單看為:一個無返回(返回void),無引數的函式。這裡的函式也可以是類過載operator()構成的函式。

第一種方式:最簡單方法

•#include <boost/thread/thread.hpp>

•#include <iostream>

• 

•void hello()

•{

•std::cout <<

•"Hello world, I''m a thread!"

•<< std::endl;

•}

• 

•int main(int argc, char* argv[])

•{

•boost::thread thrd(&hello);

•thrd.join();

•return 0;

•}

第二種方式:複雜型別物件作為引數來建立執行緒

•#include <boost/thread/thread.hpp>

•#include <boost/thread/mutex.hpp>

•#include <iostream>

• 

•boost::mutex io_mutex;

• 

•struct count

•{

•count(int id) : id(id) { }

•void operator()()

•{

•for (int i = 0; i < 10; ++i)

•{

•boost::mutex::scoped_lock

•lock(io_mutex);

•std::cout << id << ": "

•<< i << std::endl;

•}

•}

•int id;

•};

• 

•int main(int argc, char* argv[])

•{

•boost::thread thrd1(count(1));

•boost::thread thrd2(count(2));

•thrd1.join();

•thrd2.join();

•return 0;

•}

第三種方式:在類內部建立執行緒

1)類內部靜態方法啟動執行緒

•#include <boost/thread/thread.hpp>

•#include <iostream>

•class HelloWorld

•{

•public:

•static void hello()

•{

•std::cout <<

•"Hello world, I''m a thread!"

•<< std::endl;

•}

•static void start()

•{

• 

• boost::thread thrd( hello );

• thrd.join();

•}

•};

•int main(int argc, char* argv[])

•{

•HelloWorld::start();

•return 0;

•}

在這裡start()hello()方法都必須是static方法。

2)如果要求start()hello()方法不能是靜態方法則採用下面的方法建立執行緒:

•#include <boost/thread/thread.hpp>

•#include <boost/bind.hpp>

•#include <iostream>

•class HelloWorld

•{

•public:

•void hello()

•{

•std::cout <<

•"Hello world, I''m a thread!"

•<< std::endl;

•}

•void start()

•{

• boost::function0< void> f = boost::bind(&HelloWorld::hello,this);

• boost::thread thrd( f );

• thrd.join();

•}

•};

•int main(int argc, char* argv[])

•{

•HelloWorld hello;

•hello.start();

•return 0;

•}

3)在Singleton模式內部建立執行緒:

•#include <boost/thread/thread.hpp>

•#include <boost/bind.hpp>

•#include <iostream>

•class HelloWorld

•{

•public:

•void hello()

•{

•std::cout <<

•"Hello world, I''m a thread!"

•<< std::endl;

•}

•static void start()

•{

• boost::thread thrd( boost::bind 

•(&HelloWorld::hello,&HelloWorld::getInstance() ) ) ;

• thrd.join();

•}

•static HelloWorld& getInstance()

•{

• if ( !instance )

•instance = new HelloWorld;

• return *instance;

•}

•private:

•HelloWorld(){}

•static HelloWorld* instance;

•};

•HelloWorld* HelloWorld::instance = 0;

•int main(int argc, char* argv[])

•{

•HelloWorld::start();

•return 0;

•}

第四種方法:用類內部函式在類外部建立執行緒

•#include <boost/thread/thread.hpp>

•#include <boost/bind.hpp>

•#include <string>

•#include <iostream>

•class HelloWorld

•{

•public:

•void hello(const std::string& str)

•{

•std::cout <<str<< std::endl;

•}

•};

• 

•int main(int argc, char* argv[])

•{

•HelloWorld obj;

•boost::thread thrd( boost::bind(&HelloWorld::hello,&obj,"Hello

•world, I''m a thread!" ) ) ;

•thrd.join();

•return 0;

•}

如果執行緒需要繫結的函式有引數則需要使用boost::bind。比如想使用 boost::thread建立一個執行緒來執行函式:void f(int i)

如果這樣寫:boost::thread thrd(f)是不對的,因為thread建構函式宣告接受的是一個沒有引數且返回型別為void的型別,而且

不提供引數i的值f也無法執行,這時就可以寫:boost::thread thrd(boost::bind(f,1))。涉及到有參函式的繫結問題基本上都

boost::threadboost::functionboost::bind結合起來使用。

互斥體

一個互斥體一次只允許一個執行緒訪問共享區。當一個執行緒想要訪問共享區時,首先要做的就是鎖住(lock)互斥體。

•Boost執行緒庫支援兩大類互斥體,包括簡單互斥體(simple mutex)和遞迴互斥體(recursive mutex)

有了遞迴互斥體,單個執行緒就可以對互斥體多次上鎖,當然也必須解鎖同樣次數來保證其他執行緒可以對這個互斥體上鎖。

•Boost執行緒庫提供的互斥體型別:

boost::mutex,

boost::try_mutex,

boost::timed_mutex,

boost::recursive_mutex,

boost::recursive_try_mutex,

boost::recursive_timed_mutex,

boost::shared_mutex

•mutex類採用Scope Lock模式實現互斥體的上鎖和解鎖。即建構函式對互斥體加鎖,解構函式對互斥體解鎖。

對應現有的幾個mutex匯入了scoped_lock,scoped_try_lock,scoped_timed_lock.

•scoped系列的特色就是析構時解鎖,預設構造時加鎖,這就很好的確定在某個作用域下某執行緒獨佔某段程式碼。

mutex+scoped_lock

•#include <boost/thread/thread.hpp>

•#include <boost/thread/mutex.hpp>

•#include <boost/bind.hpp>

•#include <iostream>

•boost::mutexio_mutex;

•void count(int id)

•{

•for (int i = 0; i < 10; ++i)

•{

•boost::mutex::scoped_locklock(io_mutex);

•std::cout << id << ": " << i << std::endl;

•}

•}

•int main(int argc, char* argv[])

•{

•boost::thread thrd1(boost::bind(&count, 1));

•boost::thread thrd2(boost::bind(&count, 2));

•thrd1.join();

•thrd2.join();

•return 0;

•}

try_mutex+scoped_try_lock

•void loop(void)

•{

•bool running = true;

•while (running)

•{

•static boost::try_mutex iomutex;

•{

•boost::try_mutex::scoped_try_locklock(iomutex);//鎖定mutex

•if (lock.owns_lock())

•{

•std::cout << "Get lock." << std::endl;

•}

•else

•{

•// To do

•std::cout << "Not get lock." << std::endl;

•boost::thread::yield(); //釋放控制權

•continue;

•}

•} //lock析構,iomutex解鎖

•}

•}

timed_mutex+scoped_timed_mutex

•void loop(void)

•{

•bool running = true;

•while (running)

•{

•typedef boost::timed_mutex MUTEX;

•typedef MUTEX::scoped_timed_lock LOCK;

•static MUTEX iomutex;

•{

•boost::xtime xt;

•boost::xtime_get(&xt,boost::TIME_UTC);

•xt.sec += 1; //超時時間秒

•LOCK lock(iomutex, xt); //鎖定mutex

•if (lock.owns_lock())

•{

•std::cout << "Get lock." << std::endl;

•}

•else

•{

•std::cout << "Not get lock." << std::endl;

•boost::thread::yield(); //釋放控制權

•}

•//::sleep(10000); //長時間

•} //lock析構,iomutex解鎖

•//::sleep(250);

•}

•}

shared_mutex

應用boost::threadshared_mutex實現singled_write/multi_read的簡單例子

•#include <iostream>

•#include <boost/thread/thread.hpp>

•#include <boost/thread/shared_mutex.hpp>

•using namespace std;

•using namespace boost;

•boost::shared_mutex shr_mutex;

•/// 這個是輔助類,能夠保證log_info被完整的輸出

•class safe_log {

•public:

•static void log(const std::string& log_info) {

•boost::mutex::scoped_lock lock(log_mutex);

•cout << log_info << endl;

•}

•private:

•static boost::mutex log_mutex;

•};

•boost::mutex safe_log::log_mutex;

•void write_process() {

•shr_mutex.lock();

•safe_log::log("begin of write_process");

•safe_log::log("end of write_process");

•shr_mutex.unlock();

•}

•void read_process() {

•shr_mutex.lock_shared();

•safe_log::log("begin of read_process");

•safe_log::log("end of read_process");

•shr_mutex.unlock_shared();

•}

•int main() {

•thread_group threads;

•for (int i = 0; i < 10; ++ i) {

•threads.create_thread(&write_process);

•threads.create_thread(&read_process);

•}

•threads.join_all();

• ::system("PAUSE");

•return 0;

•}

條件變數

有的時候僅僅依靠鎖住共享資源來使用它是不夠的。有時候共享資源只有某些狀態的時候才能夠使用。

比方說,某個執行緒如果要從堆疊中讀取資料,那麼如果棧中沒有資料就必須等待資料被壓棧。這種情

況下的同步使用互斥體是不夠的。另一種同步的方式--條件變數,就可以使用在這種情況下。

•boost::condition

typedef condition_variable_any condition;

void wait(unique_lock<mutex>& m);

•boost::condition_variable

template<typename lock_type>

void wait(lock_type& m);

•#include <boost/thread/thread.hpp>

•#include <boost/thread/mutex.hpp>

•#include <boost/thread/condition.hpp>

•#include <iostream>

•const int BUF_SIZE = 10;

•const int ITERS = 100;

•boost::mutex io_mutex;

•class buffer

•{

•public:

•typedef boost::mutex::scoped_lock scoped_lock;

•buffer()

•: p(0), c(0), full(0)

•{

•}

•void put(int m)

•{

•scoped_lock lock(mutex);

•if (full == BUF_SIZE)

•{

•{

•boost::mutex::scoped_lock lock(io_mutex);

•std::cout << "Buffer is full. Waiting..." << std::endl;

•}

•while (full == BUF_SIZE)

•cond.wait(lock);

•}

•buf[p] = m;

•p = (p+1) % BUF_SIZE;

•++full;

•cond.notify_one();

•}

•int get()

•{

•scoped_lock lk(mutex);

•if (full == 0)

•{

•{

•boost::mutex::scoped_lock lock(io_mutex);

•std::cout << "Buffer is empty. Waiting..." << std::endl;

•}

•while (full == 0)

•cond.wait(lk);

•}

•int i = buf[c];

•c = (c+1) % BUF_SIZE;

•--full;

•cond.notify_one();

•return i;

•}

•private:

•boost::mutex mutex;

•boost::condition cond;

•unsigned int p, c, full;

•int buf[BUF_SIZE];

•};

•buffer buf;

•void writer()

•{

•for (int n = 0; n < ITERS; ++n)

•{

•{

•boost::mutex::scoped_lock lock(io_mutex);

•std::cout << "sending: " << n << std::endl;

•}

•buf.put(n);

•}

•}

•void reader()

•{

•for (int x = 0; x < ITERS; ++x)

•{

•int n = buf.get();

•{

•boost::mutex::scoped_lock lock(io_mutex);

•std::cout << "received: " << n << std::endl;

•}

•}

•}

•int main(int argc, char* argv[])

•{

•boost::thread thrd1(&reader);

•boost::thread thrd2(&writer);

•thrd1.join();

•thrd2.join();

•return 0;

•}

執行緒區域性儲存

函式的不可重入。

•Boost執行緒庫提供了智慧指標boost::thread_specific_ptr來訪問本地儲存執行緒(thread local storage)。

•#include <boost/thread/thread.hpp>

•#include <boost/thread/mutex.hpp>

•#include <boost/thread/tss.hpp>

•#include <iostream>

•boost::mutex io_mutex;

•boost::thread_specific_ptr<int> ptr;

•struct count

•{

•count(int id) : id(id) { }

•void operator()()

•{

•if (ptr.get() == 0)

•ptr.reset(new int(0));

•for (int i = 0; i < 10; ++i)

•{

•(*ptr)++; // 往自己的執行緒上加

•boost::mutex::scoped_lock lock(io_mutex);

•std::cout << id << ": " << *ptr << std::endl;

•}

•}

•int id;

•};

•int main(int argc, char* argv[])

•{

•boost::thread thrd1(count(1));

•boost::thread thrd2(count(2));

•thrd1.join();

•thrd2.join();

•return 0;

•}

僅執行一次的例程

如何使得初始化工作(比如說建構函式)也是執行緒安全的。

•“一次實現once routine)。一次實現在一個應用程式只能執行一次。

•Boost執行緒庫提供了boost::call_once來支援一次實現,並且定義了一個標誌boost::once_flag及一個初始化這個標誌的巨集 BOOST_ONCE_INIT

•#include <boost/thread/thread.hpp>

•#include <boost/thread/once.hpp>

•#include <iostream>

•int i = 0;

•boost::once_flag flag = BOOST_ONCE_INIT;

•void init()

•{

•++i;

•}

•void thread()

•{

•boost::call_once(&init, flag);

•}

•int main(int argc, char* argv[])

•{

•boost::thread thrd1(&thread);

•boost::thread thrd2(&thread);

•thrd1.join();

•thrd2.join();

•std::cout << i << std::endl;

•return 0;

•}

Boost執行緒庫的未來

•Boost執行緒庫正在計劃加入一些新特性。其中包括boost::read_write_mutex,它可以讓多個執行緒同時從共享區中讀取資料,

但是一次只可能有一個執行緒向共享區寫入資料;boost::thread_barrier,它使得一組執行緒處於等待狀態,知道所有得執行緒

都都進入了屏障區;boost::thread_pool,他允許執行一些小的routine而不必每一都要建立或是銷燬一個執行緒。

•Boost執行緒庫已經作為標準中的類庫技術報告中的附件提交給C++標準委員會,它的出現也為下一版C++標準吹響了第一聲號角。

委員會成員對 Boost執行緒庫的初稿給予了很高的評價,當然他們還會考慮其他的多執行緒庫。他們對在C++標準中加入對多執行緒的

支援非常感興趣。從這一點上也可以看出,多執行緒在C++中的前途一片光明。