1. 程式人生 > >C++11中std condition variable的使用

C++11中std condition variable的使用

               

<condition_variable>是C++標準程式庫中的一個頭檔案,定義了C++11標準中的一些用於併發程式設計時表示條件變數的類與方法等。

條件變數是併發程式設計中的一種控制結構。多個執行緒訪問一個共享資源(或稱臨界區)時,不但需要用互斥鎖實現獨享訪問以避免併發錯誤(稱為競爭危害),在獲得互斥鎖進入臨界區後還需要檢驗特定條件是否成立:

(1)、如果不滿足該條件,擁有互斥鎖的執行緒應該釋放該互斥鎖,把自身阻塞(block)並掛到(suspend)條件變數的執行緒佇列中

(2)、如果滿足該條件,擁有互斥鎖的執行緒在臨界區內訪問共享資源,在退出臨界區時通知(notify)在條件變數的執行緒佇列中處於阻塞狀態的執行緒,被通知的執行緒必須重新申請對該互斥鎖加鎖。

C++11的標準庫中新增加的條件變數的實現,與pthread的實現語義完全一致。使用條件變數做併發控制時,某一時刻阻塞在一個條件變數上的各個執行緒應該在呼叫wait操作時指明同一個互斥鎖,此時該條件變數與該互斥鎖繫結;否則程式的行為未定義。條件變數必須與互斥鎖配合使用,其理由是程式需要判定某個條件(condition或稱predict)是否成立,該條件可以是任意複雜。

離開臨界區的執行緒用notify操作解除阻塞(unblock)在條件變數上的各個執行緒時,按照公平性(fairness)這些執行緒應該有平等的獲得互斥鎖的機會,不應讓某個執行緒始終難以獲得互斥鎖被餓死(starvation),並且比後來到臨界區的其它執行緒更為優先(即基本上FIFO)。一種辦法是呼叫了notify_all的執行緒保持互斥鎖,直到所有從條件變數上解除阻塞的執行緒都已經掛起(suspend)到互斥鎖上,然後發起了notify_all的執行緒再釋放互斥鎖。互斥鎖上一般都有比較完善的阻塞執行緒排程演算法,一般會按照執行緒優先順序排程,相同優先順序按照FIFO排程。

發起notify的執行緒不需要擁有互斥鎖。即將離開臨界區的執行緒是先釋放互斥鎖還是先notify操作解除在條件變數上掛起執行緒的阻塞?表面看兩種順序都可以。但一般建議是先notify操作,後對互斥鎖解鎖。因為這既有利於上述的公平性,同時還避免了相反順序時可能的優先順序倒置。這種先notify後解鎖的做法是悲觀的(pessimization),因為被通知(notified)執行緒將立即被阻塞,等待通知(notifying)執行緒釋放互斥鎖。很多實現(特別是pthreads的很多實現)為了避免這種”匆忙與等待”(hurry up and wait)情形,把在條件變數的執行緒佇列上處於等待的被通知執行緒直接移到互斥鎖的執行緒佇列上,而不喚醒這些執行緒。

C++11中引入了條件變數,其相關內容均在<condition_variable>中。這裡主要介紹std::condition_variable類。

條件變數std::condition_variable用於多執行緒之間的通訊,它可以阻塞一個或同時阻塞多個執行緒。std::condition_variable需要與std::unique_lock配合使用。std::condition_variable效果上相當於包裝了pthread庫中的pthread_cond_*()系列的函式。

當std::condition_variable物件的某個wait函式被呼叫的時候,它使用std::unique_lock(通過std::mutex)來鎖住當前執行緒。當前執行緒會一直被阻塞,直到另外一個執行緒在相同的std::condition_variable物件上呼叫了notification函式來喚醒當前執行緒。

std::condition_variable物件通常使用std::unique_lock<std::mutex>來等待,如果需要使用另外的lockable型別,可以使用std::condition_variable_any類。

std::condition_variable類的成員函式:

(1)、建構函式:僅支援預設建構函式,拷貝、賦值和移動(move)均是被禁用的。

(2)、wait:當前執行緒呼叫wait()後將被阻塞,直到另外某個執行緒呼叫notify_*喚醒當前執行緒;當執行緒被阻塞時,該函式會自動呼叫std::mutex的unlock()釋放鎖,使得其它被阻塞在鎖競爭上的執行緒得以繼續執行。一旦當前執行緒獲得通知(notify,通常是另外某個執行緒呼叫notify_*喚醒了當前執行緒),wait()函式也是自動呼叫std::mutex的lock()。wait分為無條件被阻塞和帶條件的被阻塞兩種。

無條件被阻塞:呼叫該函式前,當前執行緒應該已經對unique_lock<mutex> lck完成了加鎖。所有使用同一個條件變數的執行緒必須在wait函式中使用同一個unique_lock<mutex>。該wait函式內部會自動呼叫lck.unlock()對互斥鎖解鎖,使得其他被阻塞在互斥鎖上的執行緒恢復執行。使用本函式被阻塞的當前執行緒在獲得通知(notified,通過別的執行緒呼叫 notify_*系列的函式)而被喚醒後,wait()函式恢復執行並自動呼叫lck.lock()對互斥鎖加鎖。

帶條件的被阻塞:wait函式設定了謂詞(Predicate),只有當pred條件為false時呼叫該wait函式才會阻塞當前執行緒,並且在收到其它執行緒的通知後只有當pred為true時才會被解除阻塞。因此,等效於while (!pred())  wait(lck).

(3)、wait_for:與wait()類似,只是wait_for可以指定一個時間段,在當前執行緒收到通知或者指定的時間超時之前,該執行緒都會處於阻塞狀態。而一旦超時或者收到了其它執行緒的通知,wait_for返回,剩下的步驟和wait類似。

(4)、wait_until:與wait_for類似,只是wait_until可以指定一個時間點,在當前執行緒收到通知或者指定的時間點超時之前,該執行緒都會處於阻塞狀態。而一旦超時或者收到了其它執行緒的通知,wait_until返回,剩下的處理步驟和wait類似。

(5)、notify_all: 喚醒所有的wait執行緒,如果當前沒有等待執行緒,則該函式什麼也不做。

(6)、notify_one:喚醒某個wait執行緒,如果當前沒有等待執行緒,則該函式什麼也不做;如果同時存在多個等待執行緒,則喚醒某個執行緒是不確定的(unspecified)。

條件變化存在虛假喚醒的情況,因此線上程被喚醒後需要檢查條件是否滿足。無論是notify_one或notify_all都是類似於發出脈衝訊號,如果對wait的呼叫發生在notify之後是不會被喚醒的,所以接收者在使用wait等待之前也需要檢查條件是否滿足。

std::condition_variable_any類與std::condition_variable用法一樣,區別僅在於std::condition_variable_any的wait函式可以接受任何lockable引數,而std::condition_variable只能接受std::unique_lock<std::mutex>型別的引數。

std::notify_all_at_thread_exit函式:當呼叫該函式的執行緒退出時,所有在cond條件變數上等待的執行緒都會收到通知。

std::condition_variable:A condition variable is an object able to block the calling thread until notified to resume. It uses a unique_lock (over a mutex) to lock the thread when one of its wait functions is called. The thread remains blocked until woken up by another thread that calls a notification function on the same condition_variable object. Objects of type condition_variable always use unique_lock<mutex> to wait: for an alternative that works with any kind of lockable type, see condition_variable_any.

The condition_variable class is a synchronization primitive that can be used to block a thread, or multiple threads at the same time, until another thread both modifies a shared variable (the condition), and notifies the condition_variable.

The thread that intends to modify the variable has to:(1)、acquire a std::mutex (typically via std::lock_guard);(2)、perform the modification while the lock is held;(3)、execute notify_one or notify_all on the std::condition_variable (the lock does not need to be held for notification).

Any thread that intends to wait on std::condition_variable has to:(1)、acquire a std::unique_lock<std::mutex>, on the same mutex as used to protect the shared variable;(2)、execute wait, wait_for, or wait_until. The wait operations atomically release the mutex and suspend the execution of the thread;(3)、When the condition variable is notified, a timeout expires, or a spurious wake up occurs,the thread is awakened, and the mutex is atomically reacquired. The thread should then check the condition and resume waiting if the wake up was spurious.

std::condition_variable works only with std::unique_lock<std::mutex>; this restriction allows for maximal efficiency on some platforms. std::condition_variable_any provides a condition variable that works with any BasicLockable object, such as std::shared_lock.

 下面是從其它文章中copy的std::condition_variable測試程式碼,詳細內容介紹可以參考對應的reference:

#include "condition_variable.hpp"#include <iostream>#include <chrono>#include <thread>#include <mutex>#include <condition_variable>#include <string>namespace condition_variable_ {//////////////////////////////////////////////////////////////////////// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/std::mutex mtx;std::condition_variable cv;bool ready = false;static void print_id(int id)std::unique_lock<std::mutex> lck(mtx); while (!ready) cv.wait(lck); // ... std::cout << "thread " << id << '\n';}static void go()std::unique_lock<std::mutex> lck(mtx); ready = true; cv.notify_all();}int test_condition_variable_1()std::thread threads[10]; // spawn 10 threads: for (int i = 0; i<10; ++i)  threads[i] = std::thread(print_id, i); std::cout << "10 threads ready to race...\n"; go();                       // go! for (auto& th : threads) th.join(); return 0;}/////////////////////////////////////////////////////////////////////////// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/wait/// condition_variable::wait: Wait until notified,// The execution of the current thread (which shall have locked lck's mutex) is blocked until notified.// At the moment of blocking the thread, the function automatically calls lck.unlock(), allowing other locked threads to continue.// If pred is specified, the function only blocks if pred returns false,// and notifications can only unblock the thread when it becomes true (which is specially useful to check against spurious wake-up calls).std::mutex mtx2;std::condition_variable cv2;int cargo = 0;static bool shipment_available() { return cargo != 0; }static void consume(int n)for (int i = 0; i<n; ++i) {  std::unique_lock<std::mutex> lck(mtx2);  cv2.wait(lck, shipment_available);  // consume:  std::cout << cargo << '\n';  cargo = 0;  std::cout << "****: " << cargo << std::endl; }}int test_condition_variable_wait()std::thread consumer_thread(consume, 10)// produce 10 items when needed: for (int i = 0; i<10; ++i) {  while (shipment_available()) std::this_thread::yield();  std::unique_lock<std::mutex> lck(mtx2);  cargo = i + 1;  cv2.notify_one(); } consumer_thread.join(); return 0;}///////////////////////////////////////////////////////////////////////////// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/wait_for/// condition_variable::wait_for: Wait for timeout or until notified// The execution of the current thread (which shall have locked lck's mutex) is blocked during rel_time,// or until notified (if the latter happens first).// At the moment of blocking the thread, the function automatically calls lck.unlock(),// allowing other locked threads to continue.std::condition_variable cv3;int value;static void read_value()std::cin >> value; cv3.notify_one();}int test_condition_variable_wait_for()std::cout << "Please, enter an integer (I'll be printing dots): \n"std::thread th(read_value)std::mutex mtx; std::unique_lock<std::mutex> lck(mtx); while (cv3.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout) {  std::cout << '.' << std::endl; } std::cout << "You entered: " << value << '\n'; th.join(); return 0;}//////////////////////////////////////////////////////////////////// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/notify_one/// condition_variable::notify_one: Notify one, Unblocks one of the threads currently waiting for this condition.// If no threads are waiting, the function does nothing.// If more than one, it is unspecified which of the threads is selected.std::mutex mtx4;std::condition_variable produce4, consume4;int cargo4 = 0;     // shared value by producers and consumersstatic void consumer4()std::unique_lock<std::mutex> lck(mtx4); while (cargo4 == 0) consume4.wait(lck); std::cout << cargo4 << '\n'; cargo4 = 0; produce4.notify_one();}static void producer(int id)std::unique_lock<std::mutex> lck(mtx4); while (cargo4 != 0) produce4.wait(lck); cargo4 = id; consume4.notify_one();}int test_condition_variable_notify_one()std::thread consumers[10], producers[10]; // spawn 10 consumers and 10 producers: for (int i = 0; i<10; ++i) {  consumers[i] = std::thread(consumer4);  producers[i] = std::thread(producer, i + 1); } // join them back: for (int i = 0; i<10; ++i) {  producers[i].join();  consumers[i].join(); } return 0;}/////////////////////////////////////////////////////////////// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/notify_all/// condition_variable::notify_all: Notify all, Unblocks all threads currently waiting for this condition.// If no threads are waiting, the function does nothing.std::mutex mtx5;std::condition_variable cv5;bool ready5 = false;static void print_id5(int id) std::unique_lock<std::mutex> lck(mtx5); while (!ready5) cv5.wait(lck); // ... std::cout << "thread " << id << '\n';}static void go5()std::unique_lock<std::mutex> lck(mtx5); ready5 = true; cv5.notify_all();}int test_condition_variable_notify_all()std::thread threads[10]; // spawn 10 threads: for (int i = 0; i<10; ++i)  threads[i] = std::thread(print_id5, i); std::cout << "10 threads ready to race...\n"; go5();                       // go! for (auto& th : threads) th.join(); return 0;}////////////////////////////////////////////////////////////// reference: http://en.cppreference.com/w/cpp/thread/condition_variablestd::mutex m;std::condition_variable cv6;std::string data;bool ready6 = false;bool processed = false;static void worker_thread()// Wait until main() sends data std::unique_lock<std::mutex> lk(m); cv6.wait(lk, []{return ready6; }); // after the wait, we own the lock. std::cout << "Worker thread is processing data\n"; data += " after processing"// Send data back to main() processed = truestd::cout << "Worker thread signals data processing completed\n"// Manual unlocking is done before notifying, to avoid waking up // the waiting thread only to block again (see notify_one for details) lk.unlock(); cv6.notify_one();}int test_condition_variable_2()std::thread worker(worker_thread); data = "Example data"// send data to the worker thread {  std::lock_guard<std::mutex> lk(m);  ready6 = true;  std::cout << "main() signals data ready for processing\n"; } cv6.notify_one(); // wait for the worker {  std::unique_lock<std::mutex> lk(m);  cv6.wait(lk, []{return processed; }); } std::cout << "Back in main(), data = " << data << '\n'; worker.join(); return 0;}} // namespace condition_variable_
GitHubhttps://github.com/fengbingchun/Messy_Test