1. 程式人生 > >muduo庫的ThreadPool剖析

muduo庫的ThreadPool剖析

    我們知道執行緒池本質就是一個生產者消費者模型,它維護一個執行緒佇列和任務佇列。一旦任務隊列當中有任務,相當於生產者生產了東西,就喚醒執行緒佇列中的執行緒來執行這些任務。那麼,這些執行緒就相當於消費者執行緒。

muduo庫的執行緒數目屬於啟動時配置,當執行緒池啟動時,執行緒數目就已經固定下來。

它的類圖如下:


先上程式碼,然後分析:

ThreadPool.h

#ifndef MUDUO_BASE_THREADPOOL_H
#define MUDUO_BASE_THREADPOOL_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Types.h>

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

#include <deque>

namespace muduo
{

class ThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void ()> Task;

  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }   //設定最大執行緒池執行緒最大數目大小
  void setThreadInitCallback(const Task& cb)    //設定執行緒執行前的回撥函式
  { threadInitCallback_ = cb; }

  void start(int numThreads);
  void stop();

  const string& name() const
  { return name_; }

  size_t queueSize() const;

  // Could block if maxQueueSize > 0
  void run(const Task& f);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  void run(Task&& f);
#endif

 private:
  bool isFull() const;    //判滿
  void runInThread();   //執行緒池的執行緒執行函式
  Task take();    //取任務函式

  mutable MutexLock mutex_;   
  Condition notEmpty_;    //不空condition
  Condition notFull_;         //未滿condition
  string name_;
  Task threadInitCallback_;    //執行緒執行前的回撥函式
  boost::ptr_vector<muduo::Thread> threads_;  //執行緒陣列
  std::deque<Task> queue_;    //任務佇列
  size_t maxQueueSize_;     //因為deque是通過push_back增加執行緒數目的,所以通過外界max_queuesize儲存最多執行緒數目
  bool running_;    //執行緒池執行標誌
};

}

#endif

ThreadPool.cc

#include <muduo/base/ThreadPool.h>

#include <muduo/base/Exception.h>

#include <boost/bind.hpp>
#include <assert.h>
#include <stdio.h>

using namespace muduo;

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),    //初始化的時候需要把condition和mutex關聯起來
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),   //初始化0
    running_(false)
{
}

ThreadPool::~ThreadPool()
{
  if (running_)    //如果執行緒池在執行,那就要進行記憶體處理,在stop()函式中執行
  {
    stop();
  }                  //如果沒有分配過執行緒,那就不存在需要釋放的記憶體,什麼都不做就可以了
}

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());   //確保未啟動過
  running_ = true;    //啟動標誌
  threads_.reserve(numThreads);   //預留reserver個空間
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];                  //id儲存執行緒id
    snprintf(id, sizeof id, "%d", i+1);
    threads_.push_back(new muduo::Thread(   //boost::bind在繫結類內部成員時,第二個引數必須是類的例項
          boost::bind(&ThreadPool::runInThread, this), name_+id));//runInThread是每個執行緒的執行緒執行函式,執行緒為執行任務情況下會阻塞
    threads_[i].start();    //啟動每個執行緒,但是由於執行緒執行函式是runInThread,所以會阻塞。
  }
  if (numThreads == 0 && threadInitCallback_) //如果執行緒池執行緒數為0,且設定了回撥函式
  {
    threadInitCallback_();  //init回撥函式
  }
}

void ThreadPool::stop()   //執行緒池停止
{
  {
  MutexLockGuard lock(mutex_);   //區域性加鎖
  running_ = false;
  notEmpty_.notifyAll(); //讓阻塞在notEmpty contition上的所有執行緒執行完畢
  }
  for_each(threads_.begin(),
           threads_.end(),
           boost::bind(&muduo::Thread::join, _1));   //對每個執行緒呼叫,pthread_join(),防止資源洩漏
}

size_t ThreadPool::queueSize() const  //thread safe
{
  MutexLockGuard lock(mutex_);
  return queue_.size();
}

//執行一個任務//所以說執行緒池這個執行緒池執行任務是靠任務佇列,客端需要執行一個任務,必須首先將該任務push進任務佇列,等侯空閒執行緒處理
void ThreadPool::run(const Task& task)    
{
  if (threads_.empty())     //如果執行緒池為空,說明執行緒池未分配執行緒
  { 
    task();    //由當前執行緒執行
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull())    //當任務佇列滿的時候,迴圈
    {
      notFull_.wait();    //一直等待任務佇列不滿   //這個鎖在take()取任務函式中,取出任務佇列未滿,喚醒該鎖
    }
    assert(!isFull()); 

    queue_.push_back(task);   //當任務佇列不滿,就把該任務加入執行緒池的任務佇列
    notEmpty_.notify();  //喚醒take()取任務函式,讓執行緒來取任務,取完任務後runInThread會執行任務
  }
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
void ThreadPool::run(Task&& task)
{
  if (threads_.empty())   //如果執行緒佇列是空的,就直接執行,因為只有一個執行緒啊
  {
    task();
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull())
    {
      notFull_.wait();   
    }
    assert(!isFull());

    queue_.push_back(std::move(task));
    notEmpty_.notify();
  }
}
#endif

//take函式是每個執行緒都執行的,需要考慮執行緒安全,考慮多執行緒下取任務的執行緒安全性,只能序列化
ThreadPool::Task ThreadPool::take()     //取任務函式
{
  MutexLockGuard lock(mutex_);   //注意,必須用鎖
  // always use a while-loop, due to spurious wakeup     //防止驚群效應。
  while (queue_.empty() && running_)     //如果任務佇列為空,並且執行緒池處於執行態
  {			//佇列為空時沒有使用執行緒池
    notEmpty_.wait();  //等待。條件變數需要用while迴圈,防止驚群效應。
  } 				   //因為所有的執行緒都在等同一condition,即notempty,只能有執行緒在wait返回時拿到mutex,並消耗資源
					  //其他執行緒雖然被notify同樣返回,但資源已被消耗,queue為空(以1個任務為例),其他執行緒就在while中繼續等待
  Task task;
  if (!queue_.empty())    //從任務佇列隊頭中取任務
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)    //??如果未設定會等於0,不需要喚醒notFull
    {
      notFull_.notify();    //取出一個任務之後,如任務佇列長度大於0,喚醒notfull未滿鎖
    }
  }
  return task;
}

bool ThreadPool::isFull() const   //not thread safe 
{
  mutex_.assertLocked(); //呼叫確保被使用執行緒鎖住,因為isFull函式不是一個執行緒安全的函式,外部呼叫要加鎖
  return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;   //因為deque是通過push_back增加執行緒數目的,所以通過外界max_queuesize儲存最多執行緒數目
}

//執行緒執行函式,無任務時都會阻塞在take(),有任務時會爭互斥鎖
void ThreadPool::runInThread()    //執行緒執行函式
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_();    //支援每個執行緒執行前排程回撥函式
    }
    while (running_)//當執行緒池處於啟動狀態,一直迴圈 //注意,這就和我之前專案那個問題一樣,需要讓物件析構時通知成員執行緒,讓它們也執行完畢,當時犯的錯唉~
    {
      Task task(take());   //從任務佇列中取任務,無任務會阻塞
      if (task)    //如果上面取出來了
      {
        task();    //做任務
      }
    }
  }
  catch (const Exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

muduo庫採用的執行緒池模型實際上是producer和consumer模型,客端通過Thread::run()函式向任務佇列push任務,執行緒池等待處理任務。一旦任務佇列not empty,就進行多執行緒處理,pop front就是從隊頭開始處理任務。

task(任務)是客端要執行的函式,通過boost::bind註冊成為回撥函式,放進任務隊列當中。