1. 程式人生 > 其它 >程式設計師的自我修養(六):保護執行緒間的共享資料 轉載

程式設計師的自我修養(六):保護執行緒間的共享資料 轉載

程式設計師的自我修養(六):保護執行緒間的共享資料

多程序和多執行緒最本質的區別在於共享和隔離的程度不同。對於多程序方式來說,因為隔離程度高,所以程式設計師很少需要去擔心程序空間的資料被破壞;但是併發任務之間共享資料就變得很困難了。對於多執行緒方式來說,因為隔離程度低,所以共享資料非常容易;但是,相應地,程式設計師需要更多地考慮如何線上程之間安全地共享資料。這就引出了所謂的「執行緒安全」問題。

此篇,我們討論如何線上程之間安全地共享資料。

線上程間共享資料的問題

引子

讓我們先來看一個有味道的例子。

假設你邀請朋友到家裡來派對。這個派對有些特殊,需要各自準備好食材,而後烹飪成美味——大家比比手藝。朋友 A 準備做辣椒炒肉,而朋友 B 決定用韭菜炒蛋一展身手——他們都已經備好了菜等待下鍋。

不幸的是,你家只有一口鍋,但是朋友 A 和 B 兩不相讓。他們在幾乎同一時間,分別把自己手頭備好的菜下了油鍋:

  • 朋友 A 把切好的辣椒下了油鍋——嘶~
  • 朋友 B 把打好的蛋液下了油鍋——譁~

於是,廚房裡傳出了辣椒炒蛋的香味……

在這個例子裡,最終朋友 A 和朋友 B 合作了一盤「辣椒炒蛋」。雖然這不是他們預期的結果,但好歹沒有引起災難性的後果。不過,這種「好結果」並不是每次都能發生;更多的時候,可能會做出一堆奇奇怪怪的黑暗料理。

這件事情和我們今天要討論的問題很是相像,我們看看下面這張表。

美食派對 執行緒間資料共享
廚房 程序空間
油鍋 可線上程間共享的資料
朋友 A 和朋友 B 兩個執行緒

對於沒事派對中的這個小插曲,顯而易見地,作為主人需要定下一些規矩。對於這個問題,主人需要規定:別人使用完畢之前,其他人不能用鍋子(當然也包括鍋鏟、煤氣灶之類的)。線上程間共享資料也是如此。程式設計師在設計多執行緒併發程式時,需要協調好多個執行緒對資料的操作:應當在何時,如何操作資料,並與其它執行緒進行通訊。

不變數

在具體討論之前,我們先了解一下「不變數」這個概念。

所謂不變數,指的是對於某個資料結構,當其合法可用時,總是成立的一個命題。在程式設計時引入不變數,可以幫助程式設計師正確地處理資料。為了更好地理解不變數,我們來看一下下面這個簡單的例子。

int sum{0};
constexpr size_t times{100};
// invariant: `i` is the number of integer that we have added to `sum`.
for (size_t i{0}; i != times; ++i) {
    sum += i + 1;
}
// some use of `sum`

這是一個非常簡單的例子,此處僅用於說明不變數。

在註釋中,我已經註明了此處用到的不變數:i 是已經累加的次數。對於 i 來說,它在迴圈開始前、每次迴圈迭代之後、迴圈結束的瞬間,都應該是合法可用的。因此,作為不變數,應該在這三種情形下都成立。若不然,則說明迴圈寫錯了。

  • 開始前:在迴圈開始前,我們尚未進行過累加,因此 i == 0 成立;
  • 迭代過後:每迭代一次,++i 保證了不變數成立;
  • 迴圈結束:由於 i0 開始自增,直到第一次滿足 i != times 的條件終止迴圈,因此在迴圈終止時,i == times 成立。

經過這樣的驗證,我們應當很有信心地說:我的程式碼是正確的。

注 1:應當寫明顯沒有錯誤的程式碼,比如這個例子。
注 2:在這個例子中,之所以將迴圈條件記為 i != times 而不是 i < times,正是因為有不變數的存在。若是記作 i < times,那麼迴圈中止時,應有 i >= times;對應到不變數上,就是「至今為止,已經累加了不小於 times 次」,而這與程式碼的意圖是不一致的。故而,對 C++ 程式設計師來說,建議寫作 i != times 而不是 i < times
注 3:此外,C++ 對 STL 容器引入了「迭代器」的概念。迭代器過載了 ==!= 等符號,但並不一定過載了了 < 號。事實上,對兩個迭代器進行「大小比較」是沒有意義的。為了保證一致性,從迭代器的角度說,也不建議 C++ 程式設計師將此類迴圈條件寫作 i < times

回過頭,再來看一下這個簡短的例子。我們不難發現,在執行完 sum += i + 1 的一瞬間,事實上不變數並不成立。比如,當 i == 30 時,我們執行完畢上述賦值表示式時,已經進行了 31 次累加。這告訴我們:在對操作資料時,有可能會破壞不變數。這樣一來,迴圈迭代後,對 i 的自增操作,就可以理解為是「維護不變數」了。這件事情告訴我們:不變數不成立,預示著資料不可用(通常表示資料操作進行到一半);操作資料後,需要維護不變數。

我們再來看一個稍微複雜一些的例子。

// invariant: suppose we have two nodes `A` and `B`,
// then if `A->next_ == B`, then we must have `B->perv_ == A`.
template<typename DataType>
class Node {
 private:
    Node* prev_ = nullptr;
    Node* next_ = nullptr;
    DataType data;
 public:
    // ...
    void Delete() {
        auto prev = this->prev_;
        auto next = this->next_;
        if (nullptr != prev) {
            prev->next_ = next; // 1.
        }
        if (nullptr != next) {
            next->prev_ = perv; // 2.
        }
        this->Destroy();
        return;
    }
};

這是一個雙鏈表中結點的簡單示例。同樣,我在程式碼註釋中已經標明瞭不變數,即在雙鏈表可用時,對兩個合法結點來說,應當有 A->next_->perv_ == A。然而,在刪除結點時(更新資料結構時),不變數會被破壞。比如,當 (1) 已經執行完畢,但 (2) 尚未執行時,上述不變數就是不成立的。

假設現在有兩個執行緒,在對同一個雙鏈表進行操作。其中之一嘗試從前向後遍歷雙鏈表;另一則在刪除其中某一個結點。我們之前說,刪除結點會臨時破壞不變數,而不變數被破壞則表示資料結構是不可用的。那麼正在執行刪除操作的執行緒,就可能導致正在遍歷的執行緒出錯甚至崩潰。

執行緒 1 執行緒 2 註釋
if (nullptr != curr->next()) 此時檢查成功
prev->next_ = next next 有可能是 nullptr
注意此時不變數被破壞了
work = curr->next() 現在 work 可能是 nullptr
data = work->data() 解引用 nullptr,段錯誤

上表展現了兩個執行緒同時操作一個雙鏈表時,可能出現的情況。在這種情況下,執行緒 1 可能因為解引用空指標,而引發段錯誤,導致整個程序崩潰。

競爭狀態

我們再看一個有小情緒的例子。

對於國人來說,「春運」總是很頭疼的問題。在春運期間,火車票總是不夠的。基本上,任何一次買票行為,其結果(能不能買到火車票),都取決於你下手的時機——是否足夠快。對於你來說,同樣是「點選滑鼠確認」這個動作,其結果實際上是不確定的。具體是何種結果,取決於你的行為與其他人行為的相對順序。

在計算機世界中,我們把結果取決於多個執行緒執行指令的相對順序的情形,稱為「競爭狀態」。若是競爭狀態發生在多個執行緒對同一個資料結構的修改上,則稱其為「資料競爭」。因為競爭狀態的結果是不確定的,所以資料競爭可能導致未定義行為(Undefined Behavior, 縮寫 UB,讀作「有病」)。

結合不變數的概念,不難理解。如果一個行為可能在其中間狀態破壞不變數,則由此行為引發的競爭狀態,可能導致災難後果。而若要破壞不變數,通常來說意味著該操作需要更新多個變數,而這些更新操作無法在單個指令完成——因而有被打斷的可能。由資料競爭導致的問題通常難以排查。這是因為,儘管這些操作可能被打斷,但並不是每次都會打斷。通常來說,只有當系統負載很高,CPU 需要頻繁地切換上下文時,資料競爭的可能性才會增大。因此,排查由資料競爭導致的問題,一般來說是非常困難的。

避免惡性資料競爭

既然資料競爭通常可能會招致 UB,那麼我們就要想辦法避免它。通常來說,避免惡性資料競爭有幾個思路。

  • 保護資料結構,確保在資料結構更新過程中,其不變數被破壞的中間狀態只有一個執行緒能夠看到。
  • 修改資料結構的實現,確保任何對資料結構的更新,在外界看來不變數都是成立的。
  • 將所有對資料結構的修改,都交給第三方序列執行。

對於第三種方案,以之前購買火車票的例子來說,可以實現為:當你點選確認按鈕後,購票網站將你的請求傳送給伺服器,伺服器將請求加入一個佇列,並返回一個狀態。例如:「當車次剩餘車票 1000 張,在您之前有 800 個尚未處理的請求」。這樣一來,通常來說,購票行為的結果就是確定的了。

然而,C++ 標準庫並沒有實現上述第三種方案。故此,此處我們不做更深入的討論。

使用互斥量保護資料

保護資料的最基本方法是使用所謂的「互斥量」(mutual exclusion, mutex)。

互斥量的基本邏輯是這樣的。

  • 訪問某個資料結構時,首先檢查互斥量。
    • 若互斥量被鎖住,則等待,直到互斥量解鎖。
    • 若互斥量沒有被鎖住,則鎖住互斥量,而後更新資料,再解鎖。

聽起來是個挺美好的事情,若能實現,那麼資料競爭就能被解決。然而,所謂「沒有銀彈」,並不是說引入互斥量就能完美解決所有問題。

  • 首先,互斥量起作用是有前提的。
    • 所有可能引發資料競爭的資料結構都被保護起來了;
    • 所有可能引發資料競爭的操作,都正確地使用了互斥量。
  • 其次,互斥量可能引發所謂的「死鎖」問題。
  • 最後,若互斥量過多或過少地保護了資料,都可能出現問題。

在 C++ 中使用互斥量

在 C++ 中,標準庫提供的互斥量是 std::mutex,它被定義在 mutex 這個標頭檔案中。

互斥量是「鎖」的一種,按照我們在 C++ 的幾個基本原理和技術中的介紹,鎖也是一種資源。因此為了保證資源被正確釋放(正確使用互斥量的條件之一),我們最好是用 RAII 技術將其包裝起來。C++ 標準庫直接提供了這樣的封裝,名為 std::lock_guard,它也定義在 mutex 這個標頭檔案當中。

我們來看一個簡單的例子。

#include <list>
#include <mutex>
#include <algorithm>

typedef std::lock_guard<std::mutex> guarded_mutex;

std::list<int> data_list;               // 1.
std::mutex data_mutex;                  // 2.

void add_to_list(int new_value) {
    guarded_mutex guard{data_mutex};    // 3.
    data_list.push_back(new_value);
}

bool list_contains(int value_to_find) {
    guarded_mutex guard{data_mutex};    // 4.
    return (data_list.end() != std::find(data_list.begin(), data_list.end(), value_to_find));
}

例中,(1) 和 (2) 分別定義了全域性變數。此處,我們意圖用 (2) 定義的全域性互斥量保護對 (1) 定義的連結串列進行保護。(3) 和 (4) 通過 RAII 容器 std::lock_guard 在構造時,對傳入的互斥量 data_mutex 上鎖,並在 guard 銷燬時自動解鎖。

在這個例子中,若是 add_to_listlist_contains 分別在不同執行緒中執行,則他們都會嘗試鎖上 data_mutex 這個互斥量。顯而易見,同一時刻,只能有一個函式能成功鎖上它;於是該函式正常執行,而另外一個函式則會陷入等待。這樣一來,data_list 更新期間,不變數被破壞的中間狀態,就只有修改它的執行緒能看到。而對於其他執行緒來說,data_list 要麼沒有被修改,要麼已經修改完成,因而總是可用的。

在這個簡單的例子裡,被保護的資料和互斥量都是全域性變數。顯而易見,這是不好的。首先,使用全域性變數,意味著任何函式都有可能修改它。其次,除了 data_listdata_mutex 的定義連在一起,它們之間在程式碼上沒有其他的聯絡。因此,很可能出現程式設計師使用 data_mutex 來保護其他資料;或者使用其他互斥量來保護 data_list 的現象。這樣一來,保護就不完整了。因此,在實際使用中,通常我們會選擇將 data_listdata_mutex 封裝在同一個類當中。

限制被保護資料的使用範圍

上一節,我們瞭解瞭如何使用互斥量保護資料。此外,談到了「正確使用」的要求之一:鎖上互斥量之後必須解鎖(否則其他執行緒永遠無法上鎖,就可能陷入無休止的等待)。這一節討論正確使用的另一個要求:必須限制被保護資料的使用範圍。簡單來說,就是不要將被保護資料的指標或引用通過返回值、函式引數的方式,傳到無法控制的範圍內。這個約定,是基於一個簡單的假設:你無法保證在你無法控制的範圍內,其他程式設計師是否按照約定使用互斥量保護這份資料。

我們看一個不好的例子。

template<typename DataType>
class Container {
 private:
    typedef std::lock_guard<std::mutex> guarded_mutex;
    std::mutex mtx_;                    // 1.
    DataType* data_;                    // 2.
 public:
    // ...
    template<typename FuncType>
    void process_data(FuncType func) {  // 3.
        guarded_mutex l(this->mtx_);
        func(this->data);               // 4.
        return;
    }
};

std::list* unprotected;

void malicious_function(std::list* protected_data) {
    unprotected = protected_data;       // 5.
    return;
}

Container<std::list> ctn;

void foo () {
    ctn.process_data(malicious_function);
    unprotected->clear();               // 6.
}

在這個例子當中,Container 使用 (1) mtx_ 保護 (2) data_。但需要注意的是,(3) 接收的函式 func 是一個外部函式。由於在寫 Container 這段程式碼時,你無法預知以後的使用者,會傳遞何種 func 進來。所以對於 Container 的作者來說,func 是不可控的。但是,(4) 將內部被保護的資料的指標,作為引數傳遞給了 func。接下來的事情就變得糟糕了。首先我們定義了一個惡意函式 malicious_function,在其中 (5) 將傳遞進來的指標賦值給了一個沒有任何保護的全域性指標變數 unprotected。而最後,在沒有任何保護的情況下,(6) 清空了整個連結串列。這個操作是非常危險的。

因此,在使用互斥量保護資料的時候,需要注意:

  • 不能將被保護資料的指標或引用以函式返回值的形式,返回給外部不可控的呼叫者;
  • 不能將被保護資料的指標或引用以函式引數的形式,傳遞給外部不可控的呼叫者。

死鎖及其解法

回到我們最開始有味道的例子。

假設現在你給鍋、鏟都加上了互斥量。這樣一來,你希望朋友 A 和朋友 B 不會在同一時間共用鍋鏟,避免未定義的行為。然而,新的問題又來了。

朋友 A 朋友 B 註釋
給鍋鏟上鎖
給鍋子上鎖
開始使用鍋子
開始使用鍋鏟
嘗試獲取鍋子的鎖 朋友 A 開始等待
嘗試獲取鍋鏟的鎖 朋友 B 開始等待

現在,因為有鎖的保護,所以朋友 A 和朋友 B 不會再同時使用鍋或鏟了,避免了「辣椒炒蛋」的鬧劇。但是,現在朋友 A 和朋友 B 來到你的面前,向你哭訴:「我的心在等待,永遠在等待」。

朋友 A 期待使用鍋子,然而因為鍋子對應的鎖被朋友 B 鎖上,所以朋友 A 不得不等待朋友 B 使用完畢之後才行;另一方面,朋友 B 需要使用鍋鏟,對應的鎖卻被朋友 A 鎖上了。這樣一來,由於朋友 A 和朋友 B 互相等待,但各自又什麼都做不了。於是兩人只能大眼瞪小眼,永遠「耗下去」。

在併發程式設計領域中,我們把這種現象稱之為「死鎖」。對於由「鎖」引起的「死鎖」,它有幾個特點:

  • 完成一個任務,需要獲取多把鎖;
  • 存在資料競爭;
  • 各自持有一部分資料對應的鎖,互相等待,永不釋放。

為了解決死鎖問題,前輩們曾經提出了很多方案。其中最基本的一個方案是說:在操作需要獲取多把鎖時,總是以固定的順序獲取這些鎖。比如,在我們的例子裡,如果要求必須先獲取鍋鏟對應的鎖,再去獲取鍋子對應的鎖;那麼由於鍋鏟對應的鎖被朋友 A 首先持有,那麼朋友 B 就只能等待 A 做好菜之後,才能一展身手。這樣一來,死鎖的問題就解決了。

然而,這樣的建議並不能解決所有問題。所謂「固定順序」的前提是我們能夠以某種方式定義出穩定的順序關係。然而,有時候我們無法在程式碼中定義這樣的順序。比如,假設有兩個物件,它們是同一個類的兩個例項。現在,我們希望在某個執行緒裡,交換二者的內容。顯而易見,我們應該要用對應的鎖分別保護兩個物件,避免被併發的其他執行緒破壞。然而,在此二者之間,你很難定義具體的順序。索性,C++ 標準庫提供了 std::lock() 函式,用於同時鎖住多個互斥量,並且沒有死鎖的風險。

以下示例展示瞭如何用 std::lock() 函式同時鎖住兩個互斥量。

template<typename DataType>
void swap(DataType& lhs, DataType& rhs);

template<typename DataType>
class Container {
 private:
    typedef std::lock_guard<std::mutex> guarded_mutex;
    std::mutex mtx_;
    DataType data_;
 public:
    friend void swap(Container<DataType>& lhs, Container<DataType>& rhs) {
        if (&lhs == &rhs) { return; }                   // 1.
        std::lock(lhs.mtx_, rhs.mtx_);                  // 2.
        guarded_mutex g_lhs(lhs.mtx_, std::adopt_lock); // 3.
        guarded_mutex g_rhs(rhs.mtx_, std::adopt_lock); // 4.
        swap(lhs.data_, rhs.data_);
    }
};

例子中,首先我們在 (1) 處判斷傳入的兩個容器是否不同。這是因為,對同一個 std::mutex 線上程中反覆上鎖是未定義行為。而後我們在 (2) 處使用 std::lock() 函式,同時鎖住 lhsrhs 的互斥量。之後,我們在 (3) 和 (4) 處使用 RAII 容器接管已經上鎖的互斥量。主意,這裡傳入的 std::adopt_lock 表示該互斥量已經上鎖,std::lock_guard 只需要接管互斥量的所有權即可,不需要再次上鎖。在此之後,我們就可以安心地呼叫 swap 函式交換兩個 DataType 中的內容了。

std::lock 解決不了死鎖的時候

對於死鎖,std::lock 函式能夠保證一次性鎖住多把鎖,從而在一定程度上解決了問題。之所以說它只是在一定程度上解決問題,是因為還有很多情況,是無法使用 std::lock 的。比如,有一些情況必須要在不同的位置,分別鎖上不同的鎖。這時候,std::lock 就不適用了。此外,死鎖問題並不僅僅是發生在和互斥量相關的情形中,此時使用 std::lock 也解決不了問題——因為根本不存在鎖的問題。

對於 std::lock 解決不了的死鎖情況,想要寫出不會死鎖的程式碼,就需要靠一些規來保證了。對於這些規矩,我們簡單羅列如下。

  • 避免需要獲取多個鎖的情況:從根本上避免死鎖的可能;
  • 持有鎖的時候,不要呼叫不可控的使用者函式:因為你不知道使用者函式會做什麼,比如它可能會鎖上另一把鎖;
  • 如果必須獲取多個鎖,那麼按順序上鎖:從而避免競爭;
  • 使用層次鎖,強制要求上鎖的順序:這是在上一條規矩的基礎上衍生而來的。

奇行種,以及一些其他問題

層次鎖

首先我們看一個名為層次鎖的奇行種。

層次鎖是為了保證上鎖順序而設計出來的奇怪物種。當一個執行緒嘗試對一個層次鎖上鎖時,需要檢查當前已經上鎖的鎖的層次,從而保證當前嘗試上鎖的層次低於已經上鎖的層次。以下是對層次鎖的一個簡單實現。

class hierarchial_mutex {
 private:
    typedef size_t level;
    typedef std::lock_guard<std::mutex> guarded_mutex;
    static constexpr
    level max_level = std::numeric_limits<std::size_t>::max();
 private:
    std::mutex mtx_;
    const level mtx_level_  = 0;
    level prev_mtx_level_   = 0;
    static thread_local
    level thread_mtx_level;

    inline void check_level() {
        if (not(mtx_level_ < thread_mtx_level)) {
            throw std::logic_error("mutex hierarchy violated!");
        }
        return;
    }
    inline void update_level() {
        prev_mtx_level_  = thread_mtx_level;
        thread_mtx_level = mtx_level_;
        return;
    }
    inline void recover_level() {
        thread_mtx_level = prev_mtx_level_;
        return;
    }
    hierarchial_mutex(const hierarchial_mutex&) = delete;
    hierarchial_mutex& operator=(const hierarchial_mutex&) = delete;
 public:
    hierarchial_mutex() :
        mtx_level_{0}, prev_mtx_level_{0} {}
    hierarchial_mutex(const level& value) :
        mtx_level_{value}, prev_mtx_level_{0} {}
    void lock() {
        check_level();
        mtx_.lock();
        update_level();
    }
    void unlock() {
        recover_level();
        mtx_.unlock();
    }
    bool try_lock() {
        check_level();
        if (not(mtx_.try_lock())) {
            return false;
        } else {
            update_level();
            return true;
        }
    }
};
thread_local hierarchial_mutex::level
hierarchial_mutex::thread_mtx_level{hierarchial_mutex::max_level};

在介面上,hierarchial_mutex 基本上完整地實現了 std::mutex 的介面。不同的是,首先,每個 hierarchial_mutex 都有一個自己的等級 mtx_level_;在 lock(), unlock()try_lock() 時,hierarchial_mutex 需要對 thread_mtx_level 以及 prev_mtx_level_ 進行維護。其中,thread_mtx_levelstatic thread_local 的,這意味著,同一個執行緒的不同 hierarchial_mutex 公用一個 thread_mtx_level,而不同執行緒之間則是不同的 thread_mtx_level

允許額外上鎖的 RAII 容器:std::unique_lock

前文提到了 RAII 容器 std::lock_guard。它會在構造時對傳入的互斥量上鎖(如果沒有 std::adopt_lock 標誌的話),並在銷燬時解鎖。然而,std::lock_guard 例項沒有 lock(), unlock() 以及 try_lock() 函式,因此一旦鎖上,就必須等待例項銷燬才能解鎖互斥量。若是在鎖住互斥量的過程中,有一些不必上鎖但特別耗時的外部 I/O 操作,那麼 std::lock_guard 的這一特性就會降低併發效率。

std::unique_lockstd::lock_guard 一樣,都是對互斥量的 RAII 容器。不同的是,std::unique_lock 提供了 lock(), unlock()try_lock() 函式,能夠通過 RAII 容器鎖住/解鎖內部的互斥量。除此之外,std::unique_lock 還能保證在銷燬時正確解鎖內部的互斥量。

之前的 swap 示例,若使用 std::unique_lock 則應是如下光景。

template<typename DataType>
void swap(DataType& lhs, DataType& rhs);

template<typename DataType>
class Container {
 private:
    typedef std::unique_lock<std::mutex> guarded_mutex;
    std::mutex mtx_;
    DataType data_;
 public:
    friend void swap(Container<DataType>& lhs, Container<DataType>& rhs) {
        if (&lhs == &rhs) { return; }
        guarded_mutex g_lhs(lhs.mtx_, std::defer_lock); // 1.
        guarded_mutex g_rhs(rhs.mtx_, std::defer_lock); // 2.
        std::lock(lhs.mtx_, rhs.mtx_);
        swap(lhs.data_, rhs.data_);
    }
};

此處,我們在 (1) 和 (2) 中提供了對互斥量的封裝,並宣告 std::defer_lock,以在後面使用 std::lock 一次性鎖住兩個互斥量。當然,在這個例子中,std::unique_lock 相對 std::lock_guard 的優勢並沒有體現出來,僅只是一個示例。

鎖的粒度

多執行緒提高執行效率的根源,在於多個執行緒可以同時執行不同的指令。然而,鎖的存在會破壞這一特性。當多個執行緒同時嘗試訪問被互斥量保護的資料時,除了成功獲取鎖的執行緒,其它執行緒都被阻塞住,等待鎖被釋放。當然,為了執行緒安全,這種阻塞是不可避免的;然而,另一方面,過多的阻塞,必然降低併發效率,「吃掉」併發帶來的效能提升。

這樣一來,使用合適的粒度,減少不必要的等待就顯得很有必要了。一般來說,鎖的粒度可以定義為:被鎖保護的資料的量在時間上的累積。

鎖的粒度=def被保護的資料量×因持有鎖而阻塞其他執行緒的時間.

顯而易見,如果一個鎖保護的資料量很大,那麼其它執行緒獲取相應的鎖的次數就會相應增加;另一方面,如果某個執行緒長時間持有鎖,那麼其他執行緒因此阻塞等待的時間就會很長。因此,在保證執行緒安全的情況下,我們應該儘可能降低鎖的粒度。

為了減小鎖的粒度,一方面我們可以減少鎖保護的資料量,另一方面則可以降低執行緒持有鎖的時間。前者需要具體問題具體分析地進精打細算;後者則相對容易分析。

對於資料結構的操作,大體可以分為以下三個步驟:

  • 讀取資料(可能是其中一部分);
  • 處理資料;
  • 回寫處理結果。

通常來說,對資料進行處理,這件事情本身不會破壞資料結構的不變數,因而不用加鎖;而讀取和寫入資料是需要用鎖保護的。因此,如果粗獷地用鎖將上述整個過程保護起來,而處理資料的時間很長(例如有網路 I/O),那麼這樣無疑效率是很低的。因此,我們可以考慮在處理資料的過程中,釋放鎖;而僅用鎖保護對資料的讀取和回寫過程。

void get_process_write() {
    std::unique_lock<std::mutex> lk(a_mutex);   // 1.
    Data chunk = get_data_chunk();
    lk.unlock();                                // 2.
    Result res = process_data_chunk(chunk);
    lk.lock();                                  // 3.
    write_back(chunk, res);
}

程式碼簡單展現了應用 std::unique_lock 管理互斥量 (1) 的過程。在讀操作完成之後,解鎖互斥量 (2),而在寫操作之前,再次鎖住互斥量 (3)。這樣一來,在 process_data_chunk 的過程中,當前執行緒並不持有鎖,因而降低了鎖的粒度。

讀寫鎖與 std::shared_mutexstd::shared_lock

在進一步探討鎖的粒度之前,我們回顧一下在介紹競爭狀態的時候,我們講到,執行緒安全需要解決的本質問題,是保證不變數被破壞的中間狀態,資料結構僅只對修改它的那個執行緒可見。從此出發,我們不難理解以下兩個推論。

  • 因為僅僅「讀取」資料不會破壞不變數,所以多個執行緒同時讀取某個資料結構是安全的。
  • 但是,另一方面,如果有一個執行緒嘗試對資料進行修改,那麼若有其他執行緒在訪問該資料結構(不論讀寫),都可能是不安全的。

這樣一來,不難發現,對於「讀」和「寫」兩類操作,資料結構所需的「保護」,其程度是不一樣的。

  • 若一個執行緒僅只是讀取一個數據結構,那麼只需保證沒有其他執行緒同時寫入即可,但其它執行緒對資料結構的讀操作是安全的。
  • 若一個執行緒嘗試修改一個數據結構(寫操作),那麼其它執行緒對該資料結構的讀寫操作都是不安全的,因而應該被禁止。

對一個頻繁進行寫操作的資料結構來說,按照讀寫操作,區分保護程度意義不大。這是因為,區分兩種程度的保護,必然帶來額外的開銷。而若是某個資料結構的讀操作的頻率遠遠大於寫操作,那麼進行這樣的區分,從而降低鎖的粒度,收益就很客觀了。

為此,我們引入 std::shared_mutex 的概念。除了和一般的 std::mutex 一樣提供 lock(), try_lock()unlock() 之外,std::shared_mutex 還提供了 lock_shared(), try_lock_shared()unlock_shared() 三個操作。

阻塞情況表 自由 被共享鎖住 被獨佔鎖住
lock() 以獨佔方式鎖住 阻塞 阻塞
lock_shared() 以共享方式鎖住 以共享方式鎖住 阻塞

std::shared_mutex 直到 C++17 才被引入。若你的編譯器不支援 C++17,請升級你的編譯器,或者使用 boost::shared_mutex 代替。

於是,我們可以使用 std::shared_mutex 保護頻繁讀取而甚少寫入的資料結構,並在讀取時使用 lock_shared() 鎖住互斥量,而在寫入時使用 lock() 鎖住互斥量。

std::unique_lock 對應,標準庫也提供了 std::shared_lock 容器。它會在構造時,嘗試以 lock_shared() 鎖住傳入的共享互斥量,並在銷燬時,確保以 unlock_shared() 的方式釋放共享互斥量。同時,std::shared_lock 也提供了 lock()unlock() 介面,用於以共享的方式鎖住或者解鎖構造時關聯的共享互斥量。

std::shared_lock 直到 C++14 才被引入。若你的編譯器不支援 C++14,請升級你的編譯器,或者使用 boost::shared_lock 代替。

讀寫鎖的一個典型應用場景是執行緒共享的資料快取。對於快取來說,存在於快取內的條目(entry)通常會被頻繁讀取,而寫操作則相對來說低頻很多。比如,DNS 伺服器上的快取就是這樣的情況。DNS 解析記錄一般來說是非常穩定的——頻繁更換解析結果的域名總是少數。這裡以 DNS 快取作為讀寫鎖的簡單示例。

#include <unordered_map>
#include <string>
#include <mutex>
#include <shared_mutex>

class DNSEntry;

class DNSCache {
 public:
    typedef std::unordered_map<std::string, DNSEntry> RecMap;

 private:
    RecMap entries_;
    mutable std::shared_mutex entry_mutex_;                     // 1.

 public:
    DNDEntry find_entry(const std::string& domain) const {
        std::shared_lock<std::shared_mutex> slk(entry_mutex_);  // 2.
        const RecMap::const_iterator target = entries_.find(domain);
        const bool found = (target != entries_.end());
        return found ? target->second : DNSEntry();
    }
    void update_one_entry(const std::string& domain, const DNSEntry& entry) {
        std::unique_lock<std::shared_mutex> ulk(entry_mutex_);  // 3.
        entries_[domain] = entry;
        return;
    }
};

此處 (1) 為保護 RecMap 引入了一個共享互斥量,它是 mutable 的,因而允許在 const 成員函式中做修改。(2) 通過 std::shared_lock 容器,以共享的方式鎖住互斥量,保證讀操作的穩定;(3) 則通過 std::unique_lock 容器,以獨佔的方式鎖住互斥量,保證寫操作的安全。

保護資料的初始化過程

我們在前作中,介紹了一個 GetInstance 函式,其實就是單例模式。通常,這種用法適用於構造過程開銷很大,而使用過程本身是執行緒安全的情況(比如連線資料庫的過程)。線上程中使用 GetInstance 函式獲取資料的指標,而不是在程序啟動時構造資料,可以加快程式的啟動速度,減少總體的等待時間。為了避免額外的獲取鎖的操作,前作首先使用了兩次指標檢查的方式。

volatile T* pInst = nullptr;
T* GetInstance() {
    if (nullptr == pInst) {
        lock();
        if (nullptr == pInst) {
            pInst = new T;
        }
        unlock();
    }
    return pInst;
}

第二次檢查是為了防止第一次檢查和lock之間的時間空隙導致pInst被改變,從而不再是nullptr。

然而,由於 CPU 的動態排程,這樣的程式碼可能引發嚴重的問題。於是,前作引入了基於作業系統架構的解決方案。

#define barrier() __asm__ volatile("mfence")
volatile T* pInst = nullptr;
T* GetInstance() {
    if (nullptr == pInst) {
        lock();
        if (nullptr == pInst) {
            T* temp = new T;
            barrier();
            pInst   = temp;
        }
        unlock();
    }
    return pInst;
}

然而,mfence 是 i386 架構特有的指令。因此,這份程式碼在別的架構上無法正確執行。為了保證通用性,C++11 引入了 std::once_flagstd::call_once 解決這類問題。

#include <mutex>

volatile T* pInst = nullptr;
std::once_flag flag_T;                          // 1.

void ConstructInstance() {                      // 2.
    pInst = new T;
}

T* GetInstance() {
    std::call_once(flag_T, ConstructInstance);  // 3.
    return pInst;
}

此處,(1) 初始化了一個對於型別 T 的哨兵變數,用於標記 T 型別的例項是否已經初始化。(2) 則是對例項初始化的封裝。在 (3) 處,我們使用 std::call_once 確保 ConstructInstance 有且只有一次呼叫,從而返回正確的例項物件的指標。

鎖解決不了的競爭狀態

介面固有的競爭狀態

使用鎖保護資料,通過阻塞其它執行緒的方式,可以避免一些競爭狀態。因此,在一些資料結構中,使用鎖可以保證資料結構對外的幾個介面互相之間是執行緒安全的。然而,這並不意味著在外部呼叫這些介面就一定是執行緒安全的。事實上,這些介面本身可能存在固有的競爭狀態,因而在其內部使用鎖保護資料不能完全解決問題。

舉例來說,對於一個標準的棧,除去其建構函式和交換函式 swap(),還有五個介面:

  • push(): 將新元素壓棧;
  • pop(): 彈出棧頂元素;
  • top(): 返回棧頂元素;
  • empty(): 判斷棧是否為空;
  • size(): 返回棧的大小。

這五個介面中,隱含了兩類固有的競爭狀態。

第一類:通過 empty()size() 判斷棧狀態,而後對棧做其他操作。這是競爭狀態的原因在於,在多執行緒環境中,empty()size() 的返回值是不可信的。比如,在 A 執行緒呼叫 empty() 並返回 false 之後,B 執行緒可能緊接著清空了整個棧,而後 A 執行緒基於上述 false 判斷呼叫 top() 函式就會產生不符合預期的結果。

第二類:首先通過 top() 獲取棧頂元素,而後通過 pop() 彈棧該元素。之所以這也是競爭狀態,是因為在 top()pop() 之間,其它執行緒可能進行額外的 push() 或者 pop() 操作,於是當前執行緒彈出的元素不一定是通過 top() 獲取的那個元素。

不難發現,因為這兩類固有的競爭狀態,不論棧的內部如何實現,外部使用棧時,都可能有執行緒不安全的情況。

top()pop() 分離的原因

上述兩類固有的競爭狀態源自棧的介面設計。對於第一類競爭狀態來說,如此設計似乎情有可原。但是,為什麼要將 top()pop() 分離開呢?

簡單來說,top() 將棧頂元素返回給呼叫者的過程意味著存在一次元素的拷貝。如果棧頂元素體積很大,比如是一個非常長的 std::vector<int>,那麼在拷貝的過程中,可能因為系統負載相對資源過高,而丟擲 std::bad_alloc 異常。對於現有的實現來說,即使丟擲異常,棧內的元素還是完整的。但若是將 pop() 實現在彈棧之後將被彈棧的棧頂元素返回給呼叫者,則在上述異常可能發生在棧已經被修改之後。若是前一種情況(即當前的實現),呼叫者在收到異常時,可以嘗試進行一些處理;但是,在後一種情況下,即使呼叫者嘗試做了一些記憶體清理工作,棧中的目標元素也已經被銷燬了。

修改介面,實現執行緒安全

以下是一個執行緒安全的棧的簡單實現,分析後附。

#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack : public std::exception {                    // 1.
    const char* what() const throw();
};

template<typename T>
class threadsafe_stack {
 private:
    std::stack<T> stack_;                                       // 2.
    mutable std::mutex mtx_;                                    // 3.

 public:
    threadsafe_stack() {}
    explicit threadsafe_stack(const threadsafe_stack& source) {
        std::lock_guard<std::mutex> slk(source.mtx_);
        stack_ = source.stack_;
    }
    threadsafe_stack& operator=(const threadsafe_stack& source) {
        std::lock_guard<std::mutex> slk(source.mtx_);
        stack_ = source.stack_;
    }

    bool empty() const {                                        // 4.
        std::lock_guard<std::mutex> slk(mtx_);
        return stack_.empty();
    }
    size_t size() const {
        std::lock_guard<std::mutex> slk(mtx_);
        return stack_.size();
    }

    void push(T element) {
        std::lock_guard<std::mutex> ulk(mtx_);
        stack_.push(element);
    }
    std::shared_ptr<T> pop() {                                  // 5.
        std::lock_guard<std::mutex> ulk(mtx_);
        if (stack_.empty()) {
            throw empty_stack();
        }
        std::shared_ptr res{std::make_shared<T>(stack_.top())};
        stack_.pop();
        return res;
    }
    void pop(T& ref_holder) {                                   // 6.
        std::lock_guard<std::mutex> ulk(mtx_);
        if (stack_.empty()) {
            throw empty_stack();
        }
        ref_holder = stack_.top();
        stack_.pop();
    }
};

此處我們實現了名為 threadsafe_stack 的模板類。其中,顯而易見,我們的執行緒安全棧的實現是基於標準庫中的棧的 (2),並且為了實現執行緒安全,我們使用了一個互斥量來保護棧物件 (3)。此外,儘管存在一些只讀的公開成員函式 (4);但是,考慮到實際使用中,大量的棧操作都是寫操作,因此 (3) 沒有使用讀寫鎖。

為了解決第一類固有競爭狀態,我們首先在 (1) 處定義了空棧異常——我們讓對空棧進行的 pop() 操作 (5, 6) 丟擲空棧異常。如此,在呼叫處使用 try ... catch ... 語句塊,就能實現預期的行為,同時避免介面競爭。

為了避免第二類固有競爭狀態,我們取消了 top() 函式,而將它的功能合併入 pop() 函式。同時,為了避免在丟擲 std::bad_alloc 時元素已彈棧導致的資料丟失的問題,我們在內部棧物件彈棧之前,嘗試將目標元素拷貝 (5) 或賦值 (6) 到其它地方。最後,我們返回拷貝的結果的指標 (5) 或引用 (6) 傳給呼叫者。如此,就避免了第二類競爭。

小結

如我們在前作最後提到的,「執行緒安全」是一個燙手山芋,不存在放之四海而皆準的解決方案(所謂「沒有銀彈」)。因此,為了寫出執行緒安全的程式碼,我們必須在理解問題之起因的基礎上,具體問題具體分析。

為此,此篇從「不變數」開始,引出線上程中共享資料的「競爭狀態」——執行緒安全問題的根源。而後就如何解決問題展開了一系列的討論。首先,我們介紹瞭如何使用標準庫提供的「鎖」來保護共享資料結構,並介紹了和鎖相關的一些話題(如死鎖問題、鎖的粒度等)。而後,我們通過實現執行緒安全的棧,討論了鎖無法解決問題時,應當怎麼辦。

此篇無法窮盡所有和執行緒安全、鎖、死鎖相關的話題和技術。但是,建立在理解的基礎上,讀者應該能對執行緒安全有直觀的認識。我想這應該是有益的。