1. 程式人生 > >boost庫多執行緒(Thread)程式設計(執行緒操作,互斥體mutex,條件變數)

boost庫多執行緒(Thread)程式設計(執行緒操作,互斥體mutex,條件變數)

轉載地址:


1 建立執行緒

就像std::fstream類就代表一個檔案一樣,boost::thread類就代表一個可執行的執行緒。預設建構函式建立一個代表當前執行執行緒的例項。一個過載的建構函式以一個不需任何引數的函式物件作為引數,並且沒有返回值。這個建構函式建立一個新的可執行執行緒,它呼叫了那個函式物件。

起先,大家認為傳統C建立執行緒的方法似乎比這樣的設計更有用,因為C建立執行緒的時候會傳入一個void*指標,通過這種方法就可以傳入資料。然而,由於Boost執行緒庫是使用函式物件來代替函式指標,那麼函式物件本身就可以攜帶執行緒所需的資料。這種方法更具靈活性,也是型別安全(type-safe)的。當和Boost.Bind這樣的功能庫一起使用時,這樣的方法就可以讓你傳遞任意數量的資料給新建的執行緒。

目前,由Boost執行緒庫建立的執行緒物件功能還不是很強大。事實上它只能做兩項操作。執行緒物件可以方便使用==和!=進行比較來確定它們是否是代表同一個執行緒;你還可以呼叫boost::thread::join來等待執行緒執行完畢。其他一些執行緒庫可以讓你對執行緒做一些其他操作(比如設定優先順序,甚至是取消執行緒)。然而,由於要在普遍適用(portable)的介面中加入這些操作不是簡單的事,目前仍在討論如何將這些操組加入到Boost執行緒庫中。

Listing1展示了boost::thread類的一個最簡單的用法。 新建的執行緒只是簡單的在std::out上列印“hello,world”,main函式在它執行完畢之後結束。


例1:

#include <boost/thread/thread.hpp>
#include <iostream>

void hello()
{
        std::cout <<
        "Hello world, I'm a thread!"
        << std::endl;
}

int main(int argc, char* argv[])
{
        boost::thread thrd(&hello);
        thrd.join();
        return 0;
}

2 互斥體

任何寫過多執行緒程式的人都知道避免不同執行緒同時訪問共享區域的重要性。如果一個執行緒要改變共享區域中某個資料,而與此同時另一執行緒正在讀這個資料,那麼結果將是未定義的。為了避免這種情況的發生就要使用一些特殊的原始型別和操作。其中最基本的就是互斥體(mutex,mutual exclusion的縮寫)。一個互斥體一次只允許一個執行緒訪問共享區。當一個執行緒想要訪問共享區時,首先要做的就是鎖住(lock)互斥體。如果其他的執行緒已經鎖住了互斥體,那麼就必須先等那個執行緒將互斥體解鎖,這樣就保證了同一時刻只有一個執行緒能訪問共享區域。

互斥體的概念有不少變種。Boost執行緒庫支援兩大類互斥體,包括簡單互斥體(simple mutex)和遞迴互斥體(recursive mutex)。如果同一個執行緒對互斥體上了兩次鎖,就會發生死鎖(deadlock),也就是說所有的等待解鎖的執行緒將一直等下去。有了遞迴互斥體,單個執行緒就可以對互斥體多次上鎖,當然也必須解鎖同樣次數來保證其他執行緒可以對這個互斥體上鎖。

在這兩大類互斥體中,對於執行緒如何上鎖還有多個變種。一個執行緒可以有三種方法來對一個互斥體加鎖:

  1. 一直等到沒有其他執行緒對互斥體加鎖。
  2. 如果有其他互斥體已經對互斥體加鎖就立即返回。
  3. 一直等到沒有其他執行緒互斥體加鎖,直到超時。

似乎最佳的互斥體型別是遞迴互斥體,它可以使用所有三種上鎖形式。然而每一個變種都是有代價的。所以Boost執行緒庫允許你根據不同的需要使用最有效率的互斥體型別。Boost執行緒庫提供了6中互斥體型別,下面是按照效率進行排序:

boost::mutex,
boost::try_mutex, 
boost::timed_mutex, 
boost::recursive_mutex, 
boost::recursive_try_mutex,  
boost::recursive_timed_mutex 
如果互斥體上鎖之後沒有解鎖就會發生死鎖。這是一個很普遍的錯誤,Boost執行緒庫就是要將其變成不可能(至少時很困難)。直接對互斥體上鎖和解鎖對於Boost執行緒庫的使用者來說是不可能的。mutex類通過teypdef定義在RAII中實現的型別來實現互斥體的上鎖和解鎖。這也就是大家知道的Scope Lock模式。為了構造這些型別,要傳入一個互斥體的引用。建構函式對互斥體加鎖,解構函式對互斥體解鎖。C++保證了解構函式一定會被呼叫,所以即使是有異常丟擲,互斥體也總是會被正確的解鎖。

這種方法保證正確的使用互斥體。然而,有一點必須注意:儘管Scope Lock模式可以保證互斥體被解鎖,但是它並沒有保證在異常丟擲之後貢獻資源仍是可用的。所以就像執行單執行緒程式一樣,必須保證異常不會導致程式狀態異常。另外,這個已經上鎖的物件不能傳遞給另一個執行緒,因為它們維護的狀態並沒有禁止這樣做。

List2給出了一個使用boost::mutex的最簡單的例子。例子中共建立了兩個新的執行緒,每個執行緒都有10次迴圈,在std::cout上打印出執行緒id和當前迴圈的次數,而main函式等待這兩個執行緒執行完才結束。std::cout就是共享資源,所以每一個執行緒都使用一個全域性互斥體來保證同時只有一個執行緒能向它寫入。

許多讀者可能已經注意到List2中傳遞資料給執行緒還必須的手工寫一個函式。儘管這個例子很簡單,如果每一次都要寫這樣的程式碼實在是讓人厭煩的事。別急,有一種簡單的解決辦法。函式庫允許你通過將另一個函式繫結,並傳入呼叫時需要的資料來建立一個新的函式。 List3向你展示瞭如何使用Boost.Bind庫來簡化List2中的程式碼,這樣就不必手工寫這些函式物件了。

例2:

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>

boost::mutex io_mutex;

struct count
{
        count(int id) : id(id) { }
        
        void operator()()
        {
                for (int i = 0; i < 10; ++i)
                {
                        boost::mutex::scoped_lock
                        lock(io_mutex);
                        std::cout << id << ": "
                        << i << std::endl;
                }
        }
        
        int id;
};

int main(int argc, char* argv[])
{
        boost::thread thrd1(count(1));
        boost::thread thrd2(count(2));
        thrd1.join();
        thrd2.join();
        return 0;
}

例3:// 這個例子和例2一樣,除了使用Boost.Bind來簡化建立執行緒攜帶資料,避免使用函式物件

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex io_mutex;

void count(int id)
{
        for (int i = 0; i < 10; ++i)
        {
                boost::mutex::scoped_lock
                lock(io_mutex);
                std::cout << id << ": " <<
                i << std::endl;
        }
}

int main(int argc, char* argv[])
{
        boost::thread thrd1(
        boost::bind(&count, 1));
        boost::thread thrd2(
        boost::bind(&count, 2));
        thrd1.join();
        thrd2.join();
        return 0;
}

3 條件變數

有的時候僅僅依靠鎖住共享資源來使用它是不夠的。有時候共享資源只有某些狀態的時候才能夠使用。比方說,某個執行緒如果要從堆疊中讀取資料,那麼如果棧中沒有資料就必須等待資料被壓棧。這種情況下的同步使用互斥體是不夠的。另一種同步的方式--條件變數,就可以使用在這種情況下。

條件變數的使用總是和互斥體及共享資源聯絡在一起的。執行緒首先鎖住互斥體然後檢驗共享資源的狀態是否處於可使用的狀態如果不是,那麼執行緒就要等待條件變數。要指向這樣的操作就必須在等待的時候將互斥體解鎖,以便其他執行緒可以訪問共享資源並改變其狀態。它還得保證從等到的執行緒返回時互斥體是被上鎖的當另一個執行緒改變了共享資源的狀態時,它就要通知正在等待條件變數的執行緒,並將之返回等待的執行緒

List4是一個使用了boost::condition的簡單例子。有一個實現了有界快取區的類和一個固定大小的先進先出的容器。由於使用了互斥體boost::mutex,這個快取區是執行緒安全的。put和get使用條件變數來保證執行緒等待完成操作所必須的狀態。有兩個執行緒被建立,一個在buffer中放入100個整數,另一個將它們從buffer中取出。這個有界的快取一次只能存放10個整數,所以這兩個執行緒必須週期性的等待另一個執行緒。為了驗證這一點,put和get在std::cout中輸出診斷語句。最後,當兩個執行緒結束後,main函式也就執行完畢了。

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <iostream>

const int BUF_SIZE = 10;
const int ITERS = 100;

boost::mutex io_mutex;

class buffer
{
        public:
        typedef boost::mutex::scoped_lock
        scoped_lock;
        
        buffer()
        : p(0), c(0), full(0)
        {
        }
        
        void put(int m)
        {
                scoped_lock lock(mutex);
                if (full == BUF_SIZE)
                {
                        {
                                boost::mutex::scoped_lock
                                lock(io_mutex);
                                std::cout <<
                                "Buffer is full. Waiting..."
                                << std::endl;
                        }
                        while (full == BUF_SIZE)
                        cond.wait(lock);
                }
                buf[p] = m;
                p = (p+1) % BUF_SIZE;
                ++full;
                cond.notify_one();
        }
        
        int get()
        {
                scoped_lock lk(mutex);
                if (full == 0)
                {
                        {
                                boost::mutex::scoped_lock
                                lock(io_mutex);
                                std::cout <<
                                "Buffer is empty. Waiting..."
                                << std::endl;
                        }
                        while (full == 0)
                        cond.wait(lk);
                }
                int i = buf[c];
                c = (c+1) % BUF_SIZE;
                --full;
                cond.notify_one();
                return i;
        }
        
        private:
        boost::mutex mutex;
        boost::condition cond;
        unsigned int p, c, full;
        int buf[BUF_SIZE];
};

buffer buf;

void writer()
{
        for (int n = 0; n < ITERS; ++n)
        {
                {
                        boost::mutex::scoped_lock
                        lock(io_mutex);
                        std::cout << "sending: "
                        << n << std::endl;
                }
                buf.put(n);
        }
}

void reader()
{
        for (int x = 0; x < ITERS; ++x)
        {
                int n = buf.get();
                {
                        boost::mutex::scoped_lock
                        lock(io_mutex);
                        std::cout << "received: "
                        << n << std::endl;
                }
        }
}

int main(int argc, char* argv[])
{
        boost::thread thrd1(&reader);
        boost::thread thrd2(&writer);
        thrd1.join();
        thrd2.join();
        return 0;
}

*******************************  Begin **************************************

該內容轉自:http://www.boost.org/doc/libs/1_62_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

//#include <boost/thread/condition_variable.hpp>

namespace boost
{
    class condition_variable
    {
    public:
        condition_variable();
        ~condition_variable();

        void notify_one() noexcept;
        void notify_all() noexcept;

        void wait(boost::unique_lock<boost::mutex>& lock);

        template<typename predicate_type>
        void wait(boost::unique_lock<boost::mutex>& lock,predicate_type predicate);

        template <class Clock, class Duration>
        typename cv_status::type
        wait_until(
            unique_lock<mutex>& lock,
            const chrono::time_point<Clock, Duration>& t);

        template <class Clock, class Duration, class Predicate>
        bool
        wait_until(
            unique_lock<mutex>& lock,
            const chrono::time_point<Clock, Duration>& t,
            Predicate pred);

        template <class Rep, class Period>
        typename cv_status::type
        wait_for(
            unique_lock<mutex>& lock,
            const chrono::duration<Rep, Period>& d);

        template <class Rep, class Period, class Predicate>
        bool
        wait_for(
            unique_lock<mutex>& lock,
            const chrono::duration<Rep, Period>& d,
            Predicate pred);

    #if defined BOOST_THREAD_USES_DATETIME
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,boost::system_time const& abs_time);
        template<typename duration_type>
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,duration_type const& rel_time);
        template<typename predicate_type>
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,boost::system_time const& abs_time,predicate_type predicate);
        template<typename duration_type,typename predicate_type>
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,duration_type const& rel_time,predicate_type predicate);
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,boost::xtime const& abs_time);

        template<typename predicate_type>
        bool timed_wait(boost::unique_lock<boost::mutex>& lock,boost::xtime const& abs_time,predicate_type predicate);
    #endif

    };
}
Effects:

Constructs an object of class condition_variable.

Throws:

boost::thread_resource_error if an error occurs.

Precondition:

All threads waiting on *this have been notified by a call to notify_one or notify_all (though the respective calls to wait or timed_wait need not have returned).

Effects:

Destroys the object.

Throws:

Nothing.

Effects:

If any threads are currently blocked waiting on *this in a call to wait or timed_wait, unblocks one of those threads.

Throws:

Nothing.

Effects:

If any threads are currently blocked waiting on *this in a call to wait or timed_wait, unblocks all of those threads.

Throws:

Nothing.

Precondition:

lock is locked by the current thread, and either no other thread is currently waiting on *this, or the execution of the mutex() member function on the lock objects supplied in the calls to wait or timed_wait in all the threads currently waiting on *this would return the same value as lock->mutex() for this call to wait.

Effects:(作用)

Atomically call lock.unlock() and blocks the current thread. The thread will unblock when notified by a call to this->notify_one() or this->notify_all(), or spuriously. When the thread is unblocked (for whatever reason),the lock is reacquired by invoking lock.lock() before the call to wait returns.The lock is also reacquired by invoking lock.lock() if the function exits with an exception.

Postcondition:

lock is locked by the current thread.

Throws:

boost::thread_resource_error if an error occurs. boost::thread_interrupted if the wait was interrupted by a call to interrupt() on the boost::thread object associated with the current thread of execution.

其他的函式解釋直接去官網檢視

boost::unique_lock<boost::mutex> 鎖,

**********************************  End***********************************

4 執行緒區域性儲存

大多數函式都不是可重入的。這也就是說在某一個執行緒已經呼叫了一個函式時,如果你再呼叫同一個函式,那麼這樣是不安全的。一個不可重入的函式通過連續的呼叫來儲存靜態變數或者是返回一個指向靜態資料的指標。 舉例來說,std::strtok就是不可重入的,因為它使用靜態變數來儲存要被分割成符號的字串。

有兩種方法可以讓不可重用的函式變成可重用的函式。第一種方法就是改變介面,用指標或引用代替原先使用靜態資料的地方。比方說,POSIX定義了strok_r,std::strtok中的一個可重入的變數,它用一個額外的char**引數來代替靜態資料。這種方法很簡單,而且提供了可能的最佳效果。但是這樣必須改變公共介面,也就意味著必須改程式碼。另一種方法不用改變公有介面,而是用本地儲存執行緒(thread local storage)來代替靜態資料(有時也被成為特殊執行緒儲存,thread-specific storage)。

Boost執行緒庫提供了智慧指標boost::thread_specific_ptr來訪問本地儲存執行緒。每一個執行緒第一次使用這個智慧指標的例項時,它的初值是NULL,所以必須要先檢查這個它的只是否為空,並且為它賦值。Boost執行緒庫保證本地儲存執行緒中儲存的資料會線上程結束後被清除。

List5是一個使用boost::thread_specific_ptr的簡單例子。其中建立了兩個執行緒來初始化本地儲存執行緒,並有10次迴圈,每一次都會增加智慧指標指向的值,並將其輸出到std::cout上(由於std::cout是一個共享資源,所以通過互斥體進行同步)。main執行緒等待這兩個執行緒結束後就退出。從這個例子輸出可以明白的看出每個執行緒都處理屬於自己的資料例項,儘管它們都是使用同一個boost::thread_specific_ptr。

例5:

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/tss.hpp>
#include <iostream>

boost::mutex io_mutex;
boost::thread_specific_ptr<int> ptr;

struct count
{
        count(int id) : id(id) { }
        
        void operator()()
        {
                if (ptr.get() == 0)
                ptr.reset(new int(0));
                
                for (int i = 0; i < 10; ++i)
                {
                        (*ptr)++;
                        boost::mutex::scoped_lock
                        lock(io_mutex);
                        std::cout << id << ": "
                        << *ptr << std::endl;
                }
        }
        
        int id;
};

int main(int argc, char* argv[])
{
        boost::thread thrd1(count(1));
        boost::thread thrd2(count(2));
        thrd1.join();
        thrd2.join();
        return 0;
}

5 僅執行一次的例程

還有一個問題沒有解決:如何使得初始化工作(比如說建構函式)也是執行緒安全的。比方說,如果一個引用程式要產生唯一的全域性的物件,由於例項化順序的問題,某個函式會被呼叫來返回一個靜態的物件,它必須保證第一次被呼叫時就產生這個靜態的物件。這裡的問題就是如果多個執行緒同時呼叫了這個函式,那麼這個靜態物件的建構函式就會被呼叫多次,這樣錯誤產生了。

解決這個問題的方法就是所謂的“一次實現”(once routine)。“一次實現”在一個應用程式只能執行一次。如果多個執行緒想同時執行這個操作,那麼真正執行的只有一個,而其他執行緒必須等這個操作結束。為了保證它只被執行一次,這個routine由另一個函式間接的呼叫,而這個函式傳給它一個指標以及一個標誌著這個routine是否已經被呼叫的特殊標誌。這個標誌是以靜態的方式初始化的,這也就保證了它在編譯期間就被初始化而不是執行時。因此也就沒有多個執行緒同時將它初始化的問題了。Boost執行緒庫提供了boost::call_once來支援“一次實現”,並且定義了一個標誌boost::once_flag及一個初始化這個標誌的巨集BOOST_ONCE_INIT。

List6是一個使用了boost::call_once的例子。其中定義了一個靜態的全域性整數,初始值為0;還有一個由BOOST_ONCE_INIT初始化的靜態boost::once_flag例項。main函式建立了兩個執行緒,它們都想通過傳入一個函式呼叫boost::call_once來初始化這個全域性的整數,這個函式是將它加1。main函式等待著兩個執行緒結束,並將最後的結果輸出的到std::cout。由最後的結果可以看出這個操作確實只被執行了一次,因為它的值是1。

#include <boost/thread/thread.hpp>
#include <boost/thread/once.hpp>
#include <iostream>

int i = 0;
boost::once_flag flag =
BOOST_ONCE_INIT;

void init()
{
        ++i;
}

void thread()
{
        boost::call_once(&init, flag);
}

int main(int argc, char* argv[])
{
        boost::thread thrd1(&thread);
        boost::thread thrd2(&thread);
        thrd1.join();
        thrd2.join();
        std::cout << i << std::endl;
        return 0;
}


boost庫多執行緒(Thread)程式設計知識點總結

標頭檔案:#include<boost/thread.hpp>

1) 執行緒建立,管理

thread類

(1)執行緒建構函式:

   boost::thread(Callable func);

(2)帶引數函式對應的執行緒建構函式:

   boost::thread(F f,A1 a1,A2 a2,...); //目前最多支援帶九個額外引數(a1 到 a9 )的f。

(3)成員函式join()

   阻塞呼叫當前執行緒的執行緒,直到當前執行緒執行完畢。

(4)成員函式get_id()

   返回當前執行緒的ID。

(5)成員函式hardware_concurrency()

   返回當前執行緒數。

獨立於thread類的this_thread名稱空間

(1)boost::this_thread::get_id

 同上get_id

(2)boost::this_thread::sleep

 將呼叫執行緒掛起指定時間。例:boost::this_thread::sleep(boost::posix_time::seconds(seconds));

2) 同步

  使用執行緒同時執行幾個函式時,訪問共享資源(全域性變數等)時要考慮同步,同步所需工作如下:

(1)互斥量

   互斥的原則是當執行緒擁有共享資源時防止其他執行緒奪取其所有權,一旦釋放,其他執行緒可以取得所有權,這將導致執行緒等待另一個執行緒處理完一些操作,釋放互斥量的所有權。

獨佔式互斥量:boost::mutex ,併發呼叫成員函式lock(),unlock()

multiple-reader / single-writer互斥量:boost::shared_mutex,併發呼叫成員函式lock(),unlock()

(2)更多功能的鎖

boost::unique_lock 獨佔鎖

boost::shared_lock multiple-reader / single-writer鎖

3) 死鎖

  死鎖主要發生在有多個依賴鎖存在時,會在一個執行緒試圖與另一個執行緒以相反的順序鎖住互斥量。如何避免死鎖:

(1)對共享資源操作前,一定要獲得鎖;

(2)完成操作後,一定要釋放鎖;

(3)儘量短時佔用鎖;

(4)如果有多鎖,如獲得順序是ABC,則釋放順序也是ABC。

(5)如果執行緒錯誤返回,應該釋放他所獲得的鎖。