簡單的執行緒池(四)
概要
筆者對 《簡單的執行緒池(一)》 中的非阻塞式執行緒池進行了改造。在新的執行緒池中,為每個工作執行緒配備一個獨佔的任務佇列。執行緒池使用者提交的任務被隨機地分配到各個獨佔的任務佇列中。工作執行緒從獨佔的任務佇列中獲取任務並執行。
本文不再贅述與 《簡單的執行緒池(一)》 相同的內容。如有不明之處,請參考該部落格。
實現
以下程式碼給出了此執行緒池的實現,(lockwise_unique_pool.h)
class Thread_Pool { private: struct Task_Wrapper { ... }; atomic<bool> _suspend_; // #5 atomic<bool> _done_; unsigned _workersize_; thread* _workers_; Lockwise_Queue<Task_Wrapper>* _workerqueues_; // #2 void work(unsigned index) { Task_Wrapper task; while (!_done_.load(memory_order_acquire)) { if (_workerqueues_[index].pop(task)) // #4 task(); while (_suspend_.load(memory_order_acquire)) // #7 std::this_thread::yield(); } } void stop() { size_t remaining = 0; _suspend_.store(true, memory_order_release); // #6 for (unsigned i = 0; i < _workersize_; ++i) remaining += _workerqueues_[i].size(); _suspend_.store(false, memory_order_release); // #8 for (unsigned i = 0; i < _workersize_; ++i) while (!_workerqueues_[i].empty()) std::this_thread::yield(); std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining); _done_.store(true, memory_order_release); for (unsigned i = 0; i < _workersize_; ++i) if (_workers_[i].joinable()) _workers_[i].join(); delete[] _workers_; delete[] _workerqueues_; // #9 } public: Thread_Pool() : _suspend_(false), _done_(false) { try { _workersize_ = thread::hardware_concurrency(); _workers_ = new thread[_workersize_](); _workerqueues_ = new Lockwise_Queue<Task_Wrapper>[_workersize_](); // #1 for (unsigned i = 0; i < _workersize_; ++i) _workers_[i] = thread(&Thread_Pool::work, this, i); } catch (...) { stop(); throw; } } ~Thread_Pool() { stop(); } template<class Callable> future<typename std::result_of<Callable()>::type> submit(Callable c) { typedef typename std::result_of<Callable()>::type R; packaged_task<R()> task(c); future<R> r = task.get_future(); _workerqueues_[std::rand() % _workersize_].push(std::move(task)); // #3 return r; } };
構造 Thread_Pool 物件時,為每個工作執行緒配備一個獨佔的任務佇列(#1),由 _workerqueues_ 指標引用(#2)。執行緒池使用者提交的任務被隨機地分配到各個獨佔的任務佇列中(#3),每個執行緒根據自己的索引編號從與之對應的任務佇列(#4)中獲取任務並執行。
線上程池退出時,將 atomic<bool> _suspend_ 資料成員(#5)置為 true(#6),工作執行緒被暫停(#7);準確地統計完剩餘的工作任務後,將其置為 false(#8),工作執行緒繼續處理剩餘的工作任務。最後,需要回收任務佇列資源(#9)。
邏輯
以下類圖展現了此執行緒池的程式碼主要邏輯結構。它與
執行緒池使用者提交任務與工作執行緒執行任務的併發過程與 《簡單的執行緒池(一)》 中的一致,此處略。
驗證
驗證過程採用了 《簡單的執行緒池(三)》 中定義的的測試用例,對應的測試結果均儲存在 [github] cnblogs/15661191 中。
最後
完整的程式碼與測試資料請參考 [github] cnblogs/15661191 。
受限於作者的水平,讀者如發現有任何錯誤或有疑問之處,請追加評論或發郵件聯絡 [email protected]。作者將在收到意見後的第一時間裡予以回覆。 本文來自部落格園,作者: