C++中的事件Event(基於條件變數的封裝)
考慮這樣一個場景,現在有兩條執行緒,一條執行緒負責往佇列中塞元素,另一條執行緒負責從佇列中取元素。其實就是簡單的生產者消費者佇列,取元素的這個執行緒需要注意佇列中是否有元素,如果沒有元素就不能取,於是我就搞一個迴圈,一直取看佇列是否為空:
for( ; !que.empty(); ) {}
這樣子確實可以實現沒有元素就等待,有元素就取元素,但是這樣搞,這條形成一直處於CPU100%執行的狀態,好像這種情況叫自旋。這樣和浪費資源,於是,想出第二個方法:執行緒休眠是不是就不浪費資源了,對,於是我們讓執行緒休眠一段時間,然後再去看看佇列是否為空,於是有了下面的程式碼:
for( ; !que.empty (); std::this_thread::sleep_for(std::chrono::seconds(1))) {}
這樣子確實不浪費資源,但是存在一個問題:響應不及時。還是講一個故事來說說。
假如你今天晚上通宵打了一晚上游戲,明天要坐十小時的火車,你準備在或者上補一覺,到站下車,但是,你怎麼就知道火車到站了呢?要麼你就不睡,眼睛一直睜著,但是這樣搞,你可能會猝死;要麼你就睡一個小時就起來看看是否到站了,如果沒到站繼續睡,但是萬一你剛睡著,火車到站了,這個時候就蛋疼了。那怎麼辦呢?很簡單,睡之前告訴下乘務員,讓他到站叫你。
那麼這個“到站叫你”這個操作在C++如何實現呢,C++11有了std::condition_variable
con.wait();
只要生產者往佇列中塞元素,就呼叫con.notify()
,這個操作就是通知。這樣子就可以很好地解決這個問題。
條件變數可以很好處理執行緒的同步關係。利用條件變數去等待條件是多執行緒程式設計的利器。
那麼條件變數std::condition_variable
該如何使用?
條件變數的成員函式:
notify_one
、notify_all
;wait
、wait_for
、wait_until
;
一個等待,一個喚醒。對,條件變數的使用就是這麼簡單。
條件變數通常和鎖一起使用,也就是說,條件變數在等待的時候會釋放鎖,當條件變化時得到鎖,如果發現條件不是正在等待的,馬上又釋放鎖。
用條件變數示範一下上面說的那個生產者消費者佇列:
std::condition_variable con;
std::mutex mu;
std::queue<int> que;
void produce()
{
std::lock_guard<std::mutex> lock(mu);
que.push(std::rand());
con.notify_one();
}
void consume()
{
std::unique_lock<std::mutex> lock(mu);
con.wait(lock, [] () { return !que.empty(); });
auto t = que.front();
que.pop();
}
使用wait
的地方使用std::unique_lock
是因為std::unique_lock
比std::lock_guard
更好的靈活性(條件變數在等待的時候會釋放鎖,當條件變化時得到鎖,如果發現條件不是正在等待的,馬上又釋放鎖,std::unique_lock
可以中途解鎖,而std::lock_guard
沒有解鎖這個成員函式,作用域一開始就加鎖,直到作用域結束才解鎖),
寫到這裡好像就結束了,但是條件變數就是完美的解決辦法嗎,它就沒有一點缺陷?正文開始。
produce
和consume
的呼叫順序不定,有沒有可能consume
在produce
之後呼叫,這種情況consume
會永遠阻塞;
有可能notify
還沒呼叫,wait
就直接返回了,這是完全有可能的,這和條件變數的底層實現有關,不可避免。
以下是維基百科的一段話:
“This means that when you wait on a condition variable, the wait may (occasionally) return when no thread specifically broadcast or signaled that condition variable. Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations. The race conditions that cause spurious wakeups should be considered rare.”
因此,條件變數會導致兩個問題:
- 提前喚醒
- 虛假喚醒
那麼如何對這兩個問題亡羊補牢呢?對於提前喚醒,可以在wait
之前判斷下條件,如果已經符合條件就不wait
了。
於是消費者可以下面這個樣子:
void consume()
{
std::unique_lock<std::mutex> lock(mu);
if (que.empty()) {
con.wait(lock, [] () { return !que.empty(); });
}
auto t = que.front();
que.pop();
}
那麼虛假喚醒呢?可以在wait
返回之後,在對條件進行判斷,如果wait
的返回不是因為條件得到了滿足,那麼就繼續wait
。於是消費者又可以搞成下面這樣:
void consume()
{
std::unique_lock<std::mutex> lock(mu);
if (que.empty()) {
con.wait(lock, [] () { return !que.empty(); });
if (que.empty()) {
con.wait(lock, [] () { return !que.empty(); });
}
}
auto t = que.front();
que.pop();
}
但是,有更精簡的寫法:
void consume()
{
std::unique_lock<std::mutex> lock(mu);
while (que.empty()) {
con.wait(lock, [] () { return !que.empty(); });
}
auto t = que.front();
que.pop();
}
其實上面說了兩個問題的解決措施在C++11的std::condition_variable
上使用是多此一舉的,這兩個問題標準庫早就考慮到了,我們可以看看標準庫條件變數的std::condition_variable::wait
原始碼:
// <condition_variable>
template<typename _Predicate>
void wait(unique_lock<mutex>& __lock, _Predicate __p)
{
while (!__p())
wait(__lock);
}
所以,我上面的程式碼所有使用到std::condition_variable
的地方都可以直接使用,如果把std::condition_variable
換成POSIX的pthread_con_t
,上面的這些操作都是有意義的,我這裡用std::condition_variable
替代POSIX的pthread_con_t
只是為了演示方便。
那麼標題中提到的事件Event是什麼東西,其實這個東西就是對條件變數的再次封轉。使得程式碼更加簡潔,邏輯更加清晰。至於為什麼取名為事件Event,想想條件變數的適用場景,當發生特定事件(條件得到滿足)時,執行特定操作,Event把執行的特定時間這項工作交給使用者,自己則負責告訴使用者:“你期待的事件現在已經發生了,趕快執行你的操作吧”。
於是,Event的定義是:Event是一個同步物件,它允許一條執行緒通知另外的一條或者多條執行緒特定的事件已經發生了。
剩下的就是Event的實現了。
綜合上面的分析,用一個標誌變數來標識特定的事件是否發生了(條件滿足就設定標誌變數,把業務邏輯分離出去),再用一個標誌變數標識是否全部喚醒。
class Event
{
public:
Event() = default;
void Wait()
{
std::unique_lock<std::mutex> lock(mu);
con.wait(lock, [this] () { return this->flag || this->all; });
if (!all)
flag = false;
}
template <typename _Rep, typename _Period>
bool WaitFor(const std::chrono::duration<_Rep, _Period> & duration)
{
std::unique_lock<std::mutex> lock(mu);
bool ret = true;
ret = con.wait_for(lock, duration, [this] () { return this->flag || this->all; });
if (ret && !all)
flag = false;
return ret;
}
template <typename _Clock, typename _Duration>
bool WaitUntil(const std::chrono::time_point<_Clock, _Duration> & point)
{
std::unique_lock<std::mutex> lock(mu);
bool ret = true;
ret = con.wait_until(lock, point, [this] () { return this->flag || this->all; });
if (ret && !all)
flag = false;
return ret;
}
void NotifyOne()
{
std::lock_guard<std::mutex> lock(mu);
flag = true;
con.notify_one();
}
void NotifyAll()
{
std::lock_guard<std::mutex> lock(mu);
all = true;
con.notify_all();
}
void Reset()
{
std::lock_guard<std::mutex> lock(mu);
flag = all = false;
}
private:
Event(const Event &) = delete;
Event & operator = (const Event &) = delete;
private:
bool flag = false;
bool all = false;
std::mutex mu;
std::condition_variable con;
};
把生產者消費者佇列那個程式用Event
改寫,然後在main
函式開兩條執行緒測試一下。
Event event;
std::queue<int> que;
std::mutex mu;
void produce()
{
std::cout << "produce" << std::endl;
do {
std::lock_guard<std::mutex> lock(mu);
que.push(std::rand());
} while (false);
event.NotifyOne();
std::cout << "produce end" << std::endl;
}
void consume()
{
std::cout << "consume" << std::endl;
event.Wait();
std::unique_lock<std::mutex> lock(mu);
auto t = que.front();
que.pop();
std::cout << "consume end" << std::endl;
}
int main()
{
auto th = std::thread(consume);
std::this_thread::sleep_for(std::chrono::seconds(2));
std::thread(produce).join();
th.join();
return 0;
}
結果的話,看程式碼就知道了,不過使用Event
特別容易出現死鎖,避免把Event
的操作放在鎖的範圍中,因為Event
本身就有一把鎖,鎖套鎖,一不小心,加鎖順序錯亂就死鎖了。這個體會下上面的程式就明白了。
到這裡,博文就結束了。Event
的簡單封裝的背後其實也是有這麼長一段心路歷程的,還是要多挖掘。
參考: