1. 程式人生 > 資訊 >阿里雲現人事變動:中國區總裁任庚(M6)離職,多名高 P 離職

阿里雲現人事變動:中國區總裁任庚(M6)離職,多名高 P 離職

1、原因

掌握併發程式設計技術,利用多核處理來提升軟體專案的效能是軟體工程師一項基本技能。本文以c++語言為例,探索如何進行併發程式設計。內容涉及C++11,C++14以及C++17的主要內容。

測試環境:MacBook Pro ,處理器 M1, 編譯器 gcc ,IDE xcode。

2、併發與並行

Erlang之父Joe Armstrong曾經以人們使用咖啡機的場景為例描述這兩個術語。

 併發(Concurrent):如果多個佇列可以交替使用某臺咖啡機,則這一行為就是併發的。

並行(Parallel):如果存在多臺咖啡機可以被多個佇列交替使用,則就是並行。

這裡佇列中的每個人類比於計算機的任務,咖啡機類比於計算機處理器。因此:併發和並行都是在多工的環境下的討論。

更嚴格的來說:如果一個系統支援多個動作同時存在,那麼這是一個併發系統。如果這個系統還支援多個動作(物理時間上)同時執行,則這個系統上一個並行系統。

所以並行其實是併發的子集。它們的區別在於是否具有多個處理器。如果存在多個處理器同時執行多個執行緒,就是並行。

在不考慮處理器數量的情況下,我們統稱之為“併發”。

3、併發系統的效能

開發併發系統最主要的動機就是提升系統性能(事實上,這是以增加複雜度為代價的)

但是,單純的使用多執行緒並不一定能提升系統系能(當然,也並非執行緒越多系統的效能就越好)。從上面的兩幅圖中可以直觀感受到:執行緒(任務)的數量要根據具體的處理器數量來決定。假設只有一個處理器,那麼劃分太多執行緒可能適得其反。因為很多時間都花在任務切換上。

所以,在涉及併發系統之前,一方面要做好對硬體效能的瞭解,另一個方面需要對我們的任務有足夠的認識。

具體參考阿姆達爾定律。

4、c++與併發程式設計

並非所有的語言都提供了多執行緒的環境。

即便是c++語言,知道c++11標準之前,也是沒有執行緒支援的。在這種情況下,Linux/Unix平臺的開發者通常會使用POSIX Threads,windows上的開發者也有相應的藉口。但很明顯,這些API都只針對特定的作業系統平臺,可移植性較差。如果要同時支援Linux和Windows系統,就可能要寫兩套程式碼。

這個狀態在c++11標準釋出之後得到了改變。並且,在c++14 和c++17標準中又對併發程式設計機制進行了增強。

下圖是最近幾個版本的c++標準特性的線路圖:

編譯器對於語言特性的支援是逐步完成的。想要使用特定的特性則需要相應版本的編譯器。

下面表格列出了c++標準和相應的編譯器版本對照:

  • c++標準與相應的gcc版本要求如下:
  • c++標準與相應的clang版本要求如下:

預設情況下變壓器是以較低的標準來編譯的,如果希望使用新的標準,則需要通過編譯引數-std=c++xx告知編譯器。例如:

g++ -std=c++17 your_file.cpp -o your_program

5、執行緒

5.1 建立執行緒

如下所示:

#include <iostream>
#include <thread> 

using namespace std; 

void hello() { 
  cout << "Hello World from new thread." << endl;
}

int main() {
  thread t(hello); 
  t.join(); 

  return 0;
}

thread可以和callable型別一起工作,如可以直接用lambda表示式:

#include <iostream>
#include <thread>

using namespace std;

int main() {
  thread t([] {
    cout << "Hello World from lambda thread." << endl;
  });

  t.join();

  return 0;
}

也可以傳遞引數給入口函式:

請注意,引數是以拷貝的形式傳遞的。因此,對於拷貝耗時的物件可能需要傳遞指標或者引用型別作為引數。如果是傳遞指標或者引用,則需要考慮引數物件的生命週期。因為執行緒的執行長度很可能會超過引數的生命週期,這個時候如果執行緒還在訪問一個已經被銷燬的物件就會出現問題。

5.2 join和detach

  • 主要API

一旦啟動執行緒之後,我們必須決定是要等待它結束(通過jion),還是讓它獨立執行(通過detach),我們必須二者選擇其一。如果在thread物件銷燬的時候我們還沒做決定,則thread物件在解構函式處將呼叫std::terminate()從而導致我們的程序異常退出。

需要注意的是:在我們做決定的時候,很可能執行緒已經執行完了(例如上面的示例中執行緒的邏輯僅僅是一句列印,執行時間會很短)。新的執行緒建立之後,究竟是新的執行緒執行,還是當前執行緒的下一條語句先執行這是不確定的,因為這是由作業系統的排程策略決定的。不過這不要緊,我們只要在thread物件銷燬之前做決定即可。

  • join:呼叫此介面時,當前執行緒會一直阻塞,直到目標執行緒執行完成(當然,很可能目標執行緒在此呼叫之前就已經執行完成了,不過這不要緊)。因此,如果目標執行緒的任務非常耗時,你就要考慮是否需要在主執行緒上等待它了,因此這很可能會導致主執行緒卡住。
  • detach:detach是讓目標執行緒成為守護執行緒(deamon threads)。一旦detach之後,目標執行緒將獨立執行,即便其對應的thread物件銷燬也不影響執行緒的執行。並且,你無法再與之通訊。

對於這兩個介面,都必須是可執行執行緒才有意義。可以通過joinable()介面查詢是否可以對它們進行join或者detach。

  • 主要API

上面是一些線上程內部使用的API,它們用來對當前執行緒做一些控制。

  • yield 通常用在自己的主要任務已完成的時候,此時希望讓出處理器給其它任務使用。
  • get_id 返回當前執行緒ID,可以以此來標識不同的執行緒。
  • sleep_for 讓當前執行緒停止一段時間。
  • sleep_until 和sleep_for類似,但是是以具體的時間點為引數。這兩個API都以chrono API為基礎。

下面是一個程式碼示例:

void print_time() {
  auto now = chrono::system_clock::now();
  auto in_time_t = chrono::system_clock::to_time_t(now);

  std::stringstream ss;
  ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
  cout << "now is: " << ss.str() << endl;
}

void sleep_thread() {
  this_thread::sleep_for(chrono::seconds(3));
  cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;
}

void loop_thread() {
  for (int i = 0; i < 10; i++) {
    cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
  }
}

int main() {
  print_time();

  thread t1(sleep_thread);
  thread t2(loop_thread);

  t1.join();
  t2.detach();

  print_time();
  return 0;
}

上述程式碼建立兩個執行緒。它們都有一些輸出,其中一個會停止3秒鐘,然後在輸出。主執行緒呼叫join會一直卡住等待它執行結束。

輸出結果如下:

now is:2022-04-05 20:19:11
[thread-0x16ff13000] print:0
[thread-0x16ff13000] print:1
[thread-0x16ff13000] print:2
[thread-0x16ff13000] print:3
[thread-0x16ff13000] print:4
[thread-0x16ff13000] print:5
[thread-0x16ff13000] print:6
[thread-0x16ff13000] print:7
[thread-0x16ff13000] print:8
[thread-0x16ff13000] print:9
[thread-0x16fe87000] is waking up
now is:2022-04-05 20:19:14

5.3 一次呼叫

在某些情況下,有些任務需要執行一次,且只希望它執行一次,例如資源的初始化任務。這個時候可以用到上面的介面。這個介面會保證,即便在多執行緒的環境下,相應的函式也只會呼叫一次。

下面就是一個示例:有三個執行緒都會使用init函式,但是隻會有一個執行緒真正執行它。

void init() {
  cout << "Initialing..." << endl;
  // Do something...
}

void worker(once_flag* flag) {
  call_once(*flag, init);
}

int main() {
  once_flag flag;

  thread t1(worker, &flag);
  thread t2(worker, &flag);
  thread t3(worker, &flag);

  t1.join();
  t2.join();
  t3.join();

  return 0;
}

無法確定具體哪個執行緒會執行init。而事實上我們也不關心。因為只要某個執行緒完成這個初始化工作就可以了。

6 併發任務

下面以一個併發任務為示例,講述如何引入多執行緒。

任務:假設需要計算某個範圍內所有自然數的平方根之和,例如[1,10e8]。

單執行緒模模型下,程式碼如下:

static const int MAX=10e8;
static double sum = 0;
void worker(int min,int max){
    for(int i = min;i <= max;i++){
        sum +=sqrt(i);
    }
}
void serial_task(int min,int max){
    auto start_time = chrono::steady_clock::now();
    sum = 0;
    worker(min, max);
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout<<"Serial task consumed "<<ms<<" ms.Result:"<<sum<<endl;
}
int main(int argc, const char * argv[]) {
    // insert code here...
    serial_task(0, MAX);
    return 0;
}

輸出結果如下:

Serial task consumed 3483 ms.Result:2.10819e+13

很顯然,單執行緒做法效能太差。且這個任務完全可以是併發執行的。

下面我們嘗試以多執行緒方式來改造原先的程式。

改造後的程式如下:

void concurrent_task(int min,int max){
    auto start_time = chrono::steady_clock::now();
    unsigned concurrent_count = thread::hardware_concurrency();
    cout<<"hardware_concurrency:"<<concurrent_count<<endl;
    vector<thread> threads;
    min = 0;
    sum = 0;
    for(int t=0;t<concurrent_count;t++){
        int range = max /concurrent_count*(t+1);
        threads.push_back(thread(worker,min,range));
        min = range + 1;
    }
    for(auto& t:threads)
    {
        t.join();
    }
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout<<"Concurrent task consumed "<<ms<<" ms.Result:"<<sum<<endl;
}

輸出結果:

Concurrent task consumed 1178 ms.Result:4.94391e+12

效能是提升了,但結果是錯的。

要搞清楚結果為什麼不正確需要我們瞭解更過背景知識

對於現代處理器來說,為了提高處理速度,每個處理器都會有自己的快取記憶體(Cache),這個快取記憶體是與每個處理器相對應的。如下圖所示:

處理器在進行計算的時候,快取記憶體會參與起鬨,例如資料的讀和寫。而快取記憶體和系統主存(Memory)是有可能不一致的。即:某個結果計算後儲存在處理的快取記憶體中了,但是沒有同步到主存中,此時這個值對於其他處理器就是不可見的。

事情還遠不止這麼簡單,我們對於全域性變數值的修改:sum+=sqrt(i);這條語句,它並非原子的。它其實是很多條指令的組合才能完成。假設在某個裝置上,這條語句通過下面幾個步驟來完成。它們的時序可能如下所示:

在時間點a的時候,所有執行緒對於sum變數的值是一致的。

但是在時間點b之後,thread3上已經對sum進行了賦值,而這時候其他幾個執行緒也同時在其它處理器上使用這個值,那麼這個時候它們所使用的值就是舊的(錯誤的)。最後得到的結果也自然是錯誤的。

7 競爭條件與臨界區

當多個程序或者執行緒同時訪問共享資料時,只要有一個任務會修改資料,那麼就可能會發生問題。此時結果依賴於這些任務執行的相對時間,這種場景稱為競爭條件(race condition)。

訪問共享資料的程式碼片段稱之為臨界區(critical section)。具體到上面這個示例,臨界區就是讀寫sum變數的地方。

要避免競爭條件,就需要對臨界區進行資料保護。

很自然的,現在我們能夠理解發生競爭條件是因為這些執行緒同時訪問共享資料,其中有些執行緒的改動沒有讓其他執行緒知道,導致其他執行緒在錯誤的基礎上進行處理,結果自然也就是錯誤的。

那麼,如果一次只讓一個執行緒訪問共享資料,訪問完了再讓其他執行緒接著訪問這樣就可以避免問題的發生了。

下文介紹的API提供的就是這樣的功能。

8 互斥體與鎖

mutex

開發併發系統的目的主要是為了提升效能:將任務分散到多個執行緒,然後在不同的處理器上同時執行。這些分散開來的執行緒通常會包含兩類任務:

1、獨立的對於劃分給自己的資料的處理。

2、對處理的結果的彙總。

其中,第1項任務因為每個執行緒都是獨立的,不存在競爭條件的問題。而第2項任務,由於所有的執行緒都可能往總結果(例如上面的sum變數)彙總,這就需要做保護了。在某一個具體的時刻,只應當有一個執行緒更新總結果,即:保證每個執行緒對於共享資料的訪問是“互斥”的。mutex就提供了這樣的功能。

mutex是mutual exclusion(互斥)的簡寫。

主要API:

 很明顯,在這些類中,mutex是最基礎的API。其他類都是在它的基礎上改進。所以這些類都提供了下面三個方法,並且它們的功能是一樣的:

  • lock:鎖定互斥體,如果不可用,則阻塞。
  • try_lock:嘗試鎖定互斥體,如果不可用,則直接返回。
  • unlock:解鎖互斥體。

這三個方法提供了基礎的鎖定和解鎖功能。使用lock意味著你有很強的意願,一定要獲取到互斥體。而使用try_lock則是進行一次嘗試。這意味著如果失敗了,通常還有其他的路徑可以走。

在這些基礎功能上,其他的類分別在下面三個方面進行了擴充套件:

  • 超時:

timed_mutex,recursive_timed_mutex,shared_timed_mutex名稱都帶有timed,這意味著它們都支援超時功能。它們都提供了try_lock_for和try_lock_until方法,這兩個方法分別可以指定超時的時間長度和時間點。如果在超時的時間範圍內沒有獲取到鎖,則直接返回,不再繼續等待。

  • 可重入

recursive_mutex和recursive_timed_mutex的名稱都帶有recursive。可重入或者叫做可遞迴,是指在同一個執行緒中,同一把鎖可以鎖多次。這就避免了一些不必要的死鎖。

  • 共享

shared_timed_mutex和shared_mutex提供了共享功能。對於這類互斥體,實際上是提供了兩把鎖:一把是共享鎖,一把是互斥鎖。一旦某個執行緒獲取了互斥鎖,任務其他執行緒都無法在獲取互斥鎖和共享鎖;但是如果某個執行緒獲取到了共享鎖,其他執行緒無法再獲取到互斥鎖,但是還可以獲取到共享鎖。這裡互斥鎖的使用和其他互斥體介面和功能一樣。而共享鎖可以同時被多個執行緒同時獲取到(使用共享鎖的介面參見下文)。

 接下來,我們藉助mutex來改造我們的併發系統,改造後程序如下:

static const int MAX=10e8;
static double sum = 0;
static mutex exclusive;

void worker(int min,int max){
    for(int i = min;i <= max;i++){
        exclusive.lock();
        sum +=sqrt(i);
        exclusive.unlock();
    }
}

void concurrent_task(int min,int max){
    auto start_time = chrono::steady_clock::now();
    unsigned concurrent_count = thread::hardware_concurrency();
    cout<<"hardware_concurrency:"<<concurrent_count<<endl;
    vector<thread> threads;
    min = 0;
    sum = 0;
    for(int t=0;t<concurrent_count;t++){
        int range = max /concurrent_count*(t+1);
        threads.push_back(thread(worker,min,range));
        min = range + 1;
    }
    for(auto& t:threads)
    {
        t.join();
    }
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout<<"Concurrent task consumed "<<ms<<" ms.Result:"<<sum<<endl;
}
int main(int argc, const char * argv[]) {
    // insert code here...
   // serial_task(0, MAX);
    concurrent_task(0, MAX);
    return 0;
}

這裡有三個地方需要注意:

1、在訪問共享資料之前加鎖。

2、訪問完成之後解鎖。

3、在多執行緒中使用帶鎖的版本

執行之後結果輸出如下:

hardware_concurrency:8
Concurrent task consumed 36695 ms.Result:2.10819e+13

這下結果是對了,但是我們卻發現這個版本比原先但執行緒版本效能還要差很多。這就是為什麼說多執行緒系統會增加系統的複雜度,並行並非多執行緒系統就一定有更好的效能。

不過,對於這個問題是可以改進的,即只有在最後彙總資料的時候進行一次鎖保護即可。

於是改造worker函式程式碼如下:

void worker(int min,int max){
    double tmp_sum=0;
    for(int i = min;i <= max;i++){
        tmp_sum +=sqrt(i);
    }
    exclusive.lock();
    sum +=tmp_sum;
    exclusive.unlock();
}

程式碼的改變在於兩處:

1、通過一個區域性變數儲存當前執行緒的處理結果。

2、在彙總總結果的時候進行鎖保護。

執行改進後的程式,輸出結果如下: 

hardware_concurrency:8
Concurrent task consumed 675 ms.Result:2.10819e+13

可以看到,效能一下子提升了好多倍。我們終於體會到執行緒帶來的好處。

我們用鎖的粒度(granularity)來描述鎖的範圍。細粒度(fine-grained)是指鎖保護較小的範圍,粗粒度(coarse-grained)是指鎖保護較大的範圍。出於效能的考慮,我們應該保證鎖的粒度儘可能的細。並且,不應該在獲取鎖的範圍內執行耗時的操作,例如執行IO。如果是耗時的運算,應該儘可能的移到鎖的外面。

In general,a lock should be held for only the minimun possible time needed to perform the required operations.

--《C++ Concurrency in Action》

9 死鎖

死鎖是指:兩個或以上的運算單元,每一方都在等待對方釋放資源,但是所有方都不願意釋放資源。結果是沒有任何一方能繼續推進下去,於是整個系統無法再繼續運轉。

示例:

假設編寫一個銀行系統的轉賬功能。首先建立一個Account類來描述銀行賬號。為了支援併發,這個類包含一個mutex物件,用來保護賬號金額,在讀寫賬號金額時需要先加鎖保護。

class Account{
public:
    Account(string name,double money):m_strName(name),m_dMoney(money){};
    void changeMoney(double amount){
        m_dMoney += amount;
    }
    string getName(){
        return m_strName;
    }
    double getMoney(){
        return m_dMoney;
    }
    mutex* getLock(){
        return &m_mutexMoneyLock;
    }
private:
    string m_strName;
    double m_dMoney;
    mutex m_mutexMoneyLock;
};

接下來,再建立Bank類。

class Bank{
public:
    void addAccount(Account* account){
        m_Accounts.insert(account);
    }
    bool transferMoney(Account* accountA,Account* accountB,double amount){
        lock_guard guardA(*accountA->getLock());
        lock_guard guardB(*accountB->getLock());
        if(amount>accountA->getMoney()){
            return false;
        }
        accountA->changeMoney(-amount);
        accountB->changeMoney(amount);
        return true;
    }
    double totalMoney() const{
        double sum = 0;
        for(auto a:m_Accounts){
            sum += a->getMoney();
        }
        return 0;
    }
private:
    set<Account*> m_Accounts;
};

銀行類中記錄了所有的賬號,並且提供了一個方法來查詢整個銀行的總金額。其中,最主要關注轉賬的實現:transferMoney。該方法關鍵點如下:

  1. 為了保證執行緒安全,在修改每個賬號之前,需要獲取相應的鎖。
  2. 判斷轉出賬號金額是否足夠,如果不夠此次轉賬失敗。
  3. 進行轉賬。

有了銀行和賬戶結構之後就可以編寫轉系統。

void randomTransfer(Bank* bank,Account* accountA,Account* accountB){
    while(true){
        double randomMoney=((double)random()/RAND_MAX)*100;
        if(bank->transferMoney(accountA, accountB, randomMoney)){
            cout<<"Transfer "<<randomMoney<<" from "<<accountA->getName()<<" to "<<accountB->getName()<<" ,Bank totalMoney:"<< bank->totalMoney()<<endl;
        }
        else{
            cout<<" Transfer failed,"<<accountA->getName()<<" has only  $"<<accountA->getMoney()<<" required"<<endl;
        }
    }
}

最後在main函式中建立兩個執行緒,互相在兩個賬號之間來回轉賬:

int main(int argc, const char * argv[]) {
    // insert code here...
    // serial_task(0, MAX);
    //concurrent_task(0, MAX);
    Account a("Paul",100);
    Account b("Moira",100);
    Bank aBank;
    aBank.addAccount(&a);
    aBank.addAccount(&b);
    
    thread t1(randomTransfer,&aBank,&a,&b);
    thread t2(randomTransfer,&aBank,&b,&a);
    t1.join();
    t2.join();
    
    return 0;
}

測試結果可能如下:

...
Transfer 83.3996 from Paul to Moira ,Bank totalMoney:0
Transfer 24.5672 from Paul to Moira ,Bank totalMoney:0
Transfer 2.06493 from Paul to Moira ,Bank totalMoney:0
Transfer 55.1409 from Paul to Moira ,Bank totalMoney:0
Transfer 3.7174 from Paul to Moira ,Bank totalMoney:0
 Transfer failed,Paul has only  $21.3084 required
9.80156 required

程式可能很快就卡住不動了。為什麼?因為發生了死鎖。

兩個執行緒邏輯是這樣的:這兩個執行緒可能會同時獲取其中一個賬號的鎖,然後又想獲取另一個賬號的鎖,此時就發生了死鎖。如下圖所示:

當然,發生死鎖的原因遠不止上面這一種情況。如果兩個執行緒互相join就可能發生死鎖。還有在一個執行緒中對一個不可重入對互斥體(例如mutex而非recursive_mutex)多次加鎖也會死鎖。

實際上,很多時候由於程式碼層次巢狀導致了死鎖的發生,由於呼叫關係的複雜導致發現這類的問題並不容易。

再仔細看上面的輸出,會發現另外一個問題:這裡的輸出時亂的。兩個執行緒的輸出混雜在一起。原因:兩個執行緒可能會同時輸出,沒有做好隔離。

下面我們開始逐步解決上面的問題:

首先是輸出混亂問題(專門用一把鎖來保護輸出邏輯即可):

mutex mutexCoutLock;
void randomTransfer(Bank* bank,Account* accountA,Account* accountB){
    while(true){
        double randomMoney=((double)random()/RAND_MAX)*100;
        if(bank->transferMoney(accountA, accountB, randomMoney)){
            mutexCoutLock.lock();
            cout<<"Transfer "<<randomMoney<<" from "<<accountA->getName()<<" to "<<accountB->getName()<<" ,Bank totalMoney:"<< bank->totalMoney()<<endl;
            mutexCoutLock.unlock();
        }
        else{
            mutexCoutLock.lock();
            cout<<" Transfer failed,"<<accountA->getName()<<" has only  $"<<accountA->getMoney()<<" required"<<endl;
            mutexCoutLock.unlock();
        }
    }
}

請思考兩處的lock和unlock呼叫,並考慮為什麼不在while(true)下面寫一次整體的加鎖和解鎖

10 通用鎖定演算法

要避免死鎖,需要仔細的思考和設計業務邏輯。

有一個比較簡單的原則可以避免死鎖,即:對所有的鎖進行排序,每次一定按照順序來獲取鎖,不允許亂序。例如:要獲取某個玩具,一定先拿到鎖A,再拿鎖B,才能玩玩具。這樣就不會鎖死了。

這個原則雖然簡單,但卻不容易遵守。因為資料常常是分散在很多地方的。

不過,好訊息是,c++11標準提供了一些工具來避免多把鎖而導致的死鎖。我們只要直接呼叫這些介面就可以了。這個就是上面提到的兩個函式。它們都支援傳入多個Lockable物件。

接下來我們用它來改造之前死鎖的轉賬系統:

bool transferMoney(Account* accountA,Account* accountB,double amount){
        lock(*accountA->getLock(),*accountB->getLock());
        lock_guard guardA(*accountA->getLock(),adopt_lock);
        lock_guard guardB(*accountB->getLock(),adopt_lock);
        if(amount>accountA->getMoney()){
            return false;
        }
        accountA->changeMoney(-amount);
        accountB->changeMoney(amount);
        return true;
    }

這裡只改動了3行程式碼。

 

  1. 通過lock函式來獲取兩把鎖,標準庫的實現會保證不會發生死鎖。
  2. lock_guard在下面會詳細介紹。此處只要知道它會在自身物件生命週期的範圍鎖定互斥體即可。建立lock_guard的目的是為了在transferMoney結束的時候釋放鎖,guardB也是一樣。但是需要注意的是,這裡傳遞了adopt_lock表示:現在是已經獲取到互斥體的狀態了,不用再次加鎖(如果不加adopt_lock就是二次鎖定了)。

改造程式後,執行結果如下:

...
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
 Transfer failed,Moira has only  $0.448583 required
...

 現在這個轉賬程式就會一直執行下去,不會出現死鎖。輸出也是正常的了。

11 通用互斥管理

  • 主要API

互斥體(mutex相關類)提供了對於資源的保護功能,但是手動的鎖的(呼叫lock或者try_lock)和解鎖(呼叫unlock)互斥體是要耗費是要耗費比較大的精力的,開發者需要精心考慮和設計程式碼才行。因為我們需要保證,在任何情況下,解鎖要和加鎖配對,因為假設出現一條路徑導致獲取到鎖之後沒有正常釋放,就會影響整個系統。如果考慮方法還可以丟擲異常,這樣的程式碼寫起來會更費勁。

鑑於這個原因,標準庫提供了上面這些API。它們都使用了叫做RAII的程式設計技巧,來簡化我們手動加鎖和解鎖的體力活。

請看下面的例子

int g_i=0;
mutex g_i_mutex;
void safe_increment()
{
    lock_guard<mutex> lock(g_i_mutex);
    ++g_i;
    cout<<this_thread::get_id()<<":"<<g_i<<endl;
}
int main(int argc, const char * argv[]) {
    // insert code here...
    cout<<"main:"<<g_i<<endl;
    thread t1(safe_increment);
    thread t2(safe_increment);
    t1.join();
    t2.join();
    cout<<"main:"<<g_i<<endl;
    return 0;
}

這段程式碼中:

  1. 全域性的互斥體g_i_mutex用來保護全域性變數g_i。
  2. 這是一個設計為可以被多執行緒環境使用的方法。因此需要使用互斥體來進行保護。這裡沒有呼叫lock方法,而是直接使用lock_guard來鎖定互斥體。
  3. 在方法結束的時候,區域性變數lock_guard<mutex>會被銷燬,它對互斥體的鎖的也就解除了。
  4. 在多執行緒中使用這個方法。
  • RAII

上面的幾個類(lock_guard,unique_lock,shared_lock,scoped_lock)都使用了一個叫做RAII的程式設計技巧。

RAII全稱是Resource Acquisition Is Initialization,即資源獲取即初始化。

 RAII是一種C++程式設計技術,它將必須在使用前請求的資源(例如:分配的堆記憶體、執行執行緒、開啟的套接字、開啟的檔案、鎖定的互斥體、磁碟空間、資料庫連線等任何存在受限供給中的事物)的生命週期與一個物件生存週期繫結。

RAII保證資源可用於任何會訪問該物件的函式,也保證所有資源在其控制物件的生存期結束時,以獲得順序的逆序釋放。類似地,若資源獲取失敗(建構函式以異常退出),則為已構造完成的物件和基類子物件所獲取的所有資源,會以初始化順序的逆序釋放。這有效地利用了語言特性以消除記憶體洩漏並保證異常安全。

RAII可總結如下:

  • 將每個資源封裝入一個類,其中:建構函式請求資源,並建立所有類不變式,或在它無法完成時丟擲異常。解構函式釋放資源並絕不丟擲異常。
  • 始終經由RAII類的例項使用滿足要求的資源,該資源自身擁有自動儲存期或臨時生存期,或具有與自動或臨時物件的生存期繫結的生存期。

回想一下上下文中的transferMoney方法中的三行程式碼:

lock(*accountA->getLock(), *accountB->getLock());
lock_guard lockA(*accountA->getLock(), adopt_lock);
lock_guard lockB(*accountB->getLock(), adopt_lock);

如果使用unique_lock這三行程式碼則還有一種等價的寫法:

unique_lock lockA(*accountA->getLock(), defer_lock);
unique_lock lockB(*accountB->getLock(), defer_lock);
lock(*accountA->getLock(), *accountB->getLock());

注意這裡的lock方法的呼叫位置。這裡先定義unique_lock指定了defer_lock,因此實際沒有鎖定互斥體,而是到第三行才進行鎖定。

最後,藉助scoped_lock,我們可以將三行程式碼合成一行,這種寫法也是等價的:

scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());

scoped_lock會在其生命週期範圍內鎖定互斥體,銷燬的時候解鎖。同時,它可以鎖定多個互斥體,並且避免死鎖。

  • 條件變數 
  1. condition_variable    C++11標準  提供與std::unique_lock關聯的條件變數。
  2. condition_variable_any  C++11標準  提供與任何鎖型別關聯的條件變數。
  3. notify_all_at_thread_exit C++11標準 安排到在此執行緒完全結束時對notify_all的呼叫。
  4. vc_status C++11 列出條件變數上定時等待的可能結果。

至此,我們還有一個地方可以改進。即:轉賬金額不足的時候,程式直接返回了false。這很難說是一個好的策略。因為即便雖然當前賬號金額不足以轉賬,但只要別的賬號又轉賬進來之後,當前這個轉賬操作或許就可以繼續執行了。

這在很多業務中是很常見的一個需求:每一次操作都要正確執行,如果條件不滿足就停下來等待,直到滿足條件之後再繼續。而不是直接返回。

條件變數提供了一個可以讓多執行緒間同步協作的功能。這對生產者-消費者模型很有意義。再這個模型下:

  • 生產者和消費者共享一個工作區。這個區間的大小是有限的。
  • 生產者總是生產資料放入工作區中,當工作區滿了。它就停下來等消費者消費一部分資料,然後進行工作。
  • 消費者總是從工作區中拿出資料使用。當工作區中的資料全部被消費空了之後,它也會停下來等待生產者往工作區中放入新的資料。

從上面可以看到,無論是生產者還是消費者,當它們工作條件不滿足時,它們並不是直接報錯返回,而是停下來等待,知道條件滿足。

下面我們就藉助條件變數,再次改造之前的銀行轉賬系統。這個改造主要在於賬號類。重點是調整changeMoney方法。

class Account{
public:
    Account(string name,double money):m_strName(name),m_dMoney(money){};
    void changeMoney(double amount){
        unique_lock lock(m_mutexMoneyLock);//2
        m_conditionVar.wait(lock,[this,amount]{ //3
            return m_dMoney + amount >0;//4
        });
        m_dMoney += amount;
        m_conditionVar.notify_all();//5
        
    }
    string getName(){
        return m_strName;
    }
    double getMoney(){
        return m_dMoney;
    }
    mutex* getLock(){
        return &m_mutexMoneyLock;
    }
private:
    string m_strName;
    double m_dMoney;
    mutex m_mutexMoneyLock;
    condition_variable m_conditionVar; //1
}

這裡幾處改動:

  1. 這裡聲明瞭一個條件變數,用來在多個執行緒之間協作。
  2. 這裡使用的是unique_lock,這是為了與條件變數相配合。因為條件變數會解鎖和重新鎖定互斥體。
  3. 這裡是一個比較重要的地方:通過條件變數進行等待。此時,會通過後面的lambda表示式判斷條件是否滿足。如果滿足則繼續。如果不滿足,則此處會解鎖互斥體,並讓當前執行緒等待。解鎖這一點非常重要,因為只有這樣,才能讓其他執行緒獲取互斥體。
  4. 這裡是條件變數等待的條件。
  5. 此處也很重要。當金額發生變動之後,我們需要通知所有的條件變數上等待的其他執行緒。此時,所有呼叫wait執行緒都會再次喚醒,然後嘗試獲取鎖(當然,只有一個能獲取到)並再次判斷條件是否滿足。除了notify_all還有notify_one,它只通知一個等待執行緒。wait和notify就構成了執行緒間互相協作的工具。

注意:wait和notify_all雖然是寫在一個函式中的,但是在執行時它們時在多執行緒環境中執行的,因此對於這段程式碼,需要能夠從不同執行緒的角度去思考程式碼的邏輯。這也是開發併發系統比較難的地方。

有了上述改動之後,銀行轉賬方法實現就比較簡單了,不用再考慮資料保護的問題了。

    bool transferMoney(Account* accountA,Account* accountB,double amount){
        accountA->changeMoney(-amount);
        accountB->changeMoney(amount);
        return true;
    }

當轉賬邏輯也會變得簡單,不用再管轉賬失敗的情況發生:

mutex mutexCoutLock;
void randomTransfer(Bank* bank,Account* accountA,Account* accountB){
    while(true){
        double randomMoney=((double)random()/RAND_MAX)*100;
        {
            lock_guard guard(mutexCoutLock);
            cout<<"Try to Transfer "<<randomMoney
            <<" from "<<accountA->getName() <<"("<<accountA->getMoney()<<")"
            <<" to "<<accountB->getName()<<"("<<accountB->getMoney()<<"),Back total money "<<bank->totalMoney()<<endl;
        }
        bank->transferMoney(accountA, accountB, randomMoney);
    }
}

修改完之後,程式執行輸出如下:

Try to Transfer 51.2932 from Moira(146.223) to Paul(53.7769),Back total money 200
Try to Transfer 83.9112 from Moira(94.9299) to Paul(105.07),Back total money 200
Try to Transfer 61.264 from Moira(11.0186) to Paul(188.981),Back total money 200
Try to Transfer 99.8925 from Paul(188.981) to Moira(11.0186),Back total money 200
Try to Transfer 29.6032 from Paul(89.0889) to Moira(110.911),Back total money 200
Try to Transfer 63.7552 from Paul(59.4857) to Moira(79.2503),Back total money 138.736
Try to Transfer 52.4287 from Moira(79.2503) to Paul(120.75),Back total money 200
Try to Transfer 97.2775 from Moira(90.5768) to Paul(109.423),Back total money 200
Try to Transfer 49.3583 from Paul(109.423) to Moira(90.5768),Back total money 200

這下比之前都要好了。

但是細心的讀者會發現,Bank totalMoney輸出有時候是200,有時候不是。但不管怎麼樣,即便這一次不是,下一次又是了。關於這點,請自行思考為什麼,以及如何改進。

  • future

這一小節中,我們來熟悉更多的可以在併發環境中使用的工具,它們都位於<future>標頭檔案中。

  async

很多語言提供了非同步機制。非同步使得耗時操作不影響當前主執行緒的執行流程。

在c++11中,async便是完成這樣的功能的:

static const int MAX=10e8;
static double sum = 0;
void worker(int min,int max){
   
    for(int i = min;i <= max;i++){
        sum +=sqrt(i);
    }
}

int main(int argc, const char * argv[]) {
    // insert code here...
    sum = 0;
    auto start_time = chrono::steady_clock::now();
    auto f1 = async(worker,0,MAX);
    cout<<"Async task triggered"<<endl;
    f1.wait();
    cout<<"Async task finish,result:"<<sum<<endl;
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout<<"Async task consumed "<<ms<<" ms.Result:"<<sum<<endl;
    return 0;
}

上面程式碼中,我們使用一個lambda表示式來編寫非同步任務邏輯,並通過launch::async明確指定要通過獨立執行緒來執行任務,同時我們打印出了執行緒ID。輸出結果:

Async task triggered
Async task finish,result:2.10819e+13
Async task consumed 3475 ms.Result:2.10819e+13

這仍然是我們之前熟悉的例子。這裡有兩個地方需要說明:

  1. 這裡以非同步的方式啟動了任務。它返回一個future物件。future用來儲存非同步任務的執行結果,關於future我們在後面的packed_task的例子中再詳細說明。
  2. 此處是等待非同步任務執行完成。

需要注意的是,預設情況下,async是啟動一個新執行緒,還是以同步的方式(不啟動新的執行緒)執行任務,這一點標準是沒有指定的,由具體的編譯器決定。如果希望一定要以新的執行緒來非同步執行任務,可以通過launch::async來明確說明。launch中有兩個常量:

  • async:執行新執行緒,以非同步執行任務。
  • deferred:呼叫方執行緒上第一次請求其結果時才執行任務,即惰性求值。

除了通過函式來指定非同步任務,還可以lambda表示式來指定。如下所示:

int main(int argc, const char * argv[]) {
    // insert code here...
    double result =0;
    cout<<"Async task with lambda triggered,thread:"<<this_thread::get_id()<<endl;
    auto f2 = async(launch::async,[&result](){
        cout<<"Lambda task in thread:"<<this_thread::get_id()<<endl;
        for(int i=0;i<=MAX;i++){
            result +=sqrt(i);
        }
    });
    f2.wait();
    cout<<"Async task with lambda finish,result:"<<result<<endl;
    return 0;
}

輸出:

Async task with lambda triggered,thread:0x100103d40
Lambda task in thread:0x16fe87000
Async task with lambda finish,result:2.10819e+13

對於面向物件程式設計來說,很多時候肯定希望以物件的方法來執行非同步任務。例子如下:

class Worker{
public:
    Worker(int min,int max):m_min(min),m_max(max){} //1
    double work(){//2
        m_result=0;
        for(int i= m_min;i<=m_max;i++){
            m_result +=sqrt(i);
        }
        return m_result;
    }
    double getResult(){
        return m_result;
    }
private:
    int m_min;
    int m_max;
    double m_result;
};
int main(int argc, const char * argv[]) {
    // insert code here...
    Worker w(0,MAX);
    cout<<"Task in class triggered"<<endl;
    auto f3 = async(&Worker::work,&w); //3
    f3.wait();
    cout<<"Task in class finish,result:"<<w.getResult()<<endl;
    return 0;
}

這段程式碼有三處需要說明:

  1. 這裡通過一個類來描述任務。這個類是對前面提到的任務的封裝。它包含了任務的輸入引數和輸出結果。
  2. work函式是任務的主體邏輯。
  3. 通過async執行任務:這裡指定了具體的任務函式以及相應的物件。請注意這裡是&w,因此傳遞的是物件的指標。如果不寫&將傳入w物件的臨時複製。

 輸出:

Task in class triggered
Task in class finish,result:2.10819e+13

  packaged_task

在一些業務中,我們可能會有很多的任務需要排程。這時我們常常會設計出任務佇列和執行緒池的結構。此時,就可以使用packaged_task來包裝。

packaged_task繫結到一個函式或者可呼叫物件上。當它被呼叫時,它就會呼叫其繫結到函式或者可呼叫物件。並且,可以通過與之相關聯的future來獲取任務的結果。排程程式只需要處理packaged_task,而非各個函式。

packaged_task物件是一個可呼叫物件,它可以被封裝稱一個std::function,或者作為執行緒函式傳遞給std::thread,或者直接呼叫。

下面是一個程式碼示例:

static const int MAX = 10e8;
double concurrent_worker(int min,int max){
    double sum =0;
    for(int i=min;i<=max ;i++){
        sum += sqrt(i);
    }
    return  sum;
}
double concurrent_task(int min,int max){
    vector<future<double>> results;//1
    unsigned concurrent_count=thread::hardware_concurrency();
    min = 0;
    for(int i =0;i<concurrent_count;i++){ //2
        packaged_task<double(int,int)> task(concurrent_worker);//3
        results.push_back(task.get_future());//4
        int range = max /concurrent_count*(i+1);
        thread t(std::move(task),min,range); //5
        t.detach();
        min = range + 1;
    }
    cout<<"threads create finish"<<endl;
    double sum = 0;
    for (auto& r:results) {
        sum += r.get();//6
    }
    return  sum;
}
int main(int argc, const char * argv[]) {
    // insert code here...
    auto start_time = chrono::steady_clock::now();
    double r = concurrent_task(0,MAX);
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl;
    return 0;
}

輸出:

threads create finish
Concurrent task finish, 701 ms consumed, Result: 2.10819e+13

這段程式碼中:

  1. 首先建立一個集合來儲存future物件。我們用它來獲取任務結果。
  2. 同樣的,根據CPU的情況來建立執行緒數量。
  3. 將任務包裝成packaged_task。請注意,由於concurrent_worker被包裝成了任務,我們無法直接獲取它的return值。而是要通過future物件來獲取。
  4. 獲取任務關聯的future物件,並將其存入集合中。
  5. 通過一個新的執行緒來執行任務,並傳入需要的引數。
  6. 通過future集合,逐個獲取每個任務的計算結果,將其累加。這裡r.get()獲取到的就是每個任務中concurrent_worker的返回值。

為了簡單起見,這裡的示例只顯示了我們熟悉的例子和結構。但在實際上的工程中,呼叫關係通常更加複雜,你可以藉助於packaged_task將任務組裝成佇列,然後通過執行緒池的方式進行排程:

  promise與future

在上面的例子中,concurrent_task的結果是通過return返回的。但在一些時候,我們可能不能這麼做:在得到任務結果之後,可能還有一些事情需要繼續處理,例如清理工作。

這時候,就可以將primise與future配對使用。這樣就可以將返回結果和任務結束兩個事情分開。

下面是多上程式碼示例的改寫:

static const int MAX = 10e8;
double concurrent_worker(int min,int max){
    double sum =0;
    for(int i=min;i<=max ;i++){
        sum += sqrt(i);
    }
    return  sum;
}
void concurrent_task(int min,int max,promise<double>* result){//1
    vector<future<double>> results;
    unsigned concurrent_count=thread::hardware_concurrency();
    min = 0;
    for(int i =0;i<concurrent_count;i++){
        packaged_task<double(int,int)> task(concurrent_worker);
        results.push_back(task.get_future());
        int range = max /concurrent_count*(i+1);
        thread t(std::move(task),min,range);
        t.detach();
        min = range + 1;
    }
    cout<<"threads create finish"<<endl;
    double sum = 0;
    for (auto& r:results) {
        sum += r.get();
    }
    result->set_value(sum);
    cout << "concurrent_task finish" << endl;
}
int main(int argc, const char * argv[]) {
    // insert code here...
    auto start_time = chrono::steady_clock::now();
    promise<double> sum; //3
    concurrent_task(0,MAX,&sum);
    auto end_time = chrono::steady_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end_time-start_time).count();
    cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum.get_future().get() << endl;
    return 0;
}

輸出:

threads create finish
concurrent_task finish
Concurrent task finish, 731 ms consumed, Result: 2.10819e+13

這段程式碼和上面的示例在很大程度上是一樣的。只有小部分內容做了改動:

  1. concurrent_task不再直接返回計算結果,而是增加了一個promise物件來存放結果。
  2. 在任務計算完成之後,將總結果設定到primise物件上。一旦這裡呼叫了set_value,其相關聯的future物件就會就緒。
  3. 這裡是在main中建立一個promise來存放結果,並以指標的形式傳遞concurrent_task中。
  4. 通過sum.get_future().get()來獲取結果。第2點中已經說了,一旦呼叫了set_value,其相關聯的future物件就會就緒。

需要注意的是,future物件只有被一個執行緒獲取值。並且在呼叫get()之後,就沒有可以獲取的值了。如果從多個執行緒呼叫get會出現資料競爭,其結果是為定義的。

如果真的需要在多個執行緒中獲取future的結果,可以使用shared_future。

12、並行演算法

從c++17開始。<algorithm>和<numeric>標頭檔案中的很多演算法添加了一個新的引數:sequenced_policy。

藉助這個引數,開發可以直接使用這些演算法的並行版本,不用再自己建立併發系統和劃分資料來排程這些演算法。

sequenced_policy可能的取值有三種,它們的說明如下:

注意:本文的前面已經提到,目前clang編譯器還不支援這個功能。因此想要編譯這部分程式碼,你需要使用gcc 9.0或更高的版本。同時還需要安裝Intel Threading Building Blocks。

程式碼示例:

void generateRandomData(vector<double>& collection, int size) {
  random_device rd;
  mt19937 mt(rd());
  uniform_real_distribution<double> dist(1.0, 100.0);
  for (int i = 0; i < size; i++) {
    collection.push_back(dist(mt));
  }
}

int main() {
  vector<double> collection;
  generateRandomData(collection, 10e6); //

  vector<double> copy1(collection); //
  vector<double> copy2(collection);
  vector<double> copy3(collection);

  auto time1 = chrono::steady_clock::now(); //
  sort(execution::seq, copy1.begin(), copy1.end()); //
  auto time2 = chrono::steady_clock::now();
  auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count();
  cout << "Sequenced sort consuming " << duration << "ms." << endl; //

  auto time3 = chrono::steady_clock::now();
  sort(execution::par, copy2.begin(),copy2.end()); //
  auto time4 = chrono::steady_clock::now();
  duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count();
  cout << "Parallel sort consuming " << duration << "ms." << endl;

  auto time5 = chrono::steady_clock::now();
  sort(execution::par_unseq, copy2.begin(),copy2.end()); //
  auto time6 = chrono::steady_clock::now();
  duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count();
  cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;
}

這段程式碼很簡單:

  1. 通過一個函式生存1000,000個隨機數。
  2. 將資料拷貝3份,以備用。
  3. 接下來將通過三個不同的parallel_policy引數來呼叫同樣的sort演算法。每次呼叫記錄開始和結束時間。
  4. 第一次呼叫使用std::execution::seq引數。
  5. 輸出本次測試所使用的時間。
  6. 第二次呼叫使用std::execution::par引數。
  7. 第三次呼叫使用std::execution::par_unseq引數。

程式輸入如下:

Sequenced sort consuming 4464ms.
Parallel sort consuming 459ms.
Parallel unsequenced sort consuming 168ms.

可以看到,效能最好和效能最差相差了超過26倍。