C++11多執行緒---互斥量、鎖、條件變數的總結
關於互斥量std::mutex
的總結
互斥量用於組成程式碼的臨界區。C++的多執行緒模型是基於記憶體的,或者說是基於程式碼片段的,這和我們作業系統學習的臨界區概念基本一致,但是與Golang不同,Golang是基於訊息模型的。
一個std::mutex
的lock()
和unlock()
之間的程式碼片段組成一個臨界區,這個臨界區內部同時最多隻能有一個執行緒進行訪問,可以理解為這個片段內部的程式碼是受到保護的,不會被多執行緒同時訪問造成不可預知的問題。
std::mutex mtx;
mtx.lock();
// 這裡的程式碼,同時最多隻能有一個執行緒進行訪問
mtx.unlokc ();
當一個執行緒獲取到一個std::mutex
並且呼叫lock()
後,必須由同一個執行緒呼叫unlock()
操作,因此一定要注意兩個函式成對出現,否則會造成死鎖。
當一個執行緒獲取一個std::mutex
並呼叫lock()
函式後,其他執行緒的呼叫會失敗。可以使用try_lock
進行判別,具體細節參考:https://en.cppreference.com/w/cpp/thread/mutex
關於std::lock_guard
和std::unique_lock
的總結
std::lock_guard
比較好理解,因為呼叫mutex
需要時刻記著解鎖,所用這個類封裝了一系列的操作,在一個模組中構造了一個std::lock_guard
std::lock_guard
自動析構,相當於解鎖。它的結構簡單、速度快,但是功能比較少。
std::unique_lock
是對std::lock_guard
功能的一個拓展,功能更多,但是速度會慢一些,具體參照:https://en.cppreference.com/w/cpp/thread/unique_lock
關於條件變數std::condition_variable
的總結
這個相當於作業系統中的wait
和signal
原語操作,需要結合一個std::unique_lock
組成的臨界區共同完成功能。wait & signal
原語操作最典型的特點是 “阻塞自己,喚醒別人”。可以這麼理解,如果當前滿足特定條件不滿足,那麼就不能進入臨界區,當前執行緒阻塞。當其他執行緒處理完後,使得條件滿足了,執行緒會喚醒那些處於阻塞狀態的執行緒,使之重新進入。直接通過下面的程式碼來說明,經典的生產者和消費者問題。
注意只有一個互斥量的時候,喚醒順序的問題,參照官網:https://en.cppreference.com/w/cpp/thread/condition_variable
在這裡給出一個更加簡潔的例子:
void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, pred});
// 這裡執行工作程式碼,注意下面兩個語句的順序
lk.unlock();
cv.notify_one();
}
上述程式碼中,先執行lk.unlock()
,說明當前執行緒放棄對臨界區的所有權,此時再呼叫notify_one
會喚醒其他執行緒來對臨界區執行操作。如果先喚醒其他執行緒,則可能unlock
未執行完畢,就有執行緒到臨界區了,此時新來的執行緒又會阻塞了。
程式碼示例
下面程式碼總共是3個例子。第一個Application
是模擬一個事件處理系統的,但是沒有使用條件變數,自己實現了一下,第二個Application
使用了條件變數。第三個ProduceAndConsume
是典型的生產者和消費者模型。
未使用條件變數,僅僅藉助迴圈的載入資料應用
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
#include <random>
#include <cstdlib>
const int MAXT = 1000;
std::uniform_int_distribution<int>dis(1, MAXT);
std::random_device rd;
std::mt19937 gen(rd());
class Application {
public:
void mainTask() {
std::cout << "Do some main task...\n";
auto t = dis(gen); // 隨機時間模擬主線任務
std::this_thread::sleep_for(std::chrono::milliseconds(t));
std::cout << "Finish main task in " << t << " ms\n";
mtx.lock();
while (!m_bDataLoaded) {
mtx.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(100));
mtx.lock();
}
mtx.unlock();
std::cout << "Get loaded data\n";
}
void loadData() {
std::cout << "Loading data...\n";
auto t = dis(gen); // 隨機時間模擬主線任務
std::this_thread::sleep_for(std::chrono::milliseconds(t));
std::lock_guard<std::mutex>lck(mtx);
m_bDataLoaded = true;
std::cout << "Finish loading data in " << t << " ms\n";
}
bool isDataLoaded()const {
return m_bDataLoaded;
}
private:
bool m_bDataLoaded{ false };
std::mutex mtx;
std::condition_variable m_convVar;
};
int main() {
Application app;
std::thread t1(&Application::mainTask, &app);
std::thread t2(&Application::loadData, &app);
t1.join();
t2.join();
system("pause");
return 0;
}
結果:
使用了條件變數的載入資料應用
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
#include <random>
#include <cstdlib>
const int MAXT = 1000;
std::uniform_int_distribution<int>dis(1, MAXT);
std::random_device rd;
std::mt19937 gen(rd());
class Application {
public:
void mainTask() {
std::cout << "Do some main task...\n";
auto t = dis(gen); // 隨機時間模擬主線任務
std::this_thread::sleep_for(std::chrono::milliseconds(t));
std::cout << "Finish main task in " << t << " ms\n";
std::unique_lock<std::mutex> lck(mtx);
m_convVar.wait(lck, std::bind(&Application::isDataLoaded, this));
std::cout << "Get loaded data\n";
}
void loadData() {
std::cout << "Loading data...\n";
auto t = dis(gen); // 隨機時間模擬載入資料任務
std::this_thread::sleep_for(std::chrono::milliseconds(t));
std::cout << "Finish loading task in " << t << " ms\n";
std::unique_lock<std::mutex> lck(mtx);
m_bDataLoaded = true;
lck.unlock(); // 最好是新增上這一句,本例子無所謂
m_convVar.notify_one();
}
bool isDataLoaded()const {
return m_bDataLoaded;
}
private:
bool m_bDataLoaded{ false };
std::mutex mtx;
std::condition_variable m_convVar;
};
int main() {
Application app;
std::thread t1(&Application::mainTask, &app);
std::thread t2(&Application::loadData, &app);
t1.join();
t2.join();
system("pause");
return 0;
}
執行結果:
生產者和消費者模型
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <random>
#include <functional>
#include <algorithm>
#include <queue>
#include <vector>
#include <cstdlib>
const int MAXT = 1000;
const int MAXN = 5;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int>dis(MAXT / 10, MAXT);
class ProducerAndConsumer {
public:
void prodece() {
// 隨機時間,模擬貨物生產過程,生產過程本身不是在臨界區
std::this_thread::sleep_for(std::chrono::milliseconds(2 * dis(gen)));
std::cout << "Prodece\n";
std::unique_lock<std::mutex> lck(mtx);
m_convPro.wait(lck, std::bind(&ProducerAndConsumer::notFull, this));
m_qCargo.push(m_iCargoNum);
++m_iCargoNum;
m_convCon.notify_one();
}
void consume() {
std::unique_lock<std::mutex> lck(mtx);
m_convCon.wait(lck, std::bind(&ProducerAndConsumer::notEmpty, this));
int n = m_qCargo.front();
m_qCargo.pop();
m_convPro.notify_one();
lck.unlock(); // 一定要先解鎖
// 隨機時間,模擬貨物消費過程,消費過程本身不是在臨界區!!!
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
std::cout << "Consume\n";
}
inline bool notFull()const {
return m_qCargo.size() < m_iMaxCargoNum;
}
inline bool notEmpty()const {
return m_qCargo.size() > 0;
}
inline int getBufferSize()const {
return m_qCargo.size();
}
inline int getCargoNum()const {
return m_iCargoNum;
}
private:
std::queue<int> m_qCargo; // 貨物佇列
int m_iMaxCargoNum{ MAXN }; // 最大容量
int m_iCargoNum{ 0 }; // 貨物總數
std::mutex mtx;
std::condition_variable m_convPro, m_convCon;
};
int main() {
ProducerAndConsumer pac;
auto N = std::thread::hardware_concurrency();
std::cout << "Thread num: " << N << std::endl;
std::vector<std::thread>producerThreads;
std::vector<std::thread>consumerThreads;
for (int i = 0; i < N; ++i) {
producerThreads.emplace_back(std::thread(&ProducerAndConsumer::prodece, &pac));
consumerThreads.emplace_back(std::thread(&ProducerAndConsumer::consume, &pac));
}
std::for_each(producerThreads.begin(), producerThreads.end(),
std::mem_fn(&std::thread::join));
std::for_each(consumerThreads.begin(), consumerThreads.end(),
std::mem_fn(&std::thread::join));
std::cout << "Cargo in buffer: " << pac.getBufferSize() << std::endl;
std::cout << "Cargo count: " << pac.getCargoNum() << std::endl;
system("pause");
return 0;
}
執行結果: