程式設計師的自我修養(六):保護執行緒間的共享資料 轉載
程式設計師的自我修養(六):保護執行緒間的共享資料
多程序和多執行緒最本質的區別在於共享和隔離的程度不同。對於多程序方式來說,因為隔離程度高,所以程式設計師很少需要去擔心程序空間的資料被破壞;但是併發任務之間共享資料就變得很困難了。對於多執行緒方式來說,因為隔離程度低,所以共享資料非常容易;但是,相應地,程式設計師需要更多地考慮如何線上程之間安全地共享資料。這就引出了所謂的「執行緒安全」問題。
此篇,我們討論如何線上程之間安全地共享資料。
線上程間共享資料的問題
引子
讓我們先來看一個有味道的例子。
假設你邀請朋友到家裡來派對。這個派對有些特殊,需要各自準備好食材,而後烹飪成美味——大家比比手藝。朋友 A 準備做辣椒炒肉,而朋友 B 決定用韭菜炒蛋一展身手——他們都已經備好了菜等待下鍋。
不幸的是,你家只有一口鍋,但是朋友 A 和 B 兩不相讓。他們在幾乎同一時間,分別把自己手頭備好的菜下了油鍋:
- 朋友 A 把切好的辣椒下了油鍋——嘶~
- 朋友 B 把打好的蛋液下了油鍋——譁~
於是,廚房裡傳出了辣椒炒蛋的香味……
在這個例子裡,最終朋友 A 和朋友 B 合作了一盤「辣椒炒蛋」。雖然這不是他們預期的結果,但好歹沒有引起災難性的後果。不過,這種「好結果」並不是每次都能發生;更多的時候,可能會做出一堆奇奇怪怪的黑暗料理。
這件事情和我們今天要討論的問題很是相像,我們看看下面這張表。
美食派對 | 執行緒間資料共享 |
---|---|
廚房 | 程序空間 |
油鍋 | 可線上程間共享的資料 |
朋友 A 和朋友 B | 兩個執行緒 |
對於沒事派對中的這個小插曲,顯而易見地,作為主人需要定下一些規矩。對於這個問題,主人需要規定:別人使用完畢之前,其他人不能用鍋子(當然也包括鍋鏟、煤氣灶之類的)。線上程間共享資料也是如此。程式設計師在設計多執行緒併發程式時,需要協調好多個執行緒對資料的操作:應當在何時,如何操作資料,並與其它執行緒進行通訊。
不變數
在具體討論之前,我們先了解一下「不變數」這個概念。
所謂不變數,指的是對於某個資料結構,當其合法可用時,總是成立的一個命題。在程式設計時引入不變數,可以幫助程式設計師正確地處理資料。為了更好地理解不變數,我們來看一下下面這個簡單的例子。
int sum{0}; constexpr size_t times{100}; // invariant: `i` is the number of integer that we have added to `sum`. for (size_t i{0}; i != times; ++i) { sum += i + 1; } // some use of `sum`
這是一個非常簡單的例子,此處僅用於說明不變數。
在註釋中,我已經註明了此處用到的不變數:i
是已經累加的次數。對於 i
來說,它在迴圈開始前、每次迴圈迭代之後、迴圈結束的瞬間,都應該是合法可用的。因此,作為不變數,應該在這三種情形下都成立。若不然,則說明迴圈寫錯了。
- 開始前:在迴圈開始前,我們尚未進行過累加,因此
i == 0
成立; - 迭代過後:每迭代一次,
++i
保證了不變數成立; - 迴圈結束:由於
i
自0
開始自增,直到第一次滿足i != times
的條件終止迴圈,因此在迴圈終止時,i == times
成立。
經過這樣的驗證,我們應當很有信心地說:我的程式碼是正確的。
注 1:應當寫明顯沒有錯誤的程式碼,比如這個例子。
注 2:在這個例子中,之所以將迴圈條件記為i != times
而不是i < times
,正是因為有不變數的存在。若是記作i < times
,那麼迴圈中止時,應有i >= times
;對應到不變數上,就是「至今為止,已經累加了不小於times
次」,而這與程式碼的意圖是不一致的。故而,對 C++ 程式設計師來說,建議寫作i != times
而不是i < times
。
注 3:此外,C++ 對 STL 容器引入了「迭代器」的概念。迭代器過載了==
和!=
等符號,但並不一定過載了了<
號。事實上,對兩個迭代器進行「大小比較」是沒有意義的。為了保證一致性,從迭代器的角度說,也不建議 C++ 程式設計師將此類迴圈條件寫作i < times
。
回過頭,再來看一下這個簡短的例子。我們不難發現,在執行完 sum += i + 1
的一瞬間,事實上不變數並不成立。比如,當 i == 30
時,我們執行完畢上述賦值表示式時,已經進行了 31 次累加。這告訴我們:在對操作資料時,有可能會破壞不變數。這樣一來,迴圈迭代後,對 i
的自增操作,就可以理解為是「維護不變數」了。這件事情告訴我們:不變數不成立,預示著資料不可用(通常表示資料操作進行到一半);操作資料後,需要維護不變數。
我們再來看一個稍微複雜一些的例子。
// invariant: suppose we have two nodes `A` and `B`,
// then if `A->next_ == B`, then we must have `B->perv_ == A`.
template<typename DataType>
class Node {
private:
Node* prev_ = nullptr;
Node* next_ = nullptr;
DataType data;
public:
// ...
void Delete() {
auto prev = this->prev_;
auto next = this->next_;
if (nullptr != prev) {
prev->next_ = next; // 1.
}
if (nullptr != next) {
next->prev_ = perv; // 2.
}
this->Destroy();
return;
}
};
這是一個雙鏈表中結點的簡單示例。同樣,我在程式碼註釋中已經標明瞭不變數,即在雙鏈表可用時,對兩個合法結點來說,應當有 A->next_->perv_ == A
。然而,在刪除結點時(更新資料結構時),不變數會被破壞。比如,當 (1) 已經執行完畢,但 (2) 尚未執行時,上述不變數就是不成立的。
假設現在有兩個執行緒,在對同一個雙鏈表進行操作。其中之一嘗試從前向後遍歷雙鏈表;另一則在刪除其中某一個結點。我們之前說,刪除結點會臨時破壞不變數,而不變數被破壞則表示資料結構是不可用的。那麼正在執行刪除操作的執行緒,就可能導致正在遍歷的執行緒出錯甚至崩潰。
執行緒 1 | 執行緒 2 | 註釋 |
---|---|---|
if (nullptr != curr->next()) |
此時檢查成功 | |
prev->next_ = next |
next 有可能是 nullptr |
|
注意此時不變數被破壞了 | ||
work = curr->next() |
現在 work 可能是 nullptr |
|
data = work->data() |
解引用 nullptr ,段錯誤 |
上表展現了兩個執行緒同時操作一個雙鏈表時,可能出現的情況。在這種情況下,執行緒 1 可能因為解引用空指標,而引發段錯誤,導致整個程序崩潰。
競爭狀態
我們再看一個有小情緒的例子。
對於國人來說,「春運」總是很頭疼的問題。在春運期間,火車票總是不夠的。基本上,任何一次買票行為,其結果(能不能買到火車票),都取決於你下手的時機——是否足夠快。對於你來說,同樣是「點選滑鼠確認」這個動作,其結果實際上是不確定的。具體是何種結果,取決於你的行為與其他人行為的相對順序。
在計算機世界中,我們把結果取決於多個執行緒執行指令的相對順序的情形,稱為「競爭狀態」。若是競爭狀態發生在多個執行緒對同一個資料結構的修改上,則稱其為「資料競爭」。因為競爭狀態的結果是不確定的,所以資料競爭可能導致未定義行為(Undefined Behavior, 縮寫 UB,讀作「有病」)。
結合不變數的概念,不難理解。如果一個行為可能在其中間狀態破壞不變數,則由此行為引發的競爭狀態,可能導致災難後果。而若要破壞不變數,通常來說意味著該操作需要更新多個變數,而這些更新操作無法在單個指令完成——因而有被打斷的可能。由資料競爭導致的問題通常難以排查。這是因為,儘管這些操作可能被打斷,但並不是每次都會打斷。通常來說,只有當系統負載很高,CPU 需要頻繁地切換上下文時,資料競爭的可能性才會增大。因此,排查由資料競爭導致的問題,一般來說是非常困難的。
避免惡性資料競爭
既然資料競爭通常可能會招致 UB,那麼我們就要想辦法避免它。通常來說,避免惡性資料競爭有幾個思路。
- 保護資料結構,確保在資料結構更新過程中,其不變數被破壞的中間狀態只有一個執行緒能夠看到。
- 修改資料結構的實現,確保任何對資料結構的更新,在外界看來不變數都是成立的。
- 將所有對資料結構的修改,都交給第三方序列執行。
對於第三種方案,以之前購買火車票的例子來說,可以實現為:當你點選確認按鈕後,購票網站將你的請求傳送給伺服器,伺服器將請求加入一個佇列,並返回一個狀態。例如:「當車次剩餘車票 1000 張,在您之前有 800 個尚未處理的請求」。這樣一來,通常來說,購票行為的結果就是確定的了。
然而,C++ 標準庫並沒有實現上述第三種方案。故此,此處我們不做更深入的討論。
使用互斥量保護資料
保護資料的最基本方法是使用所謂的「互斥量」(mutual exclusion, mutex)。
互斥量的基本邏輯是這樣的。
- 訪問某個資料結構時,首先檢查互斥量。
- 若互斥量被鎖住,則等待,直到互斥量解鎖。
- 若互斥量沒有被鎖住,則鎖住互斥量,而後更新資料,再解鎖。
聽起來是個挺美好的事情,若能實現,那麼資料競爭就能被解決。然而,所謂「沒有銀彈」,並不是說引入互斥量就能完美解決所有問題。
- 首先,互斥量起作用是有前提的。
- 所有可能引發資料競爭的資料結構都被保護起來了;
- 所有可能引發資料競爭的操作,都正確地使用了互斥量。
- 其次,互斥量可能引發所謂的「死鎖」問題。
- 最後,若互斥量過多或過少地保護了資料,都可能出現問題。
在 C++ 中使用互斥量
在 C++ 中,標準庫提供的互斥量是 std::mutex
,它被定義在 mutex
這個標頭檔案中。
互斥量是「鎖」的一種,按照我們在 C++ 的幾個基本原理和技術中的介紹,鎖也是一種資源。因此為了保證資源被正確釋放(正確使用互斥量的條件之一),我們最好是用 RAII 技術將其包裝起來。C++ 標準庫直接提供了這樣的封裝,名為 std::lock_guard
,它也定義在 mutex
這個標頭檔案當中。
我們來看一個簡單的例子。
#include <list>
#include <mutex>
#include <algorithm>
typedef std::lock_guard<std::mutex> guarded_mutex;
std::list<int> data_list; // 1.
std::mutex data_mutex; // 2.
void add_to_list(int new_value) {
guarded_mutex guard{data_mutex}; // 3.
data_list.push_back(new_value);
}
bool list_contains(int value_to_find) {
guarded_mutex guard{data_mutex}; // 4.
return (data_list.end() != std::find(data_list.begin(), data_list.end(), value_to_find));
}
例中,(1) 和 (2) 分別定義了全域性變數。此處,我們意圖用 (2) 定義的全域性互斥量保護對 (1) 定義的連結串列進行保護。(3) 和 (4) 通過 RAII 容器 std::lock_guard
在構造時,對傳入的互斥量 data_mutex
上鎖,並在 guard
銷燬時自動解鎖。
在這個例子中,若是 add_to_list
和 list_contains
分別在不同執行緒中執行,則他們都會嘗試鎖上 data_mutex
這個互斥量。顯而易見,同一時刻,只能有一個函式能成功鎖上它;於是該函式正常執行,而另外一個函式則會陷入等待。這樣一來,data_list
更新期間,不變數被破壞的中間狀態,就只有修改它的執行緒能看到。而對於其他執行緒來說,data_list
要麼沒有被修改,要麼已經修改完成,因而總是可用的。
在這個簡單的例子裡,被保護的資料和互斥量都是全域性變數。顯而易見,這是不好的。首先,使用全域性變數,意味著任何函式都有可能修改它。其次,除了 data_list
和 data_mutex
的定義連在一起,它們之間在程式碼上沒有其他的聯絡。因此,很可能出現程式設計師使用 data_mutex
來保護其他資料;或者使用其他互斥量來保護 data_list
的現象。這樣一來,保護就不完整了。因此,在實際使用中,通常我們會選擇將 data_list
和 data_mutex
封裝在同一個類當中。
限制被保護資料的使用範圍
上一節,我們瞭解瞭如何使用互斥量保護資料。此外,談到了「正確使用」的要求之一:鎖上互斥量之後必須解鎖(否則其他執行緒永遠無法上鎖,就可能陷入無休止的等待)。這一節討論正確使用的另一個要求:必須限制被保護資料的使用範圍。簡單來說,就是不要將被保護資料的指標或引用通過返回值、函式引數的方式,傳到無法控制的範圍內。這個約定,是基於一個簡單的假設:你無法保證在你無法控制的範圍內,其他程式設計師是否按照約定使用互斥量保護這份資料。
我們看一個不好的例子。
template<typename DataType>
class Container {
private:
typedef std::lock_guard<std::mutex> guarded_mutex;
std::mutex mtx_; // 1.
DataType* data_; // 2.
public:
// ...
template<typename FuncType>
void process_data(FuncType func) { // 3.
guarded_mutex l(this->mtx_);
func(this->data); // 4.
return;
}
};
std::list* unprotected;
void malicious_function(std::list* protected_data) {
unprotected = protected_data; // 5.
return;
}
Container<std::list> ctn;
void foo () {
ctn.process_data(malicious_function);
unprotected->clear(); // 6.
}
在這個例子當中,Container
使用 (1) mtx_
保護 (2) data_
。但需要注意的是,(3) 接收的函式 func
是一個外部函式。由於在寫 Container
這段程式碼時,你無法預知以後的使用者,會傳遞何種 func
進來。所以對於 Container
的作者來說,func
是不可控的。但是,(4) 將內部被保護的資料的指標,作為引數傳遞給了 func
。接下來的事情就變得糟糕了。首先我們定義了一個惡意函式 malicious_function
,在其中 (5) 將傳遞進來的指標賦值給了一個沒有任何保護的全域性指標變數 unprotected
。而最後,在沒有任何保護的情況下,(6) 清空了整個連結串列。這個操作是非常危險的。
因此,在使用互斥量保護資料的時候,需要注意:
- 不能將被保護資料的指標或引用以函式返回值的形式,返回給外部不可控的呼叫者;
- 不能將被保護資料的指標或引用以函式引數的形式,傳遞給外部不可控的呼叫者。
死鎖及其解法
回到我們最開始有味道的例子。
假設現在你給鍋、鏟都加上了互斥量。這樣一來,你希望朋友 A 和朋友 B 不會在同一時間共用鍋鏟,避免未定義的行為。然而,新的問題又來了。
朋友 A | 朋友 B | 註釋 |
---|---|---|
給鍋鏟上鎖 | ||
給鍋子上鎖 | ||
開始使用鍋子 | ||
開始使用鍋鏟 | ||
嘗試獲取鍋子的鎖 | 朋友 A 開始等待 | |
嘗試獲取鍋鏟的鎖 | 朋友 B 開始等待 |
現在,因為有鎖的保護,所以朋友 A 和朋友 B 不會再同時使用鍋或鏟了,避免了「辣椒炒蛋」的鬧劇。但是,現在朋友 A 和朋友 B 來到你的面前,向你哭訴:「我的心在等待,永遠在等待」。
朋友 A 期待使用鍋子,然而因為鍋子對應的鎖被朋友 B 鎖上,所以朋友 A 不得不等待朋友 B 使用完畢之後才行;另一方面,朋友 B 需要使用鍋鏟,對應的鎖卻被朋友 A 鎖上了。這樣一來,由於朋友 A 和朋友 B 互相等待,但各自又什麼都做不了。於是兩人只能大眼瞪小眼,永遠「耗下去」。
在併發程式設計領域中,我們把這種現象稱之為「死鎖」。對於由「鎖」引起的「死鎖」,它有幾個特點:
- 完成一個任務,需要獲取多把鎖;
- 存在資料競爭;
- 各自持有一部分資料對應的鎖,互相等待,永不釋放。
為了解決死鎖問題,前輩們曾經提出了很多方案。其中最基本的一個方案是說:在操作需要獲取多把鎖時,總是以固定的順序獲取這些鎖。比如,在我們的例子裡,如果要求必須先獲取鍋鏟對應的鎖,再去獲取鍋子對應的鎖;那麼由於鍋鏟對應的鎖被朋友 A 首先持有,那麼朋友 B 就只能等待 A 做好菜之後,才能一展身手。這樣一來,死鎖的問題就解決了。
然而,這樣的建議並不能解決所有問題。所謂「固定順序」的前提是我們能夠以某種方式定義出穩定的順序關係。然而,有時候我們無法在程式碼中定義這樣的順序。比如,假設有兩個物件,它們是同一個類的兩個例項。現在,我們希望在某個執行緒裡,交換二者的內容。顯而易見,我們應該要用對應的鎖分別保護兩個物件,避免被併發的其他執行緒破壞。然而,在此二者之間,你很難定義具體的順序。索性,C++ 標準庫提供了 std::lock()
函式,用於同時鎖住多個互斥量,並且沒有死鎖的風險。
以下示例展示瞭如何用 std::lock()
函式同時鎖住兩個互斥量。
template<typename DataType>
void swap(DataType& lhs, DataType& rhs);
template<typename DataType>
class Container {
private:
typedef std::lock_guard<std::mutex> guarded_mutex;
std::mutex mtx_;
DataType data_;
public:
friend void swap(Container<DataType>& lhs, Container<DataType>& rhs) {
if (&lhs == &rhs) { return; } // 1.
std::lock(lhs.mtx_, rhs.mtx_); // 2.
guarded_mutex g_lhs(lhs.mtx_, std::adopt_lock); // 3.
guarded_mutex g_rhs(rhs.mtx_, std::adopt_lock); // 4.
swap(lhs.data_, rhs.data_);
}
};
例子中,首先我們在 (1) 處判斷傳入的兩個容器是否不同。這是因為,對同一個 std::mutex
線上程中反覆上鎖是未定義行為。而後我們在 (2) 處使用 std::lock()
函式,同時鎖住 lhs
和 rhs
的互斥量。之後,我們在 (3) 和 (4) 處使用 RAII 容器接管已經上鎖的互斥量。主意,這裡傳入的 std::adopt_lock
表示該互斥量已經上鎖,std::lock_guard
只需要接管互斥量的所有權即可,不需要再次上鎖。在此之後,我們就可以安心地呼叫 swap
函式交換兩個 DataType
中的內容了。
當 std::lock
解決不了死鎖的時候
對於死鎖,std::lock
函式能夠保證一次性鎖住多把鎖,從而在一定程度上解決了問題。之所以說它只是在一定程度上解決問題,是因為還有很多情況,是無法使用 std::lock
的。比如,有一些情況必須要在不同的位置,分別鎖上不同的鎖。這時候,std::lock
就不適用了。此外,死鎖問題並不僅僅是發生在和互斥量相關的情形中,此時使用 std::lock
也解決不了問題——因為根本不存在鎖的問題。
對於 std::lock
解決不了的死鎖情況,想要寫出不會死鎖的程式碼,就需要靠一些規來保證了。對於這些規矩,我們簡單羅列如下。
- 避免需要獲取多個鎖的情況:從根本上避免死鎖的可能;
- 持有鎖的時候,不要呼叫不可控的使用者函式:因為你不知道使用者函式會做什麼,比如它可能會鎖上另一把鎖;
- 如果必須獲取多個鎖,那麼按順序上鎖:從而避免競爭;
- 使用層次鎖,強制要求上鎖的順序:這是在上一條規矩的基礎上衍生而來的。
奇行種,以及一些其他問題
層次鎖
首先我們看一個名為層次鎖的奇行種。
層次鎖是為了保證上鎖順序而設計出來的奇怪物種。當一個執行緒嘗試對一個層次鎖上鎖時,需要檢查當前已經上鎖的鎖的層次,從而保證當前嘗試上鎖的層次低於已經上鎖的層次。以下是對層次鎖的一個簡單實現。
class hierarchial_mutex {
private:
typedef size_t level;
typedef std::lock_guard<std::mutex> guarded_mutex;
static constexpr
level max_level = std::numeric_limits<std::size_t>::max();
private:
std::mutex mtx_;
const level mtx_level_ = 0;
level prev_mtx_level_ = 0;
static thread_local
level thread_mtx_level;
inline void check_level() {
if (not(mtx_level_ < thread_mtx_level)) {
throw std::logic_error("mutex hierarchy violated!");
}
return;
}
inline void update_level() {
prev_mtx_level_ = thread_mtx_level;
thread_mtx_level = mtx_level_;
return;
}
inline void recover_level() {
thread_mtx_level = prev_mtx_level_;
return;
}
hierarchial_mutex(const hierarchial_mutex&) = delete;
hierarchial_mutex& operator=(const hierarchial_mutex&) = delete;
public:
hierarchial_mutex() :
mtx_level_{0}, prev_mtx_level_{0} {}
hierarchial_mutex(const level& value) :
mtx_level_{value}, prev_mtx_level_{0} {}
void lock() {
check_level();
mtx_.lock();
update_level();
}
void unlock() {
recover_level();
mtx_.unlock();
}
bool try_lock() {
check_level();
if (not(mtx_.try_lock())) {
return false;
} else {
update_level();
return true;
}
}
};
thread_local hierarchial_mutex::level
hierarchial_mutex::thread_mtx_level{hierarchial_mutex::max_level};
在介面上,hierarchial_mutex
基本上完整地實現了 std::mutex
的介面。不同的是,首先,每個 hierarchial_mutex
都有一個自己的等級 mtx_level_
;在 lock()
, unlock()
和 try_lock()
時,hierarchial_mutex
需要對 thread_mtx_level
以及 prev_mtx_level_
進行維護。其中,thread_mtx_level
是 static thread_local
的,這意味著,同一個執行緒的不同 hierarchial_mutex
公用一個 thread_mtx_level
,而不同執行緒之間則是不同的 thread_mtx_level
。
允許額外上鎖的 RAII 容器:std::unique_lock
前文提到了 RAII 容器 std::lock_guard
。它會在構造時對傳入的互斥量上鎖(如果沒有 std::adopt_lock
標誌的話),並在銷燬時解鎖。然而,std::lock_guard
例項沒有 lock()
, unlock()
以及 try_lock()
函式,因此一旦鎖上,就必須等待例項銷燬才能解鎖互斥量。若是在鎖住互斥量的過程中,有一些不必上鎖但特別耗時的外部 I/O 操作,那麼 std::lock_guard
的這一特性就會降低併發效率。
std::unique_lock
和 std::lock_guard
一樣,都是對互斥量的 RAII 容器。不同的是,std::unique_lock
提供了 lock()
, unlock()
和 try_lock()
函式,能夠通過 RAII 容器鎖住/解鎖內部的互斥量。除此之外,std::unique_lock
還能保證在銷燬時正確解鎖內部的互斥量。
之前的 swap
示例,若使用 std::unique_lock
則應是如下光景。
template<typename DataType>
void swap(DataType& lhs, DataType& rhs);
template<typename DataType>
class Container {
private:
typedef std::unique_lock<std::mutex> guarded_mutex;
std::mutex mtx_;
DataType data_;
public:
friend void swap(Container<DataType>& lhs, Container<DataType>& rhs) {
if (&lhs == &rhs) { return; }
guarded_mutex g_lhs(lhs.mtx_, std::defer_lock); // 1.
guarded_mutex g_rhs(rhs.mtx_, std::defer_lock); // 2.
std::lock(lhs.mtx_, rhs.mtx_);
swap(lhs.data_, rhs.data_);
}
};
此處,我們在 (1) 和 (2) 中提供了對互斥量的封裝,並宣告 std::defer_lock
,以在後面使用 std::lock
一次性鎖住兩個互斥量。當然,在這個例子中,std::unique_lock
相對 std::lock_guard
的優勢並沒有體現出來,僅只是一個示例。
鎖的粒度
多執行緒提高執行效率的根源,在於多個執行緒可以同時執行不同的指令。然而,鎖的存在會破壞這一特性。當多個執行緒同時嘗試訪問被互斥量保護的資料時,除了成功獲取鎖的執行緒,其它執行緒都被阻塞住,等待鎖被釋放。當然,為了執行緒安全,這種阻塞是不可避免的;然而,另一方面,過多的阻塞,必然降低併發效率,「吃掉」併發帶來的效能提升。
這樣一來,使用合適的粒度,減少不必要的等待就顯得很有必要了。一般來說,鎖的粒度可以定義為:被鎖保護的資料的量在時間上的累積。
鎖的粒度=def被保護的資料量×因持有鎖而阻塞其他執行緒的時間.
顯而易見,如果一個鎖保護的資料量很大,那麼其它執行緒獲取相應的鎖的次數就會相應增加;另一方面,如果某個執行緒長時間持有鎖,那麼其他執行緒因此阻塞等待的時間就會很長。因此,在保證執行緒安全的情況下,我們應該儘可能降低鎖的粒度。
為了減小鎖的粒度,一方面我們可以減少鎖保護的資料量,另一方面則可以降低執行緒持有鎖的時間。前者需要具體問題具體分析地進精打細算;後者則相對容易分析。
對於資料結構的操作,大體可以分為以下三個步驟:
- 讀取資料(可能是其中一部分);
- 處理資料;
- 回寫處理結果。
通常來說,對資料進行處理,這件事情本身不會破壞資料結構的不變數,因而不用加鎖;而讀取和寫入資料是需要用鎖保護的。因此,如果粗獷地用鎖將上述整個過程保護起來,而處理資料的時間很長(例如有網路 I/O),那麼這樣無疑效率是很低的。因此,我們可以考慮在處理資料的過程中,釋放鎖;而僅用鎖保護對資料的讀取和回寫過程。
void get_process_write() {
std::unique_lock<std::mutex> lk(a_mutex); // 1.
Data chunk = get_data_chunk();
lk.unlock(); // 2.
Result res = process_data_chunk(chunk);
lk.lock(); // 3.
write_back(chunk, res);
}
程式碼簡單展現了應用 std::unique_lock
管理互斥量 (1) 的過程。在讀操作完成之後,解鎖互斥量 (2),而在寫操作之前,再次鎖住互斥量 (3)。這樣一來,在 process_data_chunk
的過程中,當前執行緒並不持有鎖,因而降低了鎖的粒度。
讀寫鎖與 std::shared_mutex
和 std::shared_lock
在進一步探討鎖的粒度之前,我們回顧一下在介紹競爭狀態的時候,我們講到,執行緒安全需要解決的本質問題,是保證不變數被破壞的中間狀態,資料結構僅只對修改它的那個執行緒可見。從此出發,我們不難理解以下兩個推論。
- 因為僅僅「讀取」資料不會破壞不變數,所以多個執行緒同時讀取某個資料結構是安全的。
- 但是,另一方面,如果有一個執行緒嘗試對資料進行修改,那麼若有其他執行緒在訪問該資料結構(不論讀寫),都可能是不安全的。
這樣一來,不難發現,對於「讀」和「寫」兩類操作,資料結構所需的「保護」,其程度是不一樣的。
- 若一個執行緒僅只是讀取一個數據結構,那麼只需保證沒有其他執行緒同時寫入即可,但其它執行緒對資料結構的讀操作是安全的。
- 若一個執行緒嘗試修改一個數據結構(寫操作),那麼其它執行緒對該資料結構的讀寫操作都是不安全的,因而應該被禁止。
對一個頻繁進行寫操作的資料結構來說,按照讀寫操作,區分保護程度意義不大。這是因為,區分兩種程度的保護,必然帶來額外的開銷。而若是某個資料結構的讀操作的頻率遠遠大於寫操作,那麼進行這樣的區分,從而降低鎖的粒度,收益就很客觀了。
為此,我們引入 std::shared_mutex
的概念。除了和一般的 std::mutex
一樣提供 lock()
, try_lock()
和 unlock()
之外,std::shared_mutex
還提供了 lock_shared()
, try_lock_shared()
和 unlock_shared()
三個操作。
阻塞情況表 | 自由 | 被共享鎖住 | 被獨佔鎖住 |
---|---|---|---|
lock() |
以獨佔方式鎖住 | 阻塞 | 阻塞 |
lock_shared() |
以共享方式鎖住 | 以共享方式鎖住 | 阻塞 |
std::shared_mutex
直到 C++17 才被引入。若你的編譯器不支援 C++17,請升級你的編譯器,或者使用boost::shared_mutex
代替。
於是,我們可以使用 std::shared_mutex
保護頻繁讀取而甚少寫入的資料結構,並在讀取時使用 lock_shared()
鎖住互斥量,而在寫入時使用 lock()
鎖住互斥量。
與 std::unique_lock
對應,標準庫也提供了 std::shared_lock
容器。它會在構造時,嘗試以 lock_shared()
鎖住傳入的共享互斥量,並在銷燬時,確保以 unlock_shared()
的方式釋放共享互斥量。同時,std::shared_lock
也提供了 lock()
和 unlock()
介面,用於以共享的方式鎖住或者解鎖構造時關聯的共享互斥量。
std::shared_lock
直到 C++14 才被引入。若你的編譯器不支援 C++14,請升級你的編譯器,或者使用boost::shared_lock
代替。
讀寫鎖的一個典型應用場景是執行緒共享的資料快取。對於快取來說,存在於快取內的條目(entry)通常會被頻繁讀取,而寫操作則相對來說低頻很多。比如,DNS 伺服器上的快取就是這樣的情況。DNS 解析記錄一般來說是非常穩定的——頻繁更換解析結果的域名總是少數。這裡以 DNS 快取作為讀寫鎖的簡單示例。
#include <unordered_map>
#include <string>
#include <mutex>
#include <shared_mutex>
class DNSEntry;
class DNSCache {
public:
typedef std::unordered_map<std::string, DNSEntry> RecMap;
private:
RecMap entries_;
mutable std::shared_mutex entry_mutex_; // 1.
public:
DNDEntry find_entry(const std::string& domain) const {
std::shared_lock<std::shared_mutex> slk(entry_mutex_); // 2.
const RecMap::const_iterator target = entries_.find(domain);
const bool found = (target != entries_.end());
return found ? target->second : DNSEntry();
}
void update_one_entry(const std::string& domain, const DNSEntry& entry) {
std::unique_lock<std::shared_mutex> ulk(entry_mutex_); // 3.
entries_[domain] = entry;
return;
}
};
此處 (1) 為保護 RecMap
引入了一個共享互斥量,它是 mutable
的,因而允許在 const
成員函式中做修改。(2) 通過 std::shared_lock
容器,以共享的方式鎖住互斥量,保證讀操作的穩定;(3) 則通過 std::unique_lock
容器,以獨佔的方式鎖住互斥量,保證寫操作的安全。
保護資料的初始化過程
我們在前作中,介紹了一個 GetInstance
函式,其實就是單例模式。通常,這種用法適用於構造過程開銷很大,而使用過程本身是執行緒安全的情況(比如連線資料庫的過程)。線上程中使用 GetInstance
函式獲取資料的指標,而不是在程序啟動時構造資料,可以加快程式的啟動速度,減少總體的等待時間。為了避免額外的獲取鎖的操作,前作首先使用了兩次指標檢查的方式。
volatile T* pInst = nullptr;
T* GetInstance() {
if (nullptr == pInst) {
lock();
if (nullptr == pInst) {
pInst = new T;
}
unlock();
}
return pInst;
}
第二次檢查是為了防止第一次檢查和lock之間的時間空隙導致pInst被改變,從而不再是nullptr。
然而,由於 CPU 的動態排程,這樣的程式碼可能引發嚴重的問題。於是,前作引入了基於作業系統架構的解決方案。
#define barrier() __asm__ volatile("mfence")
volatile T* pInst = nullptr;
T* GetInstance() {
if (nullptr == pInst) {
lock();
if (nullptr == pInst) {
T* temp = new T;
barrier();
pInst = temp;
}
unlock();
}
return pInst;
}
然而,mfence
是 i386 架構特有的指令。因此,這份程式碼在別的架構上無法正確執行。為了保證通用性,C++11 引入了 std::once_flag
和 std::call_once
解決這類問題。
#include <mutex>
volatile T* pInst = nullptr;
std::once_flag flag_T; // 1.
void ConstructInstance() { // 2.
pInst = new T;
}
T* GetInstance() {
std::call_once(flag_T, ConstructInstance); // 3.
return pInst;
}
此處,(1) 初始化了一個對於型別 T
的哨兵變數,用於標記 T
型別的例項是否已經初始化。(2) 則是對例項初始化的封裝。在 (3) 處,我們使用 std::call_once
確保 ConstructInstance
有且只有一次呼叫,從而返回正確的例項物件的指標。
鎖解決不了的競爭狀態
介面固有的競爭狀態
使用鎖保護資料,通過阻塞其它執行緒的方式,可以避免一些競爭狀態。因此,在一些資料結構中,使用鎖可以保證資料結構對外的幾個介面互相之間是執行緒安全的。然而,這並不意味著在外部呼叫這些介面就一定是執行緒安全的。事實上,這些介面本身可能存在固有的競爭狀態,因而在其內部使用鎖保護資料不能完全解決問題。
舉例來說,對於一個標準的棧,除去其建構函式和交換函式 swap()
,還有五個介面:
push()
: 將新元素壓棧;pop()
: 彈出棧頂元素;top()
: 返回棧頂元素;empty()
: 判斷棧是否為空;size()
: 返回棧的大小。
這五個介面中,隱含了兩類固有的競爭狀態。
第一類:通過 empty()
或 size()
判斷棧狀態,而後對棧做其他操作。這是競爭狀態的原因在於,在多執行緒環境中,empty()
和 size()
的返回值是不可信的。比如,在 A 執行緒呼叫 empty()
並返回 false
之後,B 執行緒可能緊接著清空了整個棧,而後 A 執行緒基於上述 false
判斷呼叫 top()
函式就會產生不符合預期的結果。
第二類:首先通過 top()
獲取棧頂元素,而後通過 pop()
彈棧該元素。之所以這也是競爭狀態,是因為在 top()
和 pop()
之間,其它執行緒可能進行額外的 push()
或者 pop()
操作,於是當前執行緒彈出的元素不一定是通過 top()
獲取的那個元素。
不難發現,因為這兩類固有的競爭狀態,不論棧的內部如何實現,外部使用棧時,都可能有執行緒不安全的情況。
top()
和 pop()
分離的原因
上述兩類固有的競爭狀態源自棧的介面設計。對於第一類競爭狀態來說,如此設計似乎情有可原。但是,為什麼要將 top()
和 pop()
分離開呢?
簡單來說,top()
將棧頂元素返回給呼叫者的過程意味著存在一次元素的拷貝。如果棧頂元素體積很大,比如是一個非常長的 std::vector<int>
,那麼在拷貝的過程中,可能因為系統負載相對資源過高,而丟擲 std::bad_alloc
異常。對於現有的實現來說,即使丟擲異常,棧內的元素還是完整的。但若是將 pop()
實現在彈棧之後將被彈棧的棧頂元素返回給呼叫者,則在上述異常可能發生在棧已經被修改之後。若是前一種情況(即當前的實現),呼叫者在收到異常時,可以嘗試進行一些處理;但是,在後一種情況下,即使呼叫者嘗試做了一些記憶體清理工作,棧中的目標元素也已經被銷燬了。
修改介面,實現執行緒安全
以下是一個執行緒安全的棧的簡單實現,分析後附。
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack : public std::exception { // 1.
const char* what() const throw();
};
template<typename T>
class threadsafe_stack {
private:
std::stack<T> stack_; // 2.
mutable std::mutex mtx_; // 3.
public:
threadsafe_stack() {}
explicit threadsafe_stack(const threadsafe_stack& source) {
std::lock_guard<std::mutex> slk(source.mtx_);
stack_ = source.stack_;
}
threadsafe_stack& operator=(const threadsafe_stack& source) {
std::lock_guard<std::mutex> slk(source.mtx_);
stack_ = source.stack_;
}
bool empty() const { // 4.
std::lock_guard<std::mutex> slk(mtx_);
return stack_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> slk(mtx_);
return stack_.size();
}
void push(T element) {
std::lock_guard<std::mutex> ulk(mtx_);
stack_.push(element);
}
std::shared_ptr<T> pop() { // 5.
std::lock_guard<std::mutex> ulk(mtx_);
if (stack_.empty()) {
throw empty_stack();
}
std::shared_ptr res{std::make_shared<T>(stack_.top())};
stack_.pop();
return res;
}
void pop(T& ref_holder) { // 6.
std::lock_guard<std::mutex> ulk(mtx_);
if (stack_.empty()) {
throw empty_stack();
}
ref_holder = stack_.top();
stack_.pop();
}
};
此處我們實現了名為 threadsafe_stack
的模板類。其中,顯而易見,我們的執行緒安全棧的實現是基於標準庫中的棧的 (2),並且為了實現執行緒安全,我們使用了一個互斥量來保護棧物件 (3)。此外,儘管存在一些只讀的公開成員函式 (4);但是,考慮到實際使用中,大量的棧操作都是寫操作,因此 (3) 沒有使用讀寫鎖。
為了解決第一類固有競爭狀態,我們首先在 (1) 處定義了空棧異常——我們讓對空棧進行的 pop()
操作 (5, 6) 丟擲空棧異常。如此,在呼叫處使用 try ... catch ...
語句塊,就能實現預期的行為,同時避免介面競爭。
為了避免第二類固有競爭狀態,我們取消了 top()
函式,而將它的功能合併入 pop()
函式。同時,為了避免在丟擲 std::bad_alloc
時元素已彈棧導致的資料丟失的問題,我們在內部棧物件彈棧之前,嘗試將目標元素拷貝 (5) 或賦值 (6) 到其它地方。最後,我們返回拷貝的結果的指標 (5) 或引用 (6) 傳給呼叫者。如此,就避免了第二類競爭。
小結
如我們在前作最後提到的,「執行緒安全」是一個燙手山芋,不存在放之四海而皆準的解決方案(所謂「沒有銀彈」)。因此,為了寫出執行緒安全的程式碼,我們必須在理解問題之起因的基礎上,具體問題具體分析。
為此,此篇從「不變數」開始,引出線上程中共享資料的「競爭狀態」——執行緒安全問題的根源。而後就如何解決問題展開了一系列的討論。首先,我們介紹瞭如何使用標準庫提供的「鎖」來保護共享資料結構,並介紹了和鎖相關的一些話題(如死鎖問題、鎖的粒度等)。而後,我們通過實現執行緒安全的棧,討論了鎖無法解決問題時,應當怎麼辦。
此篇無法窮盡所有和執行緒安全、鎖、死鎖相關的話題和技術。但是,建立在理解的基礎上,讀者應該能對執行緒安全有直觀的認識。我想這應該是有益的。