muduo庫的ThreadPool剖析
阿新 • • 發佈:2019-02-07
我們知道執行緒池本質就是一個生產者消費者模型,它維護一個執行緒佇列和任務佇列。一旦任務隊列當中有任務,相當於生產者生產了東西,就喚醒執行緒佇列中的執行緒來執行這些任務。那麼,這些執行緒就相當於消費者執行緒。
muduo庫的執行緒數目屬於啟動時配置,當執行緒池啟動時,執行緒數目就已經固定下來。
它的類圖如下:
先上程式碼,然後分析:
ThreadPool.h
#ifndef MUDUO_BASE_THREADPOOL_H #define MUDUO_BASE_THREADPOOL_H #include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <muduo/base/Thread.h> #include <muduo/base/Types.h> #include <boost/function.hpp> #include <boost/noncopyable.hpp> #include <boost/ptr_container/ptr_vector.hpp> #include <deque> namespace muduo { class ThreadPool : boost::noncopyable { public: typedef boost::function<void ()> Task; explicit ThreadPool(const string& nameArg = string("ThreadPool")); ~ThreadPool(); // Must be called before start(). void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; } //設定最大執行緒池執行緒最大數目大小 void setThreadInitCallback(const Task& cb) //設定執行緒執行前的回撥函式 { threadInitCallback_ = cb; } void start(int numThreads); void stop(); const string& name() const { return name_; } size_t queueSize() const; // Could block if maxQueueSize > 0 void run(const Task& f); #ifdef __GXX_EXPERIMENTAL_CXX0X__ void run(Task&& f); #endif private: bool isFull() const; //判滿 void runInThread(); //執行緒池的執行緒執行函式 Task take(); //取任務函式 mutable MutexLock mutex_; Condition notEmpty_; //不空condition Condition notFull_; //未滿condition string name_; Task threadInitCallback_; //執行緒執行前的回撥函式 boost::ptr_vector<muduo::Thread> threads_; //執行緒陣列 std::deque<Task> queue_; //任務佇列 size_t maxQueueSize_; //因為deque是通過push_back增加執行緒數目的,所以通過外界max_queuesize儲存最多執行緒數目 bool running_; //執行緒池執行標誌 }; } #endif
ThreadPool.cc
#include <muduo/base/ThreadPool.h> #include <muduo/base/Exception.h> #include <boost/bind.hpp> #include <assert.h> #include <stdio.h> using namespace muduo; ThreadPool::ThreadPool(const string& nameArg) : mutex_(), notEmpty_(mutex_), //初始化的時候需要把condition和mutex關聯起來 notFull_(mutex_), name_(nameArg), maxQueueSize_(0), //初始化0 running_(false) { } ThreadPool::~ThreadPool() { if (running_) //如果執行緒池在執行,那就要進行記憶體處理,在stop()函式中執行 { stop(); } //如果沒有分配過執行緒,那就不存在需要釋放的記憶體,什麼都不做就可以了 } void ThreadPool::start(int numThreads) { assert(threads_.empty()); //確保未啟動過 running_ = true; //啟動標誌 threads_.reserve(numThreads); //預留reserver個空間 for (int i = 0; i < numThreads; ++i) { char id[32]; //id儲存執行緒id snprintf(id, sizeof id, "%d", i+1); threads_.push_back(new muduo::Thread( //boost::bind在繫結類內部成員時,第二個引數必須是類的例項 boost::bind(&ThreadPool::runInThread, this), name_+id));//runInThread是每個執行緒的執行緒執行函式,執行緒為執行任務情況下會阻塞 threads_[i].start(); //啟動每個執行緒,但是由於執行緒執行函式是runInThread,所以會阻塞。 } if (numThreads == 0 && threadInitCallback_) //如果執行緒池執行緒數為0,且設定了回撥函式 { threadInitCallback_(); //init回撥函式 } } void ThreadPool::stop() //執行緒池停止 { { MutexLockGuard lock(mutex_); //區域性加鎖 running_ = false; notEmpty_.notifyAll(); //讓阻塞在notEmpty contition上的所有執行緒執行完畢 } for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); //對每個執行緒呼叫,pthread_join(),防止資源洩漏 } size_t ThreadPool::queueSize() const //thread safe { MutexLockGuard lock(mutex_); return queue_.size(); } //執行一個任務//所以說執行緒池這個執行緒池執行任務是靠任務佇列,客端需要執行一個任務,必須首先將該任務push進任務佇列,等侯空閒執行緒處理 void ThreadPool::run(const Task& task) { if (threads_.empty()) //如果執行緒池為空,說明執行緒池未分配執行緒 { task(); //由當前執行緒執行 } else { MutexLockGuard lock(mutex_); while (isFull()) //當任務佇列滿的時候,迴圈 { notFull_.wait(); //一直等待任務佇列不滿 //這個鎖在take()取任務函式中,取出任務佇列未滿,喚醒該鎖 } assert(!isFull()); queue_.push_back(task); //當任務佇列不滿,就把該任務加入執行緒池的任務佇列 notEmpty_.notify(); //喚醒take()取任務函式,讓執行緒來取任務,取完任務後runInThread會執行任務 } } #ifdef __GXX_EXPERIMENTAL_CXX0X__ void ThreadPool::run(Task&& task) { if (threads_.empty()) //如果執行緒佇列是空的,就直接執行,因為只有一個執行緒啊 { task(); } else { MutexLockGuard lock(mutex_); while (isFull()) { notFull_.wait(); } assert(!isFull()); queue_.push_back(std::move(task)); notEmpty_.notify(); } } #endif //take函式是每個執行緒都執行的,需要考慮執行緒安全,考慮多執行緒下取任務的執行緒安全性,只能序列化 ThreadPool::Task ThreadPool::take() //取任務函式 { MutexLockGuard lock(mutex_); //注意,必須用鎖 // always use a while-loop, due to spurious wakeup //防止驚群效應。 while (queue_.empty() && running_) //如果任務佇列為空,並且執行緒池處於執行態 { //佇列為空時沒有使用執行緒池 notEmpty_.wait(); //等待。條件變數需要用while迴圈,防止驚群效應。 } //因為所有的執行緒都在等同一condition,即notempty,只能有執行緒在wait返回時拿到mutex,並消耗資源 //其他執行緒雖然被notify同樣返回,但資源已被消耗,queue為空(以1個任務為例),其他執行緒就在while中繼續等待 Task task; if (!queue_.empty()) //從任務佇列隊頭中取任務 { task = queue_.front(); queue_.pop_front(); if (maxQueueSize_ > 0) //??如果未設定會等於0,不需要喚醒notFull { notFull_.notify(); //取出一個任務之後,如任務佇列長度大於0,喚醒notfull未滿鎖 } } return task; } bool ThreadPool::isFull() const //not thread safe { mutex_.assertLocked(); //呼叫確保被使用執行緒鎖住,因為isFull函式不是一個執行緒安全的函式,外部呼叫要加鎖 return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_; //因為deque是通過push_back增加執行緒數目的,所以通過外界max_queuesize儲存最多執行緒數目 } //執行緒執行函式,無任務時都會阻塞在take(),有任務時會爭互斥鎖 void ThreadPool::runInThread() //執行緒執行函式 { try { if (threadInitCallback_) { threadInitCallback_(); //支援每個執行緒執行前排程回撥函式 } while (running_)//當執行緒池處於啟動狀態,一直迴圈 //注意,這就和我之前專案那個問題一樣,需要讓物件析構時通知成員執行緒,讓它們也執行完畢,當時犯的錯唉~ { Task task(take()); //從任務佇列中取任務,無任務會阻塞 if (task) //如果上面取出來了 { task(); //做任務 } } } catch (const Exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTrace()); abort(); } catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); } catch (...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str()); throw; // rethrow } }
muduo庫採用的執行緒池模型實際上是producer和consumer模型,客端通過Thread::run()函式向任務佇列push任務,執行緒池等待處理任務。一旦任務佇列not empty,就進行多執行緒處理,pop front就是從隊頭開始處理任務。
task(任務)是客端要執行的函式,通過boost::bind註冊成為回撥函式,放進任務隊列當中。