1. 程式人生 > 其它 >淺談C++11中的多執行緒(二)

淺談C++11中的多執行緒(二)

摘要

本篇文章圍繞以下幾個問題展開:

  1. 程序和執行緒的區別
  2. 何為併發?C++中如何解決併發問題?C++中多執行緒的基本操作淺談C++11中的多執行緒(一) - 唯有自己強大 - 部落格園 (cnblogs.com)
  3. 同步互斥原理以及如何處理資料競爭
  4. Qt中的多執行緒應用

一,同步互斥原理

首先說明兩個專業名詞。

臨界資源:對於同一程序的多個執行緒,程序資源中有些對執行緒是共享的,但有些資源一次只能供一個執行緒使用,這樣的資源被稱為臨界資源,也可以叫做互斥資源,即只能被各個執行緒互斥訪問。

臨界區:執行緒中對臨界資源實施操作的那段程式,稱之為臨界區。

  • 同步是指散步在不同任務之間的若干程式片斷,它們的執行必須嚴格按照規定的某種先後次序來執行,這種先後次序依賴於要完成的特定的任務。比如 A 任務的執行依賴於 B 任務產生的資料。

舉例:假如程式中有一個靜態變數,static int a;執行緒1負責往裡寫入資料,執行緒2需要讀取其中的資料,那麼執行緒2在讀資料之前必須是執行緒1寫入了資料,如果不是,那麼執行緒2必須停下來等待執行緒1的操作結束。這就是執行緒之間在某些地方上的合作關係,協同工作嘛!

  • 互斥是指散步在不同任務之間的若干程式片斷,當某個任務執行其中一個程式片段時,其它任務就不能執行它們之中的任一程式片段,只能等到該任務執行完這個程式片段後才可以執行。

舉例:還是假如程式中有一個靜態變數,static int b;執行緒1想要往裡寫入資料,執行緒2也想要往裡寫入資料,那麼此時靜態變數b就是一個臨界資源(互斥資源),即一次只能被一個執行緒訪問。

同步跟互斥都是針對於執行緒來說的,可以把這理解為是執行緒之間的合作關係和制約關係。可以這樣理解,同一程序的各個執行緒之間不可能是完全獨立的,或多或少會有關係,或是合作關係,或是制約關係。具體的例子放在了同步和互斥的介紹中。

二,如何處理資料競爭

從上面資料競爭形成的條件入手,資料競爭源於併發修改同一資料結構,那麼最簡單的處理資料競爭的方法就是對該資料結構採用某種保護機制,確保只有進行修改的執行緒才能看到資料被修改的中間狀態,從其他訪問執行緒的角度看,修改不是已經完成就是還未開始。

線上程裡也有這麼一把鎖——互斥鎖(mutex),互斥鎖是一種簡單的加鎖的方法來控制對共享資源的訪問。只要某一個執行緒上鎖了,那麼就會強行霸佔公共資源的訪問權,其他的執行緒無法訪問直到這個執行緒解鎖了,從而保護共享資源。

1️⃣ lock與unlock保護共享資源

Mutex全名mutual exclusion(互斥體),是個object物件,用來協助採取獨佔排他方式控制對資源的併發訪問。這裡的資源可能是個物件,或多個物件的組合。為了獲得獨佔式的資源訪問能力,相應的執行緒必須鎖定(lock) mutex,這樣可以防止其他執行緒也鎖定mutex,直到第一個執行緒解鎖(unlock) mutex。mutex類的主要操作函式見下表:

由圖可知互斥鎖只有兩種狀態,即上鎖( lock )和解鎖( unlock )。mutex不僅提供了常規鎖,還為常規鎖可能造成的阻塞提供了嘗試鎖(帶時間的鎖需要帶時間的互斥類timed_mutex支援,具體見下文)。下面先給出一段示例程式碼:

// mutex1.cpp         通過互斥體lock與unlock保護共享全域性變數

#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> 

std::chrono::milliseconds interval(100);

std::mutex mutex;
//兩個全域性變數
int job_shared = 0; //兩個執行緒都能修改'job_shared',mutex將保護此變數 int job_exclusive = 0; //只有一個執行緒能修改'job_exclusive',不需要保護 //此執行緒只能修改 'job_shared' void job_1() { mutex.lock(); std::this_thread::sleep_for(5 * interval); //令‘job_1’持鎖等待 ++job_shared; std::cout << "job_1 shared (" << job_shared << ")\n"; mutex.unlock(); } // 此執行緒能修改'job_shared'和'job_exclusive' void job_2() { while (true) { //無限迴圈,直到獲得鎖並修改'job_shared' if (mutex.try_lock()) { //嘗試獲得鎖成功則修改'job_shared' ++job_shared; std::cout << "job_2 shared (" << job_shared << ")\n"; mutex.unlock(); return; } else { //嘗試獲得鎖失敗,接著修改'job_exclusive' ++job_exclusive; std::cout << "job_2 exclusive (" << job_exclusive << ")\n"; std::this_thread::sleep_for(interval); } } } int main() { std::thread thread_1(job_1); std::thread thread_2(job_2); thread_1.join(); thread_2.join(); getchar(); return 0; }

從上面的程式碼看,建立了兩個執行緒和兩個全域性變數。

  • 全域性變數job_exclusive只有job2呼叫,因此兩執行緒並不共享,不會產生資料競爭,所以不需要鎖保護。
  • 另一個全域性變數job_shared是兩執行緒共享的,會引起資料競爭,因此需要鎖保護。執行緒thread_1持有互斥鎖lock的時間較長,執行緒thread_2為免於空閒等待,使用了嘗試鎖try_lock,如果獲得互斥鎖則操作共享變數job_shared,未獲得互斥鎖則操作排他變數job_exclusive,提高多執行緒效率。

輸出結果:

2️⃣ lock_guard與unique_lock保護共享資源

但lock與unlock必須成對合理配合使用,使用不當可能會造成資源被永遠鎖住,甚至出現死鎖(兩個執行緒在釋放它們自己的lock之前彼此等待對方的lock)。


相關知識點:

是不是想起了C++另一對兒需要配合使用的物件new與delete,若使用不當可能會造成記憶體洩漏等嚴重問題,為此C++引入了智慧指標shared_ptr與unique_ptr。智慧指標借用了RAII技術(Resource Acquisition Is Initialization—使用類來封裝資源的分配和初始化,在建構函式中完成資源的分配和初始化,在解構函式中完成資源的清理,可以保證正確的初始化和資源釋放)對普通指標進行封裝,達到智慧管理動態記憶體釋放的效果。


同樣的,C++也針對lock與unlock引入了智慧鎖lock_guard與unique_lock,同樣使用了RAII技術對普通鎖進行封裝,達到智慧管理互斥鎖資源釋放的效果。lock_guard與unique_lock的區別如下:

lock_guard:

unique_lock:

從上面兩個支援的操作函式表對比來看,unique_lock功能豐富靈活得多。如果需要實現更復雜的鎖策略可以用unique_lock,如果只需要基本的鎖功能,優先使用更嚴格高效的lock_guard。兩種鎖的簡單概述與策略對比見下表:

如果將上面的普通鎖lock/unlock替換為智慧鎖lock_guard,其中job_1函式程式碼修改如下:

void job_1()
{
    std::lock_guard<std::mutex> lockg(mutex);    //獲取RAII智慧鎖,離開作用域會自動析構解鎖
    std::this_thread::sleep_for(5 * interval);  //令‘job_1’持鎖等待
    ++job_shared;
    std::cout << "job_1 shared (" << job_shared << ")\n";
}

如果也想將job_2的嘗試鎖try_lock也使用智慧鎖替代,由於lock_guard鎖策略不支援嘗試鎖,只好使用unique_lock來替代,程式碼修改如下(其餘程式碼和程式執行結果與上面相同):

void job_2()
{
    while (true) {    //無限迴圈,直到獲得鎖並修改'job_shared'
        std::unique_lock<std::mutex> ulock(mutex, std::try_to_lock);        //以嘗試鎖策略建立智慧鎖
        //嘗試獲得鎖成功則修改'job_shared'
        if (ulock) {
            ++job_shared;
            std::cout << "job_2 shared (" << job_shared << ")\n";
            return;
        } else {      //嘗試獲得鎖失敗,接著修改'job_exclusive'
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}

3️⃣timed_mutex與recursive_mutex提供更強大的鎖

前面介紹的互斥量mutex提供了普通鎖lock/unlock和智慧鎖lock_guard/unique_lock,基本能滿足我們大多數對共享資料資源的保護需求。但在某些特殊情況下,我們需要更復雜的功能,比如某個執行緒中函式的巢狀呼叫可能帶來對某共享資源的巢狀鎖定需求,mutex在一個執行緒中卻只能鎖定一次;再比如我們想獲得一個鎖,但不想一直阻塞,只想等待特定長度的時間,mutex也沒提供可設定時間的鎖。針對這些特殊需求,< mutex >庫也提供了下面幾種功能更豐富的互斥類,它們間的區別見下表:

繼續用前面的例子,將mutex替換為timed_mutex,將job_2的嘗試鎖tyr_lock()替換為帶時間的嘗試鎖try_lock_for(duration)。由於改變了嘗試鎖的時間,所以在真正獲得鎖之前的嘗試次數也有變化,該變化體現在嘗試鎖失敗後對排他變數job_exclusive的最終修改結果或修改次數上。更新後的程式碼如下所示:

#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> 

std::chrono::milliseconds interval(100);
 
std::timed_mutex tmutex;

int job_shared = 0; //兩個執行緒都能修改'job_shared',mutex將保護此變數
int job_exclusive = 0; //只有一個執行緒能修改'job_exclusive',不需要保護

//此執行緒只能修改 'job_shared'
void job_1()
{
    std::lock_guard<std::timed_mutex> lockg(tmutex);    //獲取RAII智慧鎖,離開作用域會自動析構解鎖
    std::this_thread::sleep_for(5 * interval);  //令‘job_1’持鎖等待
    ++job_shared;
    std::cout << "job_1 shared (" << job_shared << ")\n";
}

// 此執行緒能修改'job_shared'和'job_exclusive'
void job_2()
{
    while (true) {    //無限迴圈,直到獲得鎖並修改'job_shared'
        std::unique_lock<std::timed_mutex> ulock(tmutex,std::defer_lock);   //建立一個智慧鎖但先不鎖定
        //嘗試獲得鎖成功則修改'job_shared'
        if (ulock.try_lock_for(3 * interval)) {     //在3個interval時間段內嘗試獲得鎖
            ++job_shared;
            std::cout << "job_2 shared (" << job_shared << ")\n";
            return;
        } else {      //嘗試獲得鎖失敗,接著修改'job_exclusive'
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}

int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
 
    thread_1.join();
    thread_2.join();

    getchar();
    return 0;
}

前章解答

前一篇文章中最後的程式執行結果可能會出現某行與其他行交疊錯亂的情況,主要是由於不止一個執行緒併發訪問了std::cout顯示終端資源導致的,解決方案就是對cout << “somethings” << endl語句加鎖,保證多個執行緒對cout資源的訪問同步。為了儘可能降低互斥鎖對效能的影響,應使用微粒鎖,即只對cout資源訪問語句進行加鎖保護,cout資源訪問完畢儘快解鎖以供其他執行緒訪問該資源。新增互斥鎖保護後的程式碼如下:

//thread2.cpp  增加對cout顯示終端資源併發訪問的互斥鎖保護

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>

using namespace std;

std::mutex mutex1;

void thread_function(int n)
{
    std::thread::id this_id = std::this_thread::get_id();       //獲取執行緒ID

    for (int i = 0; i < 5; i++) {
        mutex1.lock();
        cout << "子執行緒" << this_id << " 執行 : " << i + 1 << endl;
        mutex1.unlock();
        std::this_thread::sleep_for(std::chrono::seconds(n));   //程序睡眠n秒
    }
}

class Thread_functor
{
public:
    // functor行為類似函式,C++中的仿函式是通過在類中過載()運算子實現,使你可以像使用函式一樣來建立類的物件
    void operator()(int n)
    {
        std::thread::id this_id = std::this_thread::get_id();

        for (int i = 0; i < 5; i++) {
            {
                std::lock_guard<std::mutex> lockg(mutex1);
                cout << "子仿函式執行緒" << this_id << " 執行: " << i + 1 << endl;
            }
            std::this_thread::sleep_for(std::chrono::seconds(n));   //程序睡眠n秒
        }
    }
};


int main()
{
    thread mythread1(thread_function, 1);      // 傳遞初始函式作為執行緒的引數
    if (mythread1.joinable())                  //判斷是否可以成功使用join()或者detach(),返回true則可以,返回false則不可以
        mythread1.join();                     // 使用join()函式阻塞主執行緒直至子執行緒執行完畢

    Thread_functor thread_functor;
    thread mythread2(thread_functor, 3);     // 傳遞初始函式作為執行緒的引數
    if (mythread2.joinable())
        mythread2.detach();                  // 使用detach()函式讓子執行緒和主執行緒並行執行,主執行緒也不再等待子執行緒

    auto thread_lambda = [](int n) {
        std::thread::id this_id = std::this_thread::get_id();
        for (int i = 0; i < 5; i++)
        {
            mutex1.lock();
            cout << "子lambad執行緒" << this_id << " 執行: " << i + 1 << endl;
            mutex1.unlock();
            std::this_thread::sleep_for(std::chrono::seconds(n));   //程序睡眠n秒
        }
    };

    thread mythread3(thread_lambda, 4);     // 傳遞初始函式作為執行緒的引數
    if (mythread3.joinable())
        mythread3.join();                     // 使用join()函式阻塞主執行緒直至子執行緒執行完畢

    unsigned int n = std::thread::hardware_concurrency();       //獲取可用的硬體併發核心數
    mutex1.lock();
    std::cout << n << " 支援併發執行緒" << endl;
    mutex1.unlock();
    std::thread::id this_id = std::this_thread::get_id();

    for (int i = 0; i < 5; i++) {
        {
            std::lock_guard<std::mutex> lockg(mutex1);
            cout << "主執行緒" << this_id << " 執行: " << i + 1 << endl;
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    getchar();
    return 0;
}

多次執行結果:(已解決)

參考博文:(2條訊息) C++多執行緒併發(二)---執行緒同步之互斥鎖_流雲-CSDN部落格_c++多執行緒互斥