C++11 併發指南五(std::condition_variable 詳解)
前面三講《C++11 併發指南二(std::thread 詳解)》,《C++11 併發指南三(std::mutex 詳解)》分別介紹了 std::thread,std::mutex,std::future 等相關內容,相信讀者對 C++11 中的多執行緒程式設計有了一個最基本的認識,本文將介紹 C++11 標準中 <condition_variable> 標頭檔案裡面的類和相關函式。
<condition_variable > 標頭檔案主要包含了與條件變數相關的類和函式。相關的類包括 std::condition_variable 和 std::condition_variable_any,還有列舉型別std::cv_status。另外還包括函式 std::notify_all_at_thread_exit(),下面分別介紹一下以上幾種型別。
std::condition_variable 類介紹
std::condition_variable 是條件變數,更多有關條件變數的定義參考維基百科。Linux 下使用 Pthread 庫中的 pthread_cond_*() 函式提供了與條件變數相關的功能, Windows 則參考 MSDN。
當 std::condition_variable 物件的某個 wait 函式被呼叫的時候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當前執行緒。當前執行緒會一直被阻塞,直到另外一個執行緒在相同的 std::condition_variable 物件上呼叫了 notification 函式來喚醒當前執行緒。
std::condition_variable 物件通常使用 std::unique_lock<std::mutex> 來等待,如果需要使用另外的 lockable 型別,可以使用 std::condition_variable_any 類,本文後面會講到 std::condition_variable_any 的用法。
首先我們來看一個簡單的例子
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> //std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; // 全域性互斥鎖. std::condition_variable cv; // 全域性條件變數. bool ready = false; // 全域性標誌位. void do_print_id(int id) { std::unique_lock <std::mutex> lck(mtx); while (!ready) // 如果標誌位不為 true, 則等待... cv.wait(lck); // 當前執行緒被阻塞, 當全域性標誌位變為 true 之後, // 執行緒被喚醒, 繼續往下執行列印執行緒編號id. std::cout << "thread " << id << '\n'; } void go() { std::unique_lock <std::mutex> lck(mtx); ready = true; // 設定全域性標誌位為 true. cv.notify_all(); // 喚醒所有執行緒. } int main() { std::thread threads[10]; // spawn 10 threads: for (int i = 0; i < 10; ++i) threads[i] = std::thread(do_print_id, i); std::cout << "10 threads ready to race...\n"; go(); // go! for (auto & th:threads) th.join(); return 0; }
執行結果如下:
concurrency ) ./ConditionVariable-basic1 10 threads ready to race... thread 1 thread 0 thread 2 thread 3 thread 4 thread 5 thread 6 thread 7 thread 8 thread 9
好了,對條件變數有了一個基本的瞭解之後,我們來看看 std::condition_variable 的各個成員函式。
std::condition_variable 建構函式
default (1) |
condition_variable(); |
---|---|
copy [deleted] (2) |
condition_variable (const condition_variable&) = delete; |
std::condition_variable 的拷貝建構函式被禁用,只提供了預設建構函式。
std::condition_variable::wait() 介紹
unconditional (1) |
void wait (unique_lock<mutex>& lck); |
---|---|
predicate (2) |
template <class Predicate> void wait (unique_lock<mutex>& lck, Predicate pred); |
std::condition_variable 提供了兩種 wait() 函式。當前執行緒呼叫 wait() 後將被阻塞(此時當前執行緒應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個執行緒呼叫 notify_* 喚醒了當前執行緒。
線上程被阻塞時,該函式會自動呼叫 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競爭上的執行緒得以繼續執行。
另外,一旦當前執行緒獲得通知(notified,通常是另外某個執行緒呼叫 notify_* 喚醒了當前執行緒),wait() 函式也是自動呼叫 lck.lock(),使得 lck 的狀態和 wait 函式被呼叫時相同。
在第二種情況下(即設定了 Predicate),只有當 pred 條件為 false 時呼叫 wait() 才會阻塞當前執行緒,並且在收到其他執行緒的通知後只有當 pred 為 true 時才會被解除阻塞。因此第二種情況類似以下程式碼:
while (!pred()) wait(lck);
請看下面例子(參考):
#include <iostream> // std::cout #include <thread> // std::thread, std::this_thread::yield #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; std::condition_variable cv; int cargo = 0; bool shipment_available() { return cargo != 0; } // 消費者執行緒. void consume(int n) { for (int i = 0; i < n; ++i) { std::unique_lock <std::mutex> lck(mtx); cv.wait(lck, shipment_available); std::cout << cargo << '\n'; cargo = 0; } } int main() { std::thread consumer_thread(consume, 10); // 消費者執行緒. // 主執行緒為生產者執行緒, 生產 10 個物品. for (int i = 0; i < 10; ++i) { while (shipment_available()) std::this_thread::yield(); std::unique_lock <std::mutex> lck(mtx); cargo = i + 1; cv.notify_one(); } consumer_thread.join(); return 0; }
程式執行結果如下:
concurrency ) ./ConditionVariable-wait 1 2 3 4 5 6 7 8 9 10
std::condition_variable::wait_for() 介紹
unconditional (1) |
template <class Rep, class Period> cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time); |
---|---|
predicate (2) |
template <class Rep, class Period, class Predicate> bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred); |
與 std::condition_variable::wait() 類似,不過 wait_for 可以指定一個時間段,在當前執行緒收到通知或者指定的時間 rel_time 超時之前,該執行緒都會處於阻塞狀態。而一旦超時或者收到了其他執行緒的通知,wait_for 返回,剩下的處理步驟和 wait() 類似。
另外,wait_for 的過載版本(predicte(2))的最後一個引數 pred 表示 wait_for 的預測條件,只有當 pred 條件為 false 時呼叫 wait() 才會阻塞當前執行緒,並且在收到其他執行緒的通知後只有當 pred 為 true 時才會被解除阻塞,因此相當於如下程式碼:
return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));
請看下面的例子(參考),下面的例子中,主執行緒等待 th 執行緒輸入一個值,然後將 th 執行緒從終端接收的值打印出來,在 th 執行緒接受到值之前,主執行緒一直等待,每個一秒超時一次,並列印一個 ".":
#include <iostream> // std::cout #include <thread> // std::thread #include <chrono> // std::chrono::seconds #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable, std::cv_status std::condition_variable cv; int value; void do_read_value() { std::cin >> value; cv.notify_one(); } int main () { std::cout << "Please, enter an integer (I'll be printing dots): \n"; std::thread th(do_read_value); std::mutex mtx; std::unique_lock<std::mutex> lck(mtx); while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) { std::cout << '.'; std::cout.flush(); } std::cout << "You entered: " << value << '\n'; th.join(); return 0; }
std::condition_variable::wait_until 介紹
unconditional (1) |
template <class Clock, class Duration> cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time); |
---|---|
predicate (2) |
template <class Clock, class Duration, class Predicate> bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred); |
與 std::condition_variable::wait_for 類似,但是 wait_until 可以指定一個時間點,在當前執行緒收到通知或者指定的時間點 abs_time 超時之前,該執行緒都會處於阻塞狀態。而一旦超時或者收到了其他執行緒的通知,wait_until 返回,剩下的處理步驟和 wait_until() 類似。
另外,wait_until 的過載版本(predicte(2))的最後一個引數 pred 表示 wait_until 的預測條件,只有當 pred 條件為 false 時呼叫 wait() 才會阻塞當前執行緒,並且在收到其他執行緒的通知後只有當 pred 為 true 時才會被解除阻塞,因此相當於如下程式碼:
while (!pred()) if ( wait_until(lck,abs_time) == cv_status::timeout) return pred(); return true;
std::condition_variable::notify_one() 介紹
喚醒某個等待(wait)執行緒。如果當前沒有等待執行緒,則該函式什麼也不做,如果同時存在多個等待執行緒,則喚醒某個執行緒是不確定的(unspecified)。
請看下例(參考):
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; std::condition_variable cv; int cargo = 0; // shared value by producers and consumers void consumer() { std::unique_lock < std::mutex > lck(mtx); while (cargo == 0) cv.wait(lck); std::cout << cargo << '\n'; cargo = 0; } void producer(int id) { std::unique_lock < std::mutex > lck(mtx); cargo = id; cv.notify_one(); } int main() { std::thread consumers[10], producers[10]; // spawn 10 consumers and 10 producers: for (int i = 0; i < 10; ++i) { consumers[i] = std::thread(consumer); producers[i] = std::thread(producer, i + 1); } // join them back: for (int i = 0; i < 10; ++i) { producers[i].join(); consumers[i].join(); } return 0; }
std::condition_variable::notify_all() 介紹
喚醒所有的等待(wait)執行緒。如果當前沒有等待執行緒,則該函式什麼也不做。請看下面的例子:
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; // 全域性互斥鎖. std::condition_variable cv; // 全域性條件變數. bool ready = false; // 全域性標誌位. void do_print_id(int id) { std::unique_lock <std::mutex> lck(mtx); while (!ready) // 如果標誌位不為 true, 則等待... cv.wait(lck); // 當前執行緒被阻塞, 當全域性標誌位變為 true 之後, // 執行緒被喚醒, 繼續往下執行列印執行緒編號id. std::cout << "thread " << id << '\n'; } void go() { std::unique_lock <std::mutex> lck(mtx); ready = true; // 設定全域性標誌位為 true. cv.notify_all(); // 喚醒所有執行緒. } int main() { std::thread threads[10]; // spawn 10 threads: for (int i = 0; i < 10; ++i) threads[i] = std::thread(do_print_id, i); std::cout << "10 threads ready to race...\n"; go(); // go! for (auto & th:threads) th.join(); return 0; }
std::condition_variable_any 介紹
與 std::condition_variable 類似,只不過 std::condition_variable_any 的 wait 函式可以接受任何 lockable 引數,而 std::condition_variable 只能接受 std::unique_lock<std::mutex> 型別的引數,除此以外,和 std::condition_variable 幾乎完全一樣。
std::cv_status 列舉型別介紹
cv_status::no_timeout | wait_for 或者 wait_until 沒有超時,即在規定的時間段內執行緒收到了通知。 |
cv_status::timeout | wait_for 或者 wait_until 超時。 |
std::notify_all_at_thread_exit
函式原型為:
void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);
當呼叫該函式的執行緒退出時,所有在 cond 條件變數上等待的執行緒都會收到通知。請看下例(參考):
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; std::condition_variable cv; bool ready = false; void print_id (int id) { std::unique_lock<std::mutex> lck(mtx); while (!ready) cv.wait(lck); // ... std::cout << "thread " << id << '\n'; } void go() { std::unique_lock<std::mutex> lck(mtx); std::notify_all_at_thread_exit(cv,std::move(lck)); ready = true; } int main () { std::thread threads[10]; // spawn 10 threads: for (int i=0; i<10; ++i) threads[i] = std::thread(print_id,i); std::cout << "10 threads ready to race...\n"; std::thread(go).detach(); // go! for (auto& th : threads) th.join(); return 0; }
好了,到此為止,<condition_variable> 標頭檔案中的兩個條件變數類(std::condition_variable 和 std::condition_variable_any)、列舉型別(std::cv_status)、以及輔助函式(std::notify_all_at_thread_exit())都已經介紹完了。從下一章開始我會逐步開始介紹 <atomic> 標頭檔案中的內容,後續的文章還會介紹 C++11 的記憶體模型,涉及內容稍微底層一些,希望大家能夠保持興趣,學完 C++11 併發程式設計,如果你發現本文中的錯誤,也請給我反饋 ;-)。