C++ 學習筆記(28)C++ thread_pool
阿新 • • 發佈:2019-02-15
程式碼來自網路。只做私人記錄,參考用
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <functional>
#include <chrono>
// thread
#include <thread>
#include <future>
#include <atomic>
#include <mutex>
#include <condition_variable>

namespace YHL {

// A simple fixed-size thread pool: worker threads repeatedly pull jobs from
// a shared queue. Workers exit only once the pool is stopped AND the queue
// has been drained, so tasks queued before shutdown still run.
class thread_pool {
private:
    // One worker pool + one task queue; each worker loops waiting for work.
    std::vector<std::thread> pool;
    std::queue<std::function<void()>> tasks;
    // synchronization
    std::mutex mtx;
    std::condition_variable cv;
    bool stop;  // guarded by mtx; set exactly once, by the destructor

public:
    // Spawn `init_size` workers.
    explicit thread_pool(const size_t);
    // Stop, wake all workers, and join them.
    ~thread_pool();
    // Build the worker loop executed by every pool thread.
    std::function<void()> get_thread();
    // Grow the pool by `extend` additional workers.
    // NOTE(review): mutates `pool` without holding mtx — safe only when
    // called from the thread that owns the pool (as main() does here).
    void add_thread(const size_t);
    // Submit a callable with its arguments; returns a future for the result.
    // Throws std::runtime_error if the pool has already been stopped.
    template <typename F, class... Args>
    auto enqueue(F&& fun, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
};

thread_pool::thread_pool(const size_t init_size) : stop(false) {
    for (size_t i = 0; i < init_size; ++i)
        this->pool.emplace_back(get_thread());
}

// The worker loop: block until a task is available (or shutdown begins),
// pop one task under the lock, run it outside the lock, repeat.
std::function<void()> thread_pool::get_thread() {
    auto task = [this] {
        for (;;) {
            std::function<void()> cur;
            {
                std::unique_lock<std::mutex> lck(this->mtx);
                this->cv.wait(lck, [this] {
                    return this->stop || !this->tasks.empty();
                });
                // BUGFIX: was `if (stop or tasks.empty()) return;`, which
                // made workers abandon queued tasks as soon as the pool was
                // stopped. Exit only when stopped AND the queue is drained.
                if (this->stop && this->tasks.empty())
                    return;
                cur = std::move(this->tasks.front());
                this->tasks.pop();
            }
            // Run outside the lock so other workers can keep dequeuing.
            cur();
        }
    };
    return task;
}

void thread_pool::add_thread(const size_t extend) {
    for (size_t i = 0; i < extend; ++i)
        this->pool.emplace_back(get_thread());
}

// Wrap the callable in a shared packaged_task (the queue stores copyable
// std::function objects), push it, and wake one worker.
template <typename F, class... Args>
auto thread_pool::enqueue(F&& fun, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto packed_task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(fun), std::forward<Args>(args)...));
    // Take the future before the task is visible to any worker.
    std::future<return_type> res = packed_task->get_future();
    {
        std::unique_lock<std::mutex> lck(this->mtx);
        if (stop)
            throw std::runtime_error("enqueue task on stopped pool\n");
        this->tasks.emplace([packed_task]() { (*packed_task)(); });
    }
    this->cv.notify_one();
    return res;
}

inline thread_pool::~thread_pool() {
    {
        std::unique_lock<std::mutex> lck(this->mtx);
        stop = true;
    }
    // Wake every worker so each can observe `stop` and finish.
    this->cv.notify_all();
    for (auto& it : pool)
        it.join();
    pool.clear();
    pool.shrink_to_fit();
}

}  // namespace YHL

namespace test {
int cnt = 0;
std::mutex m;
// Sample task: sleep, print the worker's thread id, return a constant.
int fun() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    // BUGFIX: the lock was previously taken AFTER the print (immediately
    // before `return`), so it guarded nothing; hold it across the stream
    // write so concurrent workers' output is serialized.
    std::lock_guard<std::mutex> lck(m);
    std::cout << "id : " << std::this_thread::get_id() << std::endl;
    return 1022;
}
}  // namespace test

int main() {
    YHL::thread_pool pool(4);
    for (int i = 0; i < 10; ++i) {
        auto result = pool.enqueue(test::fun);
        std::cout << "answer : " << result.get() << std::endl;
    }
    pool.add_thread(2);
    for (int i = 0; i < 20; ++i) {
        auto result = pool.enqueue(test::fun);
        std::cout << "answer : " << result.get() << std::endl;
    }
    return 0;
}