1. 程式人生 > >聊聊C++執行緒同步機制

聊聊C++執行緒同步機制

  執行緒同步是一個經常出現的場景,考慮一個生產者消費者模式,一個執行緒作為生產,一個執行緒作為消費。生產者往一個佇列中加入元素,消費者往一個佇列中取元素。實現對一個公共區域的同時訪問操作,是C++多執行緒經常會遇到的問題,所以C++提供了執行緒同步的機制。

  1.消費者輪詢。

參考下面程式碼:一個執行緒執行Producer,一個執行緒執行Comsumer,共同操作一個佇列,這會導致嚴重問題:CPU會被跑滿、一個執行緒加任務,一個執行緒取任務,導致共享資源被同時訪問。當任務較少時,消費者迴圈訪問佇列,CPU被無效使用。這種方式好比你無時無刻都要去檢查你錢包裡的錢有沒有多出來,有的話就用了它,這樣你估計會累死。

std::queue<Element> que;

void Producer()
{
    while (true)                 //迴圈放資料
    {
        que.push(Element());
    }
}
void Comsumer()
{
    while (true)         //有資料,取資料
    {
        if (!que.empty())
            que.pop();
    }
}
int main()
{
    std::thread thProducer(Producer);
    std::thread thComsumer(Comsumer);
    //......省略以下程式碼
    return 0;
}

解決CPU被無效使用的情況,我們可以用執行緒短暫休眠。則就是第二種方式。 

2.消費者輪詢加延遲。

將消費者和生產者都加上延遲,這樣當沒有任務時,消費者休眠,生產者投放,節約了CPU。這種情況就是,你每隔2小時檢查你的錢包裡的錢,有就花了,沒有就再休息。在任務量小時,出現問題的概率少,在任務量大時,會嚴重出現共享資源同時訪問的問題----生產者消費者同時操作公共佇列。

不恰當的例子就是:兩個人分別往錢包裡放錢和取錢,OK,結果把錢包撤破了,扯破了之後誰都用不了了。所有我們希望消費者和生成者任一時刻,只有一個人在訪問公共的佇列。

void Comsumer()
{
    while (true)         //有資料,取資料
    {
        if (!que.empty())
            que.pop();
        if (que.empty())
            std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

引出我們的第三種方式,互斥量 。

3.使用std::mutex。鎖的特點是:當獲取鎖操作失敗時,執行緒會進入睡眠,等待鎖釋放時被喚醒

保證每次取公共資料的時候,先獲取mutex,使得每次只能有一個執行緒去訪問公共資源。這樣,往錢包裡放錢和取錢的人,同一時刻只有一個,不至於把錢包弄破。程式碼如下:生產者一秒取鎖生成一個數據,然後釋放鎖(作用域退出釋放),休眠。消費者還是輪序檢查是否有資料,有資料就取,沒資料就輪詢。這還是存在浪費CPU的問題:當生成者一直沒有資料,消費者就一個輪訓,但是保證了每次只要一個人去訪問佇列,個人認為:當持鎖時間相對長時,則需要其他的執行緒等待相對長的時間時,這種情況比較適用,不浪費CPU,而且協調效率高。(意思就是:沒活幹,先睡覺,睡醒了再幹)。

優點是:實現了共享資料的有序訪問。

缺點:

1.搶鎖的順序不可控。因為只有強鎖和解鎖的過程,並沒有規定誰先搶。(如果需要順序,可使用訊號量,可參考同步和互斥的概念區別)

2.在生產者沒有生產資料的時候,消費者會被喚醒檢查。

適用場景是:執行緒佔用臨界資源相對較長(執行緒休眠的作用顯現出來),一個執行緒在使用時,另一個執行緒先休眠。

std::queue<Element> que;
std::mutex mu;
void Producer()
{
    while (true)                 //迴圈放資料
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::lock_guard<std::mutex> lock(mu);
        que.push(Element());
        std::cout << "thread = " << std::this_thread::get_id() << " push" << std::endl;
    }
}
void Comsumer()
{
    while (true)         //有資料,取資料
    {
        std::lock_guard<std::mutex> lock(mu);
        if (!que.empty())
        {
            que.pop();
            std::cout << "thread = " << std::this_thread::get_id() << " pop" << std::endl;
        }
    }
}

如果場景是:相對長的時間間隔才會發生一個訊息,然後我們去主動叫執行緒喚醒。最好的情況是,生產一個數據,就發一個訊息告訴消費者,我產生資料了,然後消費者消費。避免了浪費鎖,也避免浪費CPU資源。引出下面一個適用場景。

4.使用std::condition_variable。

std::condition_variable是又一種執行緒同步機制,在每次有元素加進佇列時,傳送訊息喚醒執行緒,當沒有資料時,消費者等待被喚醒。這樣就保證了有事件到達時,才喚醒消費者去處理。這就好比,錢包裡有錢了,然後告訴你,你再去拿來消費。注意這裡使用的是unique_lock,它保證了當沒有訊息時,能把鎖解開,讓其他執行緒使用,不至於造成死鎖。但是有一個問題,消費者必須比生成者先生成,不然消費者的傳送的喚醒訊息,沒辦法被接收到,導致明明有資料,確不能被使用。所以程式碼如下:

優點:不必浪費多餘的CPU,有一個訊息處理一個數據

缺點:當生產者生成資料很快,消費資料跟不上,導致沒有那麼多能力去處理已經在que裡的任務,則喚醒訊息cv.nodify()就沒有發揮作用,導致雖然有任務在佇列裡,卻無人處理。

適用場景:任務量不多,處理任務的速度相對較快。(保證每個任務,都有人處理)當然也可以增開多個執行緒作為消費者執行緒。

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>

typedef struct Element
{
}Element;

std::condition_variable cv;
std::queue<Element> que;
std::mutex mu;

void Producer()
{
    while (true)                 //迴圈放資料
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::lock_guard<std::mutex> lock(mu);
        que.push(Element());
        std::cout << "thread = " << std::this_thread::get_id() << " push" << std::endl;
        cv.notify_one();
    }
}
void Comsumer()
{
    while (true)         //有資料,取資料
    {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait(lock);
        que.pop();
        std::cout << "thread = " << std::this_thread::get_id() << " pop" << std::endl;
    }
}

int main()
{
    std::thread thComsumer(Comsumer);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::thread thProducer(Producer);

    std::this_thread::sleep_for(std::chrono::hours(1));
}

引出下面一個場景:生成者速度快,消費者佔用週期小,處理時間相對長(休眠幾乎用不上)。boost中spinlock自旋鎖給我們提供了這樣一個機制

5.使用boost::spinlock 

每次投放的任務比較多,消費者佔用的共享資源時間短,spinlock是一個好的選擇,因為任務大部分時間都是在被處理當中,幾乎沒有休眠的過程。則我們使用一個加鎖量級輕的加鎖方式,避免執行緒休眠,spinlock在等待鎖的時候,不休眠,進行輪詢檢查,如果任務多的時候,充分利用了CPU。

優點:充分利用CPU。

缺點:任務持鎖長時,空等時間過長,浪費CPU。

適用場景:持鎖時間短,任務量大。

boost::detail::spinlock splock;

void Producer()
{
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::lock_guard<boost::detail::spinlock> lock(splock);
        for (int i = 0; i < 10; i++)
        {
            que.push(Element());
            std::cout << "thread = " << std::this_thread::get_id() << " push" << std::endl;
        }
    }
}
void Comsumer()
{
    while (true)
    {
        std::lock_guard<boost::detail::spinlock> lock(splock);
        if (!que.empty())
        {
            que.pop();
            std::cout << "thread = " << std::this_thread::get_id() << " pop" << std::endl;
        }
    }
}

6.使用boost的shared_mutex

讀寫鎖:boost提供的是shared_mutex,OK名不符意。區分讀和寫,處於讀操作時,可以允許多個執行緒同時獲得讀操作。但是同一時刻只能有一個執行緒可以獲得寫鎖。其它獲取寫鎖失敗的執行緒都會進入睡眠狀態,直到寫鎖釋放時被喚醒。 寫鎖會阻塞其它讀寫鎖。當有一個執行緒獲得寫鎖在寫時,讀鎖也不能被其它執行緒獲取;寫優先於讀,當有執行緒因為等待寫鎖而進入睡眠時,則後續讀者也必須等待 

優點:區分了讀寫,在讀操作多的時候,只需要輕量級加鎖,即可訪問到正確值。

缺點:在寫操作頻繁時,作用不顯著,還新增了寫鎖的資源。
適用場景:讀取資料的頻率遠遠大於寫資料的頻率的場合。

std::queue<Element> que;
boost::shared_mutex shmu;

void Producer()
{
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        std::unique_lock<boost::shared_mutex> lock(shmu);   //加寫鎖

        que.push(Element());
        std::cout << "thread = " << std::this_thread::get_id() << " push" << std::endl;
    }
}
void Comsumer1()
{
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        boost::shared_lock<boost::shared_mutex> lock(shmu);
        std::cout << "thread = " << std::this_thread::get_id() << " read" << std::endl;
    }
}
void Comsumer2()
{
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        boost::unique_lock<boost::shared_mutex> lock(shmu);
        if (!que.empty())
        {
            que.pop();
            std::cout << "thread = " << std::this_thread::get_id() << " pop" << std::endl;
        }
    }
}

int main()
{
    std::thread thProducer(Producer);
    std::thread thComsumer1(Comsumer1);
    std::thread thComsumer2(Comsumer2);

    std::this_thread::sleep_for(std::chrono::hours(1));
}

個人理解是沒一種方式都會有自己使用的場景,不能說哪種一定好,哪種一定不好,即使第一種輪詢的方法,都會在某些場合適用。俗話說什麼場合幹什麼樣的事情。