1. 程式人生 > 其它 >現代 C++ 對多執行緒/併發的支援(上) -- 節選自 C++ 之父的 《A Tour of C++》

現代 C++ 對多執行緒/併發的支援(上) -- 節選自 C++ 之父的 《A Tour of C++》

本文翻譯自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅(A Tour of C++)一書的第 13 章 Concurrency。用短短數十頁,帶你一窺現代 C++ 對併發/多執行緒的支援。原文地址:現代 C++ 對多執行緒/併發的支援(上) -- 節選自 C++ 之父的 《A Tour of C++》 水平有限,難以用另一種語言原汁原味地還原,有條件的建議直接閱讀原版書籍。

13 併發

目錄

13.1 介紹

併發,即同時執行多個任務,常用來提高吞吐量(通過利用多處理器進行同一個計算)或者改善響應性(等待回覆的時候,允許程式的其他部分繼續執行)。所有現代語言都支援併發。C++ 標準庫提供了可移植、型別安全的併發支援,經過 20 多年的發展,已經幾乎被所有現代硬體所支援。標準庫提供的主要是系統級的併發支援,而非複雜的、更高層次的併發模型;其他庫可以基於標準庫,提供更高級別的併發支援。

C++ 提供了適當的記憶體模型(memory model)和一組原子操作(atomic operation),以支援在同一地址空間內併發執行多個執行緒。原子操作使得無鎖程式設計

成為可能。記憶體模型保證了在避免資料競爭(data races,不受控地同時訪問可變資料)的前提下,一切按照預期工作。

本章將給出標準庫對併發的主要支援示例:threadmutexlock()packaged_task 以及 future。這些特徵直接基於作業系統構建,相較於作業系統原生支援,不會帶來效能損失,也不保證會有顯著的效能提升。

為什麼要用標準庫而非作業系統的併發?可移植性。

不要把併發當作靈丹妙藥:如果順序執行可以搞定,通常順序會比並發更簡單、更快速!

你真的需要多執行緒併發嗎?寫個 demo 測試下效能?

13.2 任務和執行緒

如果一個計算有可能(potentially)和另一個計算併發執行,我們稱之為任務

(task)。執行緒是任務的系統級表示。任務可以通過構造一個 std::thread (定義在 <thread> 中)來啟動,任務作為引數。任務是一個函式或者函式物件

void f();              // 函式

struct F {             // 函式物件
    void operator()()  // F 的呼叫操作符
};

void user()
{
    thread t1 {f};     // f() 在另一個執行緒中執行
    thread t2 {F()};   // F()() 在另一個執行緒中執行
    
    t1.join();  // 等待 t1
    t2.join();  // 等待 t2
}

join() 確保執行緒完成後才退出 user(),“join”執行緒的意思是“等待執行緒結束”。

一個程式的執行緒共享同一地址空間。執行緒不同於程序,程序通常不直接共享資料。執行緒間可以通過共享物件(shared object)通訊,這類通訊一般用鎖或其他機制控制,以避免資料競爭。

編寫併發任務可能會非常棘手:

void f() {cout << "Hello ";}
struct F {
    void operator()() {cout << "Parallel World!\n";}
};

上述例子有個錯誤:fF() 都用到了 cout 物件,卻沒有任何形式的同步。這會導致輸出的結果不可預測,多次執行的結果可能會得到不同的結果:因為兩個任務的執行順序是未定義的。程式可能有詭異的輸出,比如:

PaHerallllel o World!

定義一個併發程式中的任務時,我們的目標是保持任務之間的完全獨立,除非他們之間僅僅是簡單、明顯的通訊。最簡單的方法就是把併發任務看作是一個恰巧可以和呼叫者同時執行的函式:我們只要傳遞引數、取回結果,保證該過程中沒有使用共享資料(沒有資料競爭)即可。

13.3 傳遞引數

一般來說,任務需要基於資料執行。我們可以通過引數傳遞資料(或者資料的指標或引用)。

void f(vector<double>& v); // 處理 v 的函式

struct F {                 // 處理 v 的函式物件
    vector<double>& v;
    F(vector<double>& vv) : v(vv) {}
    void operator()();
};

int main()
{
    vector<double> some_vec{1,2,3,4,5,6,7,8,9};
    vector<double> vec2{10,11,12,13,14};

    thread t1{f,ref(some_vec)}; // f(some_vec) 在另一個執行緒中執行
    thread t2{F{vec2}};         // F{vec2}() 在另一個執行緒中執行

    t1.join();
    t2.join();
}

F{vec2}F 中儲存了引數 vector 的引用。F 現在可以使用這個 vector。但願在 F 執行時,沒有其他任務訪問 vec2。如果通過值傳遞 vec2 就可以消除這個隱患。

t1 通過 {f,ref(some_vec)} 初始化,用到了 thread可變引數模板構造,可以接受任意序列的引數。ref() 是來自 <functional> 的型別函式。為了讓可變引數模板把 some_vec 當作一個引用而非物件,ref() 不能省略。編譯器檢查第一個引數可以通過其後面的引數呼叫,並構建必要的函式物件,傳遞給執行緒。如果 F::operator()()f() 執行了相同的演算法,兩個任務的處理幾乎是等同的:兩種情況下,都各自構建了一個函式物件,讓 thread 去執行。

可變引數模板需要用 ref()cref() 傳遞引用

13.4 返回結果

13.3 的例子中,我傳了一個非 const 的引用。我只有在期待任務修改引用資料的值時才這麼做。儘管這是一種很常見的獲取返回結果的方式,但這麼做並不能向他人傳達清晰、明確的意圖,有些偷偷摸摸暗中操作的感覺。一種不那麼晦澀的方式是通過 const 引用傳遞輸入資料,通過另外單獨的引數傳遞儲存結果的指標。

void f(vector<double>& v, double *res); // 從 v 獲取輸入; 結果存入 *res

class F {
public:
    F(vector<double>& vv, double *p) : v(vv), res(p) {}
    void operator()();  // 結果儲存到 *res

private:
    vector<double>& v;  // 輸入源
    double *p;          // 輸出地址
};

int main()
{
    vector<double> some_vec;
    vector<double> vec2;

    double res1;
    double res2;

    thread t1{f,ref(some_vec),&res1}; // f(some_vec,&res1) 在另一個執行緒中執行
    thread t2{F{vec2,&res2}};         // F{vec2,&res2}() 在另一個執行緒中執行

    t1.join();
    t2.join();
}

這樣行得通,也很常用。但我不覺得通過引數傳遞返回結果有多優雅,我會在 13.7.1 節再次討論這個話題。

通過引數(出參)傳遞結果並不優雅

13.5 共享資料

有時任務需要共享資料,這種情況下,對共享資料的訪問需要進行同步,同一時刻只能有一個任務訪問資料(但是多工同時讀取不變的資料是沒有問題的)。我們要考慮如何保證在同一時刻最多隻有一個任務能夠訪問一組物件。

解決這個問題的基本元素是 mutex(mutual exclusion object,互斥物件)。thread 通過 lock() 獲取 mutex

int shared_data;
mutex m;          // 用於控制 shared_data 的 mutex

void f()
{
    unique_lock<mutex> lck{m};  // 獲取 mutex
    shared_data += 7;           // 操作共享資料
}   // 離開 f() 作用域,隱式自動釋放 mutex

unique_lock 的建構函式通過 m.lock() 獲取 mutex。如果另一個執行緒已經獲取這個 mutex,當前執行緒等待(阻塞)直到另一個執行緒(通過 m.unlock())釋放該 mutex。當 mutex 釋放,等待該 mutex 的執行緒恢復執行(喚醒)。互斥、鎖宣告在 <mutex> 標頭檔案中。

共享資料和 mutex 之間的關聯需要達成一致:程式設計師需要知道哪個 mutex 對應哪個資料。這樣很容易出錯,但是我們可以通過一些方式使得他們之間的關係更清晰:

class Record {
public:
    mutex rm;
};

不難猜到,對於一個 Record 物件 rec,在訪問 rec 其他資料之前,你應該先獲取 rec.rm。最好通過註釋或者良好的命名讓讀者清楚地知道 mutex 和資料的關聯。

有時執行某些操作需要同時訪問多個資源,有可能導致死鎖。例如,thread1 已經獲取了 mutex1,然後嘗試獲取 mutex2;與此同時,thread2 已經獲取 mutex2,嘗試獲取 mutex1。在這種情況下,兩個任務都無法進行下去。標準庫支援同時獲取多個鎖:

void f()
{
    unique_lock<mutex> lck1{m1,defer_lock};  // defer_lock:不立即獲取 mutex
    unique_lock<mutex> lck2{m2,defer_lock};
    unique_lock<mutex> lck3{m3,defer_lock};

    lock(lck1,lck2,lck3);
    // 操作共享資料
}   // 離開 f() 作用域,隱式自動釋放所有 mutexes

lock() 只有在獲取所有引數裡的的 mutex 之後繼續執行,並且在其持有 mutex 期間,不會阻塞(go to sleep)。每個 unique_lock 的析構會確保離開作用域時,自動釋放所有的 mutex。

通過共享資料通訊是相對底層的操作。程式設計人員要設計一套機制,弄清楚哪些任務完成了哪些工作,還有哪些未完成。從這個角度看, 使用共享資料不如直接呼叫函式、返回結果。另一方面,有些人認為共享資料比拷貝引數和返回值效率更高。這個觀點可能在涉及大量資料的時候成立,但是 locking 和 unlocking 也是相對耗時的操作。不僅如此,現代計算機很擅長拷貝資料,尤其是像 vector 這種連續儲存的元素。所以,不要僅僅因為“效率”而選用共享資料進行通訊,除非你真正實際測量過。

13.6 等待事件

有時執行緒需要等待外部事件,比如另一個執行緒完成了任務或者經過了一段時間。最簡單的事件是時間。藉助 <chrono>,可以寫出:

using namespace std::chrono;
auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();

cout << duration_cast<nanoseconds>(t1-t0).count() << " nanoseconds passed\b";

注意,我甚至沒有啟動一個執行緒;預設情況下,this_thread 指當前唯一的執行緒。

我用 duration_cast 把時間單位轉成了我想要的 nanoseconds。

condition_variable(定義在 <condition_variable>)提供了對通過外部事件通訊的支援。condition_variable 允許一個執行緒等待另一個執行緒,尤其是允許一個執行緒等待某個(由於其他執行緒工作結束)條件/事件發生。

condition_variable 支援很多優雅、高效的共享形式,但也可能會很棘手。考慮一個經典的生產者-消費者例子:兩個執行緒通過一個佇列傳遞訊息:

class Message { /**/ }; // 通訊的物件

queue<Message> q;       // 訊息佇列
condition_variable cv;  // 傳遞事件的變數
mutex m;                // locking 機制

queuecondition_variable 以及 mutex 由標準庫提供。

消費者讀取並處理 Message

void consumer()
{
    while(true){
        unique_lock<mutex> lck{m}; // 獲取 mutex m
        cv.wait(lck);              // 釋放 lck,等待
                                   // 喚醒時重新獲得 lck
        auto m = q.front();        // 取出 Message m
        q.pop();
        lck.unlock();              // 後續處理訊息不再操作佇列 q,提前釋放 lck
        // 處理 m
    }
}

這裡我顯式地用 unique_lock<mutex> 保護 queuecondition_variable 上的操作。condition_variable 上的 cv.wait(lck) 會釋放參數中的鎖 lck,直到等待結束(佇列非空),然後再次獲取 lck

相應的生產者程式碼:

void producer()
{
    while(true) {
        Message m;
        // 填充 m
        unique_lock<mutex> lck{m}; // 保護操作
        q.push(m);
        cv.notify_one();           // 通知
    } // 作用域結束自動釋放鎖
}

到目前為止,不論是 thread、mutex、lock 還是 condition_variable,都還是低層次的抽象。接下來我們馬上就能看到 C++ 對併發的高階抽象支援。

13.7 通訊任務

標準庫還提供了一些機制,能夠讓程式設計師在更高的任務的概念層次上工作,而不是直接使用低層的執行緒、鎖:

  1. futurepromise:用於從另一個執行緒生成的任務中返回值
  2. packaged_task:幫助啟動任務,封裝了 futurepromise,並且建立兩者之間的關聯
  3. async():像呼叫一個函式那樣啟動一個任務。形式最簡單,但也最強大!

上述機制在標頭檔案 <future> 中。

篇幅有點長,先到這裡,餘下的內容單獨寫一篇:現代 C++ 對多執行緒/併發的支援(下) -- 節選自 C++ 之父的 《A Tour of C++》

13.7.1 future 和 promise

13.7.2 packaged_task

13.7.3 async()

13.8 建議