1. 程式人生 > >C++11多執行緒---互斥量、鎖、條件變數的總結

C++11多執行緒---互斥量、鎖、條件變數的總結

關於互斥量std::mutex的總結

互斥量用於組成程式碼的臨界區。C++的多執行緒模型是基於記憶體的,或者說是基於程式碼片段的,這和我們作業系統學習的臨界區概念基本一致,但是與Golang不同,Golang是基於訊息模型的。

一個std::mutexlock()unlock()之間的程式碼片段組成一個臨界區,這個臨界區內部同時最多隻能有一個執行緒進行訪問,可以理解為這個片段內部的程式碼是受到保護的,不會被多執行緒同時訪問造成不可預知的問題。

std::mutex mtx;
mtx.lock();
// 這裡的程式碼,同時最多隻能有一個執行緒進行訪問
mtx.unlokc
();

當一個執行緒獲取到一個std::mutex並且呼叫lock()後,必須由同一個執行緒呼叫unlock()操作,因此一定要注意兩個函式成對出現,否則會造成死鎖。

當一個執行緒獲取一個std::mutex並呼叫lock()函式後,其他執行緒的呼叫會失敗。可以使用try_lock進行判別,具體細節參考:https://en.cppreference.com/w/cpp/thread/mutex

關於std::lock_guardstd::unique_lock的總結

std::lock_guard比較好理解,因為呼叫mutex需要時刻記著解鎖,所用這個類封裝了一系列的操作,在一個模組中構造了一個std::lock_guard

後,相當於對該結構塊加鎖,當執行緒離開結構塊後,std::lock_guard自動析構,相當於解鎖。它的結構簡單、速度快,但是功能比較少。

std::unique_lock是對std::lock_guard功能的一個拓展,功能更多,但是速度會慢一些,具體參照:https://en.cppreference.com/w/cpp/thread/unique_lock

關於條件變數std::condition_variable的總結

這個相當於作業系統中的waitsignal原語操作,需要結合一個std::unique_lock組成的臨界區共同完成功能。wait & signal原語操作最典型的特點是 “阻塞自己,喚醒別人”。可以這麼理解,如果當前滿足特定條件不滿足,那麼就不能進入臨界區,當前執行緒阻塞。當其他執行緒處理完後,使得條件滿足了,執行緒會喚醒那些處於阻塞狀態的執行緒,使之重新進入。直接通過下面的程式碼來說明,經典的生產者和消費者問題。

注意只有一個互斥量的時候,喚醒順序的問題,參照官網:https://en.cppreference.com/w/cpp/thread/condition_variable

在這裡給出一個更加簡潔的例子:

void worker_thread()
{
    // Wait until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, pred});
 	// 這裡執行工作程式碼,注意下面兩個語句的順序
    lk.unlock();
    cv.notify_one();
}

上述程式碼中,先執行lk.unlock(),說明當前執行緒放棄對臨界區的所有權,此時再呼叫notify_one會喚醒其他執行緒來對臨界區執行操作。如果先喚醒其他執行緒,則可能unlock未執行完畢,就有執行緒到臨界區了,此時新來的執行緒又會阻塞了。

程式碼示例

下面程式碼總共是3個例子。第一個Application是模擬一個事件處理系統的,但是沒有使用條件變數,自己實現了一下,第二個Application使用了條件變數。第三個ProduceAndConsume是典型的生產者和消費者模型。

未使用條件變數,僅僅藉助迴圈的載入資料應用

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
#include <random>
#include <cstdlib>

const int MAXT = 1000;
std::uniform_int_distribution<int>dis(1, MAXT);
std::random_device rd;
std::mt19937 gen(rd());

class Application {
public:

    void mainTask() {
        std::cout << "Do some main task...\n";
        auto t = dis(gen);  // 隨機時間模擬主線任務
        std::this_thread::sleep_for(std::chrono::milliseconds(t));
        std::cout << "Finish main task in " << t << " ms\n";
        
        mtx.lock();
        while (!m_bDataLoaded) {
            mtx.unlock();
            std::this_thread::sleep_for(std::chrono::microseconds(100));
            mtx.lock();
        }
        mtx.unlock();

        std::cout << "Get loaded data\n";
    }

    void loadData() {
        std::cout << "Loading data...\n";
        auto t = dis(gen);  // 隨機時間模擬主線任務
        std::this_thread::sleep_for(std::chrono::milliseconds(t));
        std::lock_guard<std::mutex>lck(mtx);
        m_bDataLoaded = true;
        std::cout << "Finish loading data in " << t << " ms\n";
    }

    bool isDataLoaded()const {
        return m_bDataLoaded;
    }

private:
    bool m_bDataLoaded{ false };
    std::mutex mtx;
    std::condition_variable m_convVar;
};

int main() {
    Application app;
    std::thread t1(&Application::mainTask, &app);
    std::thread t2(&Application::loadData, &app);
    t1.join();
    t2.join();
    system("pause");
    return 0;
}

結果:

使用了條件變數的載入資料應用

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
#include <random>
#include <cstdlib>

const int MAXT = 1000;
std::uniform_int_distribution<int>dis(1, MAXT);
std::random_device rd;
std::mt19937 gen(rd());

class Application {
public:

    void mainTask() {
        std::cout << "Do some main task...\n";
        auto t = dis(gen);  // 隨機時間模擬主線任務
        std::this_thread::sleep_for(std::chrono::milliseconds(t));
        std::cout << "Finish main task in " << t << " ms\n";

        std::unique_lock<std::mutex> lck(mtx);
        m_convVar.wait(lck, std::bind(&Application::isDataLoaded, this));
        std::cout << "Get loaded data\n";
    }

    void loadData() {
        std::cout << "Loading data...\n";
        auto t = dis(gen);  // 隨機時間模擬載入資料任務
        std::this_thread::sleep_for(std::chrono::milliseconds(t));
        std::cout << "Finish loading task in " << t << " ms\n";

        std::unique_lock<std::mutex> lck(mtx);
        m_bDataLoaded = true;
        lck.unlock();   // 最好是新增上這一句,本例子無所謂
        m_convVar.notify_one();
    }

    bool isDataLoaded()const {
        return m_bDataLoaded;
    }

private:
    bool m_bDataLoaded{ false };
    std::mutex mtx;
    std::condition_variable m_convVar;
};

int main() {
    Application app;
    std::thread t1(&Application::mainTask, &app);
    std::thread t2(&Application::loadData, &app);
    t1.join();
    t2.join();
    system("pause");
    return 0;
}

執行結果:在這裡插入圖片描述

生產者和消費者模型

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <random>
#include <functional>
#include <algorithm>
#include <queue>
#include <vector>
#include <cstdlib>

const int MAXT = 1000;
const int MAXN = 5;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int>dis(MAXT / 10, MAXT);

class ProducerAndConsumer {
public:

    void prodece() {
        // 隨機時間,模擬貨物生產過程,生產過程本身不是在臨界區
        std::this_thread::sleep_for(std::chrono::milliseconds(2 * dis(gen)));
        std::cout << "Prodece\n";

        std::unique_lock<std::mutex> lck(mtx);
        m_convPro.wait(lck, std::bind(&ProducerAndConsumer::notFull, this));
        m_qCargo.push(m_iCargoNum);
        ++m_iCargoNum;
        m_convCon.notify_one();
    }

    void consume() {
        std::unique_lock<std::mutex> lck(mtx);
        m_convCon.wait(lck, std::bind(&ProducerAndConsumer::notEmpty, this));
        int n = m_qCargo.front();
        m_qCargo.pop();
        m_convPro.notify_one();
        lck.unlock();  // 一定要先解鎖

        // 隨機時間,模擬貨物消費過程,消費過程本身不是在臨界區!!!
        std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
        std::cout << "Consume\n";

    }

    inline bool notFull()const {
        return m_qCargo.size() < m_iMaxCargoNum;
    }

    inline bool notEmpty()const {
        return m_qCargo.size() > 0;
    }

    inline int getBufferSize()const {
        return m_qCargo.size();
    }

    inline int getCargoNum()const {
        return m_iCargoNum;
    }

private:
    std::queue<int> m_qCargo;   // 貨物佇列
    int m_iMaxCargoNum{ MAXN }; // 最大容量
    int m_iCargoNum{ 0 };        // 貨物總數
    std::mutex mtx;
    std::condition_variable m_convPro, m_convCon;
};

int main() {
    ProducerAndConsumer pac;

    auto N = std::thread::hardware_concurrency();
    std::cout << "Thread num: " << N << std::endl;

    std::vector<std::thread>producerThreads;
    std::vector<std::thread>consumerThreads;

    for (int i = 0; i < N; ++i) {
        producerThreads.emplace_back(std::thread(&ProducerAndConsumer::prodece, &pac));
        consumerThreads.emplace_back(std::thread(&ProducerAndConsumer::consume, &pac));
    }

    std::for_each(producerThreads.begin(), producerThreads.end(),
        std::mem_fn(&std::thread::join));
    std::for_each(consumerThreads.begin(), consumerThreads.end(),
        std::mem_fn(&std::thread::join));
    
    std::cout << "Cargo in buffer: " << pac.getBufferSize() << std::endl;
    std::cout << "Cargo count: " << pac.getCargoNum() << std::endl;

    system("pause");
    return 0;
}

執行結果:
在這裡插入圖片描述