C++ 11中的多執行緒
C++11多執行緒簡單使用
開篇介紹
C++11中引入了多執行緒標頭檔案<thread>
,讓我們能更方便的使用多執行緒進行程式設計
void TestThread(int index) { std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl; std::cout << "Child Thread" << index << "Stop" << std::endl; } int main() { std::thread newThread1(TestThread, 1); std::thread newThread2(TestThread, 2); std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl; std::cout << "Main Thread Stop" << std::endl; if (newThread1.joinable()) newThread1.join(); if (newThread2.joinable()) newThread2.join(); }
眾所周知,執行緒具有非同步性,也就是說這道程式的推進的方向是不確定的,而事實也正是如此
有兩句話說得好,彙總一下就是:join()
和detach()
總要呼叫一個,並且呼叫之前最好是要進行joinable()
檢查
Never call join() or detach() on std::thread object with no associated executing thread
Never forget to call either join or detach on a std::thread object with associated executing thread
如果對一個子執行緒執行兩次join()
void TestThread(int index) { std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl; std::cout << "Child Thread" << index << "Stop" << std::endl; } int main() { std::thread newThread(TestThread, 1); newThread.join(); newThread.join(); }
cppreference中提到,當joinable()
的執行緒被賦值或析構的時候,會呼叫std::terminate()
,而一般std::terminate()
意味著std::abort()
,也就是說如果對一個執行緒既不執行join()
操作也不執行detach()
操作,而它卻被析構了,那麼“it will cause the program to Terminate(終止)”
void TestThread(int index)
{
std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl;
std::cout << "Child Thread" << index << "Stop" << std::endl;
}
int main()
{
std::thread newThread(TestThread, 1);
newThread = std::thread(TestThread, 2);
std::cout << "Main Thread Sleep Begin" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "Main Thread Sleep End" << std::endl;
if (newThread.joinable())
newThread.join();
}
拋開Bug不談,由於執行緒的非同步性,誰也說不清是先輸出Thread1還是輸出Thread2
針對這個Bug,可以這麼理解:執行緒1由物件newThread管理,當C++底層告知作業系統去建立一個新執行緒並給它分配一些任務後,卻馬上建立了一個執行緒2交給newThread,這樣子產生了一個覆蓋操作,導致執行緒1被析構,而它又是可結合的,所以程式被terminate
正確的做法是
int main()
{
std::thread newThread(TestThread, 1);
if (newThread.joinable())
newThread.join();
newThread = std::thread(TestThread, 2);
if (newThread.joinable())
newThread.join();
}
執行緒的構造
通過前文中的示範,我們瞭解到可以通過傳入一個函式指標來給一個執行緒分配它的“任務”
除了函式指標外,還可以通過lambda表示式,std::function
,std::bind
,仿函式物件等來解決
struct Handle
{
void operator()(int data) { std::cout << data << std::endl; }
};
int main()
{
std::thread t(Handle(), 10);
if (t.joinable())
t.join();
}
join與detach
newThread.join()
操作代表當前執行緒將阻塞,直到newThread
完成它的任務newThread.detach()
操作代表newThread
與當前執行緒分開(即當前執行緒結束銷燬後並不會同時銷燬newThread
),但是程式結束後newThread
仍會被終止(即使它還沒執行完,也會被作業系統強行叫停,這也可能會導致資源沒有正確的釋放)
class TestClass
{
public:
TestClass() { cout << "create" << endl; }
TestClass(const TestClass& t) { cout << "copy" << endl; }
void print(int num)
{
for (int i = 0; i < num; i++)
cout << i << ends;
cout << endl;
}
};
void Func2()
{
cout << "start" << endl;
TestClass t;
std::thread newThread(&TestClass::print, &t, 10);
// 讓當前執行緒等待newThread完成
newThread.join();
cout << "end" << endl;
}
int main()
{
std::thread t(Func2);
if (t.joinable())
t.join();
return 0;
}
程式的執行結果為
void repeat1000(int index, int time)
{
for (int i = 0; i < time; i++)
cout << index;
}
void detach_test()
{
thread newThread(repeat1000, 1, 100000);
newThread.detach();
newThread = thread(repeat1000, 2, 1000);
newThread.join();
}
int main()
{
{
thread t(detach_test);
t.join();
}
cout << endl << "start wait" << endl;
this_thread::sleep_for(chrono::seconds(5));
return 0;
}
我認為這是一個很好的解釋join()
和detach()
的例子,下面來分析一下
- 首先建立了執行緒
t
,給它分配了detach_test()
任務 - 主執行緒阻塞 等待
t
完成它的任務 - 同時作業系統完成了對
t
的分配,開始執行detach_test()
t
程序再建立子執行緒newThread
,並給他分配(repeat1000, 1, 100000)
任務- 作業系統在分配
newThread
的“同時”,t
執行緒將其和newThread
分離 - 此時
(repeat1000, 1, 100000)
仍然在執行,同時t
執行緒給newThread
賦了一個新執行緒,任務為(repeat1000, 2, 1000)
newThread
呼叫join()
操作,t
程序阻塞,等待newThread
的(repeat1000, 2, 1000)
工作完成newThread
的(repeat1000, 2, 1000)
完成,也意味著t
程序的任務完成,此時主執行緒不再受到阻塞t
執行緒離開作用域,遭到銷燬。但此時(repeat1000, 1, 100000)
操作因為比較耗時所以仍然還在執行- 主程式輸出"start wait"後開始休眠5秒鐘,此時執行緒
(repeat1000, 1, 100000)
仍然在執行 - 最後程式結束,各種執行緒被作業系統銷燬(即可能
(repeat1000, 1, 100000)
執行到第八萬次的時候就突然被終止了)
在一般情況下,為了避免detach
或join
使用不當造成的程式錯誤,可以建立一個執行緒類,使用解構函式執行執行緒的分離或合併(join
)
執行緒傳參時應該避免的操作
應該避免傳入棧上物件的指標
void newThreadCallback(int* p)
{
std::cout << "Inside Thread: " << *p << std::endl;
// 等一秒鐘 讓startNewThread()執行結束 使i的記憶體空間被回收
std::this_thread::sleep_for(std::chrono::seconds(1));
// 丟擲異常
*p = 19;
}
void startNewThread()
{
int i = 10;
std::cout << "Inside Main Thread: " << i << std::endl;
std::thread t(newThreadCallback, &i);
t.detach();
std::cout << "Inside Main Thread: " << i << std::endl;
}
int main()
{
startNewThread();
// 等兩秒鐘 讓所有執行緒和方法都執行完畢再結束程式
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
堆上的資料同理
因為一般堆物件需要使用delete
來銷燬,所以也無法確定別的執行緒訪問到指標的時候,它所指的記憶體是否有效
執行緒的引用傳參
使用std::ref
或者std::cref
,使用方法幾乎和std::bind
中使一致的,所以不再贅述
多執行緒中的競爭
作業系統應該學過,競爭就是多個執行緒同時訪問一塊記憶體區域,導致不可預估的結果
class Wallet
{
int money;
public:
Wallet() : money(0) {}
int getMoney() const { return money; }
void addMoney(int increase)
{
for (int i = 0; i < increase; ++i)
money++;
}
};
int testMultiThreadWallet()
{
Wallet walletObject;
std::vector<std::thread> threads;
// 建立五條執行緒非同步訪問Wallet "理應"得到的結果為5000
threads.reserve(5);
// reserve對應_back而resize對應[i]
for (int i = 0; i < 5; ++i)
threads.emplace_back(&Wallet::addMoney, &walletObject, 1000);
// 等所有執行緒執行完再結束
for (auto& thread : threads)
thread.join();
return walletObject.getMoney();
}
int main()
{
int val = 0;
for (int k = 0; k < 1000; k++)
{
if ((val = testMultiThreadWallet()) != 5000)
std::cout << "Error at count = " << k << " Money in Wallet = " << val << std::endl;
}
return 0;
}
某次程式執行的結果為,這種結果是不確定的,可能1000次實驗,一次都不會出錯,也可能出現多至十多次錯誤
這一行短短的程式碼其實發生了三件事
money++;
- 將
money
的值載入進暫存器中 - 在暫存器中進行計算,即++操作
- 將暫存器中的結果存回
money
所在的記憶體中
而當由於執行緒具有非同步性,在不加鎖的情況下我們無法控制多條執行緒對money
的訪問順序,那麼就可能出現以下這種情況
- 假設money的初始值是43
- money的值被執行緒1取出放入暫存器1中
- money的值被執行緒2取出放入暫存器2中
- 暫存器1進行計算得到結果44
- 暫存器2進行計算得到結果44
- 暫存器1將結果放入money所在記憶體中,money為44
- 暫存器2將結果放入money所在記憶體中,money為44
解決執行緒中的競爭
作業系統中的PV操作與之類似,關鍵就是加鎖
std::mutex
使用互斥鎖解鎖上文中的錢包問題
#include<mutex>
class Wallet
{
int money;
mutex mutexLock;
public:
Wallet() : money(0) {}
int getMoney() const { return money; }
void addMoney(int increase)
{
mutexLock.lock();
for (int i = 0; i < increase; ++i)
money++;
mutexLock.unlock();
}
};
但是如果一個執行緒在加鎖後並沒有解鎖,那麼所有其他執行緒將會一直等待,導致程式無法結束(當使用join
的情況下)
相反的,如果沒上鎖就解鎖
因此可以在此基礎上做一層簡單的封裝
class SmartMutex
{
private:
mutex mutexLock;
public:
void AutoLock(const function<void()>& func)
{
mutexLock.lock();
func();
mutexLock.unlock();
}
};
class Wallet
{
int money;
SmartMutex smartMutex;
public:
Wallet() : money(0) {}
int getMoney() const { return money; }
void addMoney(int increase)
{
smartMutex.AutoLock([this, &increase]()
{
for (int i = 0; i < increase; ++i)
this->money++;
});
}
};
或者可以使用std::lock_guard
std::lock_guard
std::lock_guard
有點像一個智慧指標,在它的作用域結束後,會呼叫解構函式,完成對互斥鎖的解鎖操作
class Wallet
{
int money;
mutex mutexLock;
public:
Wallet() : money(0) {}
int getMoney() const { return money; }
void addMoney(int increase)
{
// 在lockGuard的建構函式中會自動上鎖
lock_guard<mutex> lockGuard(mutexLock);
for (int i = 0; i < increase; ++i)
this->money++;
// 離開作用域 lockGuard解構函式呼叫 自動解鎖
}
};