C++併發與多執行緒(七) condition_variable、wait、notify_one、notify_all
條件變數condition_variable
condition_variable是一個和條件相關的類,本質上就是等待一個條件達成。使用的時候必須和互斥量mutex配合使用。
使用場景:增加效率
//把訊息從訊息佇列取出
void outMsgRecvQueue() {
int command = 0; //指令為command;
for (int i = 0; i < 10000; i++) {
bool result = outMsgLULProc(command); //將所有對共享資料的訪問都封裝成一個函式,方便加鎖
if (result) {
//訊息佇列不為空
//對根據命令對資料進行處理
}
else {
//訊息佇列為空
cout << "訊息佇列為空!" << endl;
}
}
}
bool outMsgLULProc(int &command) {
//對共享資源上鎖
unique_lock<mutex> myUnique2(m_mutex2);
if (!msgRecvQueue.empty()) {
//訊息不為空
int command = msgRecvQueue.front() ; //返回第一個元素,但不檢查是否存在
msgRecvQueue.pop_front();
return true;
}
return false;
}
上面是(五)中寫的取訊息佇列的部分程式碼,outMsgRecvQueue
中不停的迴圈,呼叫outMsgLULProc
檢視佇列是否為空,只有訊息不為空,才會進行操作,CPU被極大的浪費在了檢視佇列是否為空上;(六)中提到的雙重加鎖,可以一定程式上減少判斷,提高效率,這裡利用條件變數可以更加提高效率:
一、outMsgRecvQueue呼叫wait()
outMsgLULProc
修改成一直等待,訊息佇列中有資料的時候,來通知執行緒
//把訊息從訊息佇列取出
void outMsgRecvQueue() {
int command = 0;
while (true) {
unique_lock<mutex> myUnique(m_mutex2);
m_cond.wait(myUnique, [this] { //一個lamda表示式就是一個可呼叫物件(函式)
if (!msgRecvQueue.empty()) {
return true;
}
else
{
return false;
}
});
//未完,待續
}
}
//...
condition_variable m_cond; //生成一個條件變數物件
wait()方法是用來等待一個條件
-
如果第二個引數的lambda表示式返回值是false,那麼wait()將解鎖互斥量,並阻塞到本行
-
如果第二個引數的lambda表示式返回值是true,那麼wait()直接返回並繼續執行。
-
阻塞的時機為,一直阻塞到**其他某個執行緒呼叫notify_one()**成員函式為止;
-
如果沒有第二個引數,那麼效果跟第二個引數lambda表示式返回false效果一樣
二、inMsgRecvQueue呼叫notice_one()
繼續修改inMsgRecvQueue
,增加呼叫notify_one()成員函式:
//把收到的訊息加入訊息佇列
void inMsgRecvQueue() {
for (int i = 0; i < 10000; i++) {
cout << "插入一個元素到訊息佇列";
//互斥量mutex的使用,保護共享資源
unique_lock<mutex> myUnique1(m_mutex1);
msgRecvQueue.push_back(i);
m_cond.notify_one(); //去把wait的執行緒喚醒,執行完這一行,outMsgRecvQueue中的wait會被喚醒
//喚醒之後繼續他的操作
//其它處理步驟...
}
}
三、outMsgRecvQueue的wait被notice_one喚醒後
-
wait()不斷嘗試獲取互斥量鎖,如果獲取不到那麼流程就卡在wait()這裡等待獲取,如果獲取到了,那麼wait()就繼續執行,這也表明獲取到了鎖。(也就是說,只要執行到了wait後面的程式碼,此執行緒一定獲取到了鎖)
-
如果wait有第二個引數就判斷這個lambda表示式。
2.1 如果表示式為false,那wait又對互斥量解鎖,然後又休眠,等待再次被notify_one()喚醒
2.2 如果lambda表示式為true,則wait返回,流程可以繼續執行(此時互斥量已被鎖住)。
- 如果wait沒有第二個引數,則wait返回,流程走下去。
完整程式碼:
class MyPrint {
public:
//把收到的訊息加入訊息佇列
void inMsgRecvQueue() {
for (int i = 0; i < 10000; i++) {
cout << "插入一個元素到訊息佇列";
//互斥量mutex的使用,保護共享資源
unique_lock<mutex> myUnique1(m_mutex1);
msgRecvQueue.push_back(i);
m_cond.notify_one(); //去把wait的執行緒喚醒,執行完這一行,outMsgRecvQueue中的wait會被喚醒
//喚醒之後繼續他的操作
//其它處理步驟...
}
}
//把訊息從訊息佇列取出
void outMsgRecvQueue() {
int command = 0;
while (true) {
unique_lock<mutex> myUnique(m_mutex2);
//當其它執行緒用notice_one將wait喚醒
m_cond.wait(myUnique, [this] { //一個lamda表示式就是一個可呼叫物件(函式)
if (!msgRecvQueue.empty()) {
return true;
}
else
{
return false;
}
});
//wait後,佇列一定不為空,一定獲取到了鎖
command = msgRecvQueue.front(); //返回第一個元素,但不檢查是否存在
msgRecvQueue.pop_front();
}
}
list<int> msgRecvQueue; //訊息佇列
//死鎖演示
mutex m_mutex1;
mutex m_mutex2;
condition_variable m_cond; //生成一個條件變數物件
};
PS:重點是理解wait被notice_one喚醒之後的流程。
深入思考
上面的程式碼可能導致出現一種情況:
事實上執行的時候,不一定是inMsgRecvQueue()
和outMsgRecvQueue()
你執行一次,我執行一次,這麼簡單,兩個程序獲取鎖,都是概率成功的,競爭鎖的時候,誰獲得和排程演算法有關,都是可能成功的。
因為outMsgRecvQueue()
與inMsgRecvQueue()
並不是一對一執行的,所以當程式迴圈執行很多次以後,可能在訊息佇列中已經有了很多訊息,但是,outMsgRecvQueue
還是被喚醒一次只處理一條資料。這時可以考慮把outMsgRecvQueue
多執行幾次,或者對inMsgRecvQueue
進行限流。
notice_all()
notify_one()
:通知一個執行緒的wait()
notify_all()
:通知所有執行緒的wait()