muduo原始碼分析:ThreadPool 執行緒池的實現
阿新 • • 發佈:2018-11-19
原始碼:
https://github.com/chenshuo/muduo/blob/master/muduo/base/ThreadPool.h
https://github.com/chenshuo/muduo/blob/master/muduo/base/ThreadPool.cc
執行緒池ThreadPool用到了前面分析的Thread、MutexLock、Condition。ThreadPool可以設定工作執行緒的數量,並向任務佇列放入任務。放入到任務佇列中的任務將由某個工作執行緒執行。
ThreadPool.h
public: typedef boost::function<void ()> Task; explicit ThreadPool(const string& nameArg = string("ThreadPool")); ~ThreadPool(); // Must be called before start(). void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; } //設定執行緒池執行緒最大數目大小 void setThreadInitCallback(const Task& cb) //設定執行緒執行前的回撥函式 { threadInitCallback_ = cb; } void start(int numThreads); //啟動執行緒池,numThreads是執行緒池的容量 void stop(); //終止執行緒池 const string& name() const { return name_; } size_t queueSize() const; // Could block if maxQueueSize > 0 void run(const Task& f); #ifdef __GXX_EXPERIMENTAL_CXX0X__ void run(Task&& f); #endif
成員變數
private: bool isFull() const REQUIRES(mutex_); //判滿 void runInThread(); //執行緒池的執行緒執行函式 Task take(); //取任務函式 mutable MutexLock mutex_; //mutable表示在const函式也可以改變它 Condition notEmpty_ GUARDED_BY(mutex_); //任務佇列queue_不為空了,有任務可以執行了,進而喚醒等待的執行緒。 Condition notFull_ GUARDED_BY(mutex_); //任務佇列queue_不滿了,有空間可以使用了,進而喚醒等待的執行緒。 string name_; Task threadInitCallback_; //執行緒初始化回撥函式 boost::ptr_vector<muduo::Thread> threads_; //工作執行緒容器(執行緒陣列) std::deque<Task> queue_ GUARDED_BY(mutex_); //任務佇列 size_t maxQueueSize_; //佇列最大大小 bool running_; //執行緒池執行標誌
使用boost::ptr_vector存放Thead。
每個Task都是typedef boost::function<void ()> Task; 所有任務都放到
queue_中。需要使用條件變數來維護執行緒將的同步,比如:通知其他執行緒有任務到來了,可以向任務佇列放任務了等等。
ThreadPool::ThreadPool()
ThreadPool::ThreadPool(const string& nameArg) : mutex_(), notEmpty_(mutex_), notFull_(mutex_), name_(nameArg), maxQueueSize_(0), running_(false) { }
建構函式對成員變數進行初始化(使用初始化列表)。
ThreadPool::~ThreadPool()
ThreadPool::~ThreadPool()
{
if (running_)
{
stop();
}
}
解構函式會呼叫stop, 喚醒所有休眠的執行緒,然後等待所有執行緒處理完。
ThreadPool::stop()
void ThreadPool::stop() //終止執行緒池
{
{ // new scope
MutexLockGuard lock(mutex_); // ctor of MutexLockGuard will lock mutex_
running_ = false;
notEmpty_.notifyAll(); // 喚醒所有休眠的工作執行緒
} // dtor of MutexLockGuard will unlock mutex_
for_each(threads_.begin(),
threads_.end(),
boost::bind(&muduo::Thread::join, _1)); // 等待所有工作執行緒結束
}
ThreadPool::start()
void ThreadPool::start(int numThreads) //引數為執行緒數量,會建立相應數量的執行緒,執行緒函式為ThreadPool::runInThread
{
assert(threads_.empty());
running_ = true; //啟動標誌
threads_.reserve(numThreads); // 保證threads_容量至少為numThreads
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
//建立工作執行緒並加入執行緒陣列,構造Thread(this.runInThread,name+id)並加入執行緒陣列。執行緒函式是ThreadPool::runInThread
threads_.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this), name_+id));
threads_[i].start(); //啟動每個執行緒,但是由於執行緒執行的函式是runInThread,所以會阻塞
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
引數為執行緒數量,會建立相應數量的執行緒,執行體為ThreadPool::runInThread。
void ThreadPool::runInThread() //執行緒函式
{
try
{
if (threadInitCallback_) //如果設定了就執行,進行一些初始化設定
{
threadInitCallback_();
}
while (running_) //當執行緒池啟動之後,就在while迴圈中不停地取任務執行
{
Task task(take()); //從任務佇列取出一個任務
if (task)
{
task(); //執行該任務
}
}
}
catch (const Exception& ex) //異常處理
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
獲取一個task
ThreadPool::Task ThreadPool::take() //獲取一個task
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
notEmpty_.wait(); //沒有任務,則等待(利用條件變數)
}
Task task;
if (!queue_.empty()) //有任務了,就返回一個任務
{
task = queue_.front(); //獲取任務
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify(); //當解決了一個任務之後,任務佇列肯定不是滿的,通知某個等待向佇列放入task執行緒。
}
}
return task; //返回任務
}
條件變數的wait操作使用while包裹,預防“虛假喚醒”(如被其他執行緒搶佔了)。
向執行緒池新增task
void ThreadPool::run(const Task& task) //向執行緒池新增task
{
if (threads_.empty()
{
task(); //如果沒有子執行緒,就在主執行緒中執行該task
}
else
{
MutexLockGuard lock(mutex_);
while (isFull()) //如果task佇列queue_滿了,就等待
{
notFull_.wait();
}
assert(!isFull());
queue_.push_back(task); //將任務加入佇列
notEmpty_.notify(); //當添加了某個任務之後,任務佇列肯定不是空的,通知某個等待從queue_中取task的執行緒
}
}
使用示例
struct Foo {
public:
void DoWork() {
std::cout << "run member function in thread:" << CurrentThread::tid() << std::endl;
}
void operator() (){
std::cout << "run functor in thread:" << CurrentThread::tid() << std::endl;
}
};
void Task1()
{
std::cout << "function run in thread:" << CurrentThread::tid() << std::endl;
}
int main()
{
ThreadPool tp("TestThreadPool");
tp.setMaxQueueSize(10);
tp.start(4); // 啟動4個工作執行緒,啟動之後,由於任務佇列queue_為空,所以所有工作執行緒都休眠了
tp.run(Task1); // 放入一個task,會喚醒某個工作執行緒
Foo f;
tp.run(boost::bind(&Foo::DoWork, &f));
tp.run(f);
tp.run( [](){ std::cout << "lambda function run in thread:" << CurrentThread::tid() << std::endl; });
typedef void(*pFunc)();
pFunc pf = Task1;
tp.run(pf);
}
可以看到,ThreadPool可以很方便的將某個task放到任務佇列中,該task會由某個執行緒執行。task使用boost::function表示,可以方便地將函式指標、普通函式、成員函式(結合boost::bind)、lambda、過載了函式呼叫運算子‘()’的類的物件(這些統稱為可呼叫物件)放入到任務隊列當中,非常方便。