1. 程式人生 > 其它 >python是如何實現程序池和執行緒池的_C++11實現的100行執行緒池

python是如何實現程序池和執行緒池的_C++11實現的100行執行緒池

技術標籤:python是如何實現程序池和執行緒池的

(給CPP開發者加星標,提升C/C++技能)

來源:程某有一計 https://segmentfault.com/a/1190000022456590

【導讀】:C++執行緒池一直都是各位程式設計師們造輪子的首選專案之一。今天,小編帶大家一起來看看這個輕量的執行緒池,本執行緒池是header-only的,並且整個檔案只有100行,其中C++的高階用法有很多,很值得我們學習,一起來看看吧。

以下是正文


執行緒池

C++帶有執行緒操作,非同步操作,就是沒有執行緒池,至於執行緒池的概念,我先搜一下別人的解釋:

一般而言,執行緒池有以下幾個部分:

1. 完成主要任務的一個或多個執行緒。

2. 用於排程管理的管理執行緒。

3. 要求執行的任務佇列。

我來講講人話:你的函式需要在多執行緒中執行,但是你又不能每來一個函式就開啟一個執行緒,所以你就需要固定的N個執行緒來跑執行,但是有的執行緒還沒有執行完,有的又在空閒,如何分配任務呢,你就需要封裝一個執行緒池來完成這些操作,有了執行緒池這層封裝,你就只需要告訴它開啟幾個執行緒,然後直接塞任務就行了,然後通過一定的機制獲取執行結果。

這裡有一個100行實現執行緒池的操作:

https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

分析原始碼 標頭檔案

#include #include #include #include #include #include #include #include #include 

vector,queue,momory 都沒啥說的,thread執行緒相關,mutex 互斥量,解決資源搶佔問題,condition_variable 條件量,用於喚醒執行緒和阻塞執行緒,future 從使用的角度出發,它是一個獲取執行緒資料的函式。functional 函式子,可以理解為規範化的函式指標。stdexcept 就跟它的名字一樣,標準異常。

class ThreadPool {public:    ThreadPool(size_t);    template<class F, class... Args>    auto enqueue(F&& f, Args&&... args)         -> std::future<typename std::result_of::type>;    ~ThreadPool();private:    // need to keep track of threads so we can join them    std::vector< std::thread > workers;    // the task queue    std::queue< std::function<void()> > tasks;        // synchronization    std::mutex queue_mutex;    std::condition_variable condition;    bool stop;};

執行緒池的宣告,建構函式,一個enqueue模板函式 返回std::future, 然後這個type又利用了執行時檢測(還是編譯時檢測?)推斷出來的,非常的amazing啊。成功的使用一行程式碼反覆套娃,這高階的用法就是大佬的水平嗎,i了i了。

workers 是vector<:thread> 俗稱工作執行緒。

std::queue<:function>> tasks 俗稱任務佇列。

那麼問題來了,這個任務佇列的任務只能是void() 型別的嗎?感覺沒那麼簡單,還得接著看吶。

mutex,condition_variable 沒啥講的,stop 控制執行緒池停止的。

// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(size_t threads)    :   stop(false){    for(size_t i = 0;i        workers.emplace_back(            [this]            {                for(;;)                {                    std::function task;                    {                        std::unique_lock<:mutex> lock(this->queue_mutex);                        this->condition.wait(lock,                            [this]{ return this->stop || !this->tasks.empty(); });                        if(this->stop && this->tasks.empty())                            return;                        task = std::move(this->tasks.front());                        this->tasks.pop();                    }                    task();                }            }        );}

大佬寫的註釋就是這麼樸實無華,說這個建構函式僅僅是把一定數量的執行緒塞進去,我是看了又看才悟出來這玩意是什麼意思……雖然本質上的確是它說的只是把執行緒塞進去,但是這個執行緒也太繞了。

workers.emplace_back 引數是一個lambda表示式,不會阻塞,也就是說最外層的是一個非同步函式,每個執行緒裡面的事情才是重點。

labmda表示式中最外層是一個死迴圈,至於為什麼是for(;;)而不是while(1) 這雖然不是重點,不過大佬的用法還是值得揣摩的,我估計是效率會更高?

task 申明後,緊跟著一個大括號,這個{}裡面的部分,是一個同步操作,至於為什麼用this->lock 而不是直接使用[&]來捕獲引數,想來也是處於記憶體考慮。精打細算的風格像極了摳門的地主,i了i了。

緊接著一個wait(lock,condtion)的操作,像極了千層餅的套路。

第一層:這TM不是要鎖死自己啊?這樣不是構造都得卡死?

第二層:我們看到它emplace_back了一個執行緒,不會阻塞,但是等開鎖,鎖不就在它自己的執行緒裡面嘛?那不得鎖死了啊?

第三層:我們看到這個lock其實只是個包裝,真正的鎖是外層的mutex,所以從這裡是不存在死鎖的。但是你的wait的condition怎麼可能不懂呢,必須要 stop 或者 !empty 才wait嗎?

第四層:我們查資料發現後面的condition是返回false才會wait,也就是說要!stop && empty才會wait,就是說這個執行緒池是 執行態,並且沒有任務才才會執行等待操作!否則就不等了,直接衝!

第五層:既然你判斷了上面判斷了stop和非空,為啥下面還要判斷stop和空才退出呢?不顯得冗餘?

第六層:要確定它的確是被置為stop了,且佇列執行空了,它才能夠光榮退休。有沒有問題呢,有,最後所有執行緒都阻塞了,你stop置為true它們也不知道啊……

我估計它的stop會有喚醒所有執行緒的操作,不過如果有的在執行,有的在等待,應該沒辦法都通知到位,但是在執行的在下一次判斷的時候也能正常退出。

因為有了疑惑,我們就想看stop相關的操作,結果發現放在了解構函式裡面……

// the destructor joins all threadsinline ThreadPool::~ThreadPool(){    {        std::unique_lock<std::mutex> lock(queue_mutex);        stop = true;    }    condition.notify_all();    for(std::thread &worker: workers)        worker.join();}

{}裡面上鎖進行了stop為true的操作,至於為什麼不用原子操作,我也不知道,但是仔細想了下大概是因為本來就有一把鎖了,再用原子就不是內味兒了。然後它果然通知了所有,並且還把工作執行緒join了。也就是等它們結束。

結束了千層餅の解析之後,我們看看最重要的入隊操作

// add new work item to the pooltemplate<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args)     -> std::future<typename std::result_of::type>{    using return_type = typename std::result_of::type;    auto task = std::make_shared< std::packaged_task >(            std::bind(std::forward(f), std::forward(args)...)        );            std::future res = task->get_future();    {        std::unique_lock<std::mutex> lock(queue_mutex);        // don't allow enqueueing after stopping the pool        if(stop)            throw std::runtime_error("enqueue on stopped ThreadPool");        tasks.emplace([task](){ (*task)(); });    }    condition.notify_one();    return res;}

typename std::result_of::type中的typename 應該是為消除歧義的,或者因為巢狀依賴名字的關係,做為一個堅決不寫模板的普通程式設計師,這段程式碼太難了……-> type 我倒是知道怎麼回事,就是指明它的返回型別的一種方式result_of 應該是指明瞭F是一個函式,簽名為Args...這個變參,Args是啥它不關係,它關心的是返回值的引數型別 所以有個type。


至於為什麼函式入口是一個右值引用那就超出我的理解範圍了。難道說functional 必須要右值引用?那它的銷燬誰來管呢?這個執行緒來管嗎?這些坑我以後慢慢填。

前面我們說了tasks 只能接收void() 的函式型別,這裡使用std::packaged_task完成對函式型別的推導,至於為什麼不用 function ,因為這還不是最終放入tasks的物件,它要承接一個返回future的工作,而package_task就是來打包返回future的……

然後就是加鎖入隊+通知工作執行緒+返回future的操作。本來是執行緒池最難理解的部分,反而顯得平淡無奇了,因為前面那些花裡胡哨的操作已經很好的打通了我們的理解能力。對於這個操作本來就有一點概念的,就顯得有種“就這?”的感覺……

好了,今天也是上班摸魚的一天。

- EOF -

推薦閱讀 點選標題可跳轉

1、C++呼叫Go方法的字串傳遞問題及解決方案

2、C++ vector詳解

3、C++之父:C++的成功屬於意料之外,C++11是轉折點

關於 C++執行緒池,歡迎在評論中和我探討。覺得文章不錯,請點贊和在看支援我繼續分享好文。謝謝!

關注『CPP開發者』

看精選C++技術文章 .加C++開發者專屬圈子

↓↓↓

90763920278a5c3a16c14a44ae794f4e.png

點贊和在看就是最大的支援❤️