1. 程式人生 > 其它 >C++ 11中的多執行緒

C++ 11中的多執行緒

介紹C++11,14,17中的多執行緒使用 目錄

C++11多執行緒簡單使用

開篇介紹

C++11中引入了多執行緒標頭檔案<thread>,讓我們能更方便的使用多執行緒進行程式設計

void TestThread(int index)
{
    std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl;
    std::cout << "Child Thread" << index << "Stop" << std::endl;
}

int main()
{
    std::thread newThread1(TestThread, 1);
    std::thread newThread2(TestThread, 2);
    std::cout << "Main Thread id " << std::this_thread::get_id() << std::endl;
    std::cout << "Main Thread Stop" << std::endl;
    if (newThread1.joinable())
        newThread1.join();
    if (newThread2.joinable())
        newThread2.join();
}

眾所周知,執行緒具有非同步性,也就是說這道程式的推進的方向是不確定的,而事實也正是如此

有兩句話說得好,彙總一下就是:join()detach()總要呼叫一個,並且呼叫之前最好是要進行joinable()檢查

Never call join() or detach() on std::thread object with no associated executing thread

Never forget to call either join or detach on a std::thread object with associated executing thread

如果對一個子執行緒執行兩次join()

操作,那麼會丟擲異常

void TestThread(int index)
{
    std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl;
    std::cout << "Child Thread" << index << "Stop" << std::endl;
}

int main()
{
    std::thread newThread(TestThread, 1);
    newThread.join();
    newThread.join();
}

cppreference中提到,當joinable()的執行緒被賦值或析構的時候,會呼叫std::terminate(),而一般std::terminate()意味著std::abort(),也就是說如果對一個執行緒既不執行join()操作也不執行detach()操作,而它卻被析構了,那麼“it will cause the program to Terminate(終止)”

void TestThread(int index)
{
    std::cout << "Child Thread" << index << " id " << std::this_thread::get_id() << std::endl;
    std::cout << "Child Thread" << index << "Stop" << std::endl;
}

int main()
{
    std::thread newThread(TestThread, 1);
    newThread = std::thread(TestThread, 2);

    std::cout << "Main Thread Sleep Begin" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Main Thread Sleep End" << std::endl;
    if (newThread.joinable())
        newThread.join();
}

拋開Bug不談,由於執行緒的非同步性,誰也說不清是先輸出Thread1還是輸出Thread2

針對這個Bug,可以這麼理解:執行緒1由物件newThread管理,當C++底層告知作業系統去建立一個新執行緒並給它分配一些任務後,卻馬上建立了一個執行緒2交給newThread,這樣子產生了一個覆蓋操作,導致執行緒1被析構,而它又是可結合的,所以程式被terminate

正確的做法是

int main()
{
    std::thread newThread(TestThread, 1);
    if (newThread.joinable())
        newThread.join();
    newThread = std::thread(TestThread, 2);
    if (newThread.joinable())
        newThread.join();
}

執行緒的構造

通過前文中的示範,我們瞭解到可以通過傳入一個函式指標來給一個執行緒分配它的“任務”

除了函式指標外,還可以通過lambda表示式,std::functionstd::bind,仿函式物件等來解決

struct Handle
{
    void operator()(int data) { std::cout << data << std::endl; }
};

int main()
{
    std::thread t(Handle(), 10);
    if (t.joinable())
        t.join();
}

join與detach

  • newThread.join()操作代表當前執行緒將阻塞,直到newThread完成它的任務
  • newThread.detach()操作代表newThread與當前執行緒分開(即當前執行緒結束銷燬後並不會同時銷燬newThread),但是程式結束後newThread仍會被終止(即使它還沒執行完,也會被作業系統強行叫停,這也可能會導致資源沒有正確的釋放)
class TestClass
{
public:
    TestClass() { cout << "create" << endl; }
    TestClass(const TestClass& t) { cout << "copy" << endl; }
    void print(int num)
    {
        for (int i = 0; i < num; i++)
            cout << i << ends;
        cout << endl;
    }
};

void Func2()
{
    cout << "start" << endl;
    TestClass t;
    std::thread newThread(&TestClass::print, &t, 10);
    // 讓當前執行緒等待newThread完成
    newThread.join();
    cout << "end" << endl;
}
int main()
{
    std::thread t(Func2);
    if (t.joinable())
        t.join();
    return 0;
}

程式的執行結果為

void repeat1000(int index, int time)
{
    for (int i = 0; i < time; i++)
        cout << index;
}

void detach_test()
{
    thread newThread(repeat1000, 1, 100000);
    newThread.detach();
    newThread = thread(repeat1000, 2, 1000);
    newThread.join();
}

int main()
{
    {
        thread t(detach_test);
        t.join();
    }
    cout << endl << "start wait" << endl;
    this_thread::sleep_for(chrono::seconds(5));
    return 0;
}

我認為這是一個很好的解釋join()detach()的例子,下面來分析一下

  • 首先建立了執行緒t,給它分配了detach_test()任務
  • 主執行緒阻塞 等待t完成它的任務
  • 同時作業系統完成了對t的分配,開始執行detach_test()
  • t程序再建立子執行緒newThread,並給他分配(repeat1000, 1, 100000)任務
  • 作業系統在分配newThread的“同時”,t執行緒將其和newThread分離
  • 此時(repeat1000, 1, 100000)仍然在執行,同時t執行緒給newThread賦了一個新執行緒,任務為(repeat1000, 2, 1000)
  • newThread呼叫join()操作,t程序阻塞,等待newThread(repeat1000, 2, 1000)工作完成
  • newThread(repeat1000, 2, 1000)完成,也意味著t程序的任務完成,此時主執行緒不再受到阻塞
  • t執行緒離開作用域,遭到銷燬。但此時(repeat1000, 1, 100000)操作因為比較耗時所以仍然還在執行
  • 主程式輸出"start wait"後開始休眠5秒鐘,此時執行緒(repeat1000, 1, 100000)仍然在執行
  • 最後程式結束,各種執行緒被作業系統銷燬(即可能(repeat1000, 1, 100000)執行到第八萬次的時候就突然被終止了)

在一般情況下,為了避免detachjoin使用不當造成的程式錯誤,可以建立一個執行緒類,使用解構函式執行執行緒的分離或合併(join)

執行緒傳參時應該避免的操作

應該避免傳入棧上物件的指標

void newThreadCallback(int* p)
{
    std::cout << "Inside Thread: " << *p << std::endl;
    // 等一秒鐘 讓startNewThread()執行結束 使i的記憶體空間被回收
    std::this_thread::sleep_for(std::chrono::seconds(1));
    // 丟擲異常
    *p = 19;
}

void startNewThread()
{
    int i = 10;
    std::cout << "Inside Main Thread: " << i << std::endl;
    std::thread t(newThreadCallback, &i);
    t.detach();
    std::cout << "Inside Main Thread: " << i << std::endl;
}

int main()
{
    startNewThread();
    // 等兩秒鐘 讓所有執行緒和方法都執行完畢再結束程式
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

堆上的資料同理

因為一般堆物件需要使用delete來銷燬,所以也無法確定別的執行緒訪問到指標的時候,它所指的記憶體是否有效

執行緒的引用傳參

使用std::ref或者std::cref,使用方法幾乎和std::bind中使一致的,所以不再贅述

多執行緒中的競爭

作業系統應該學過,競爭就是多個執行緒同時訪問一塊記憶體區域,導致不可預估的結果

class Wallet
{
    int money;
public:
    Wallet() : money(0) {}

    int getMoney() const { return money; }
    void addMoney(int increase)
    {
        for (int i = 0; i < increase; ++i)
            money++;
    }
};

int testMultiThreadWallet()
{
    Wallet walletObject;
    std::vector<std::thread> threads;
    // 建立五條執行緒非同步訪問Wallet "理應"得到的結果為5000
    threads.reserve(5);
    // reserve對應_back而resize對應[i]
    for (int i = 0; i < 5; ++i)
        threads.emplace_back(&Wallet::addMoney, &walletObject, 1000);

    // 等所有執行緒執行完再結束
    for (auto& thread : threads)
        thread.join();
    return walletObject.getMoney();
}

int main()
{
    int val = 0;
    for (int k = 0; k < 1000; k++)
    {
        if ((val = testMultiThreadWallet()) != 5000)
            std::cout << "Error at count = " << k << " Money in Wallet = " << val << std::endl;
    }
    return 0;
}

某次程式執行的結果為,這種結果是不確定的,可能1000次實驗,一次都不會出錯,也可能出現多至十多次錯誤

這一行短短的程式碼其實發生了三件事

money++;
  • money的值載入進暫存器中
  • 在暫存器中進行計算,即++操作
  • 將暫存器中的結果存回money所在的記憶體中

而當由於執行緒具有非同步性,在不加鎖的情況下我們無法控制多條執行緒對money的訪問順序,那麼就可能出現以下這種情況

  • 假設money的初始值是43
  • money的值被執行緒1取出放入暫存器1中
  • money的值被執行緒2取出放入暫存器2中
  • 暫存器1進行計算得到結果44
  • 暫存器2進行計算得到結果44
  • 暫存器1將結果放入money所在記憶體中,money為44
  • 暫存器2將結果放入money所在記憶體中,money為44

解決執行緒中的競爭

作業系統中的PV操作與之類似,關鍵就是加鎖

std::mutex

使用互斥鎖解鎖上文中的錢包問題

#include<mutex>
class Wallet
{
    int money;
    mutex mutexLock;
public:
    Wallet() : money(0) {}

    int getMoney() const { return money; }
    void addMoney(int increase)
    {
        mutexLock.lock();
        for (int i = 0; i < increase; ++i)
            money++;
        mutexLock.unlock();
    }
};

但是如果一個執行緒在加鎖後並沒有解鎖,那麼所有其他執行緒將會一直等待,導致程式無法結束(當使用join的情況下)

相反的,如果沒上鎖就解鎖

因此可以在此基礎上做一層簡單的封裝

class SmartMutex
{
private:
    mutex mutexLock;
public:
    void AutoLock(const function<void()>& func)
    {
        mutexLock.lock();
        func();
        mutexLock.unlock();
    }
};


class Wallet
{
    int money;
    SmartMutex smartMutex;
public:
    Wallet() : money(0) {}

    int getMoney() const { return money; }
    void addMoney(int increase)
    {
        smartMutex.AutoLock([this, &increase]()
        {
            for (int i = 0; i < increase; ++i)
                this->money++;
        });
    }
};

或者可以使用std::lock_guard

std::lock_guard

std::lock_guard有點像一個智慧指標,在它的作用域結束後,會呼叫解構函式,完成對互斥鎖的解鎖操作

class Wallet
{
    int money;
    mutex mutexLock;
public:
    Wallet() : money(0) {}

    int getMoney() const { return money; }

    void addMoney(int increase)
    {
        // 在lockGuard的建構函式中會自動上鎖
        lock_guard<mutex> lockGuard(mutexLock);
        for (int i = 0; i < increase; ++i)
            this->money++;
        // 離開作用域 lockGuard解構函式呼叫 自動解鎖
    }
};