1. 程式人生 > >淺析muduo庫中的線程設施

淺析muduo庫中的線程設施

%d except a* empty fde returns exceptio HR 參數傳遞

muduo是目前我在學習過程中遇到的最具有學習意義的網絡庫,下文將分析muduo庫中的基礎設施--Thread和ThreadPool.
首先,介紹在多線程編程中不可缺少的同步措施--Mutex和Condition.

  • Mutex
    ```
    /Mutex.h/
    class MutexLock : boost::noncopyable
    {
    public:
    MutexLock()
    : holder_(0)
    {
    MCHECK(pthread_mutex_init(&mutex_, NULL));//MCHECK有什麽作用?
    }

~MutexLock()
{
assert(holder_ == 0);
MCHECK(pthread_mutex_destroy(&mutex_));
}

// must be called when locked, i.e. for assertion
bool isLockedByThisThread() const//是否被當前線程鎖住
{
return holder_ == CurrentThread::tid();//防止跨線程調用
}

void assertLocked() const
{
assert(isLockedByThisThread());
}

// internal usage

void lock()
{
MCHECK(pthread_mutex_lock(&mutex_));//加鎖
assignHolder();//加鎖時獲得當前線程的線程號,即目前線程擁有這個鎖
}

void unlock()
{
unassignHolder();//表示目前沒有線程擁有這個鎖
MCHECK(pthread_mutex_unlock(&mutex_));//去鎖
}

pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_;
}

private:
friend class Condition;//條件變量必須持有了鎖之後才能使用

class UnassignGuard : boost::noncopyable//這個內部類出現的莫名其妙
{
public:
UnassignGuard(MutexLock& owner)
: owner_(owner)
{
owner_.unassignHolder();
}

~UnassignGuard()
{
  owner_.assignHolder();
}

private:
MutexLock& owner_;
};

void unassignHolder()
{
holder_ = 0;
}

void assignHolder()
{
holder_ = CurrentThread::tid();
}

pthread_mutex_t mutex_;
pid_t holder_;
};

// Use as a stack variable, eg.
// int Foo::size() const
// {
// MutexLockGuard lock(mutex_);
// return data_.size();
// }
//該類負責管理互斥量的加鎖和去鎖
class MutexLockGuard : boost::noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard()
{
mutex_.unlock();
}

private:

MutexLock& mutex_;
};

有四種操作互斥鎖的方式:創建,銷毀,加鎖,解鎖。在muduo中,用一個低級的資源管理類MutexLock來實現這四種操作,再用一個較高級的資源管理類MutexLockGuard來管理MutexLock,即用RAII手法對資源進行兩次封裝,防止資源泄漏。
兩個類都具有nocopy的屬性,試想對Mutex的拷貝會在多線程程序中造成什麽樣的結果?有至少兩個線程在同一時間擁有對一份資源的使用資格,後果不可設想。
在MutexLock中有一個好玩的私有變量:holder_. 該變量在一個線程對資源加鎖時,將holder_設置為使用資源線程的索引;解鎖時將holder_設置為0。初始化Mutex時將holder_設置為0;銷毀時檢查holder_是否為0。以上四個步驟保證了Mutex在某一個時間段內能被一個線程使用。
MutexLock與Condition是友元關系,具有很強的耦合度。
+ Condition

/Condition.h/
class Condition : boost::noncopyable
{
public:
explicit Condition(MutexLock& mutex)
: mutex_(mutex)
{
MCHECK(pthread_cond_init(&pcond_, NULL));
}

~Condition()
{
MCHECK(pthread_cond_destroy(&pcond_));
}

void wait()
{
MutexLock::UnassignGuard ug(mutex_);
MCHECK(pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()));
}

// returns true if time out, false otherwise.
bool waitForSeconds(double seconds);

void notify()
{
MCHECK(pthread_cond_signal(&pcond_));
}

void notifyAll()
{
MCHECK(pthread_cond_broadcast(&pcond_));
}

private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};

條件變量有五種操作方式:創建,銷毀,等待,單一通知,全部通知。
在MutexLock中有一個內部類:UnassignGuard,該類的實例對象在Condition等待時創建,將holder_設置為0;當等待事件結束,又將holder_設置為原值。用MutexLock的析構函數檢查等待事件是否發生在同一個線程中。
Condition類中有一個waitForSecond函數,用於實現pthread_cond_timewait的封裝。
接下來,聊一聊主題--Thread。
+ Thread

/Thread.h/
class Thread : boost::noncopyable //禁止拷貝
{
public:
typedef boost::function

explicit Thread(const ThreadFunc&, const string& name = string());//普通的線程構造函數

ifdef GXX_EXPERIMENTAL_CXX0X

explicit Thread(ThreadFunc&&, const string& name = string());//移動的線程構造函數,比上面的更節省資源std::move

endif

~Thread();//析構函數

void start();//啟動線程
int join(); // 類似於 pthread_join()

bool started() const { return started_; }
// pthread_t pthreadId() const { return pthreadId_; }
pid_t tid() const { return *tid_; } //返回線程索引
const string& name() const { return name_; }//返回線程名字

static int numCreated() { return numCreated_.get(); }

private:
void setDefaultName();

bool started_; //是否啟動
bool joined_; //是否終止
pthread_t pthreadId_; //線程索引
boost::shared_ptr

static AtomicInt32 numCreated_; //static變量在所有的線程對象中共享,為由該類產生線程排序
};

1. 在muduo的線程對象封裝中,最精彩的是使用boost::function函數對象將線程函數以回調的方式傳遞進線程對象中。
` typedef boost::function<void ()> ThreadFun; `
2. 在多線程情況下,避免在對象外操作指向對象的指針的情形,可以在一定程度上保證了線程安全。

/Thread.cc/
AtomicInt32 Thread::numCreated_;

//兩種線程構造函數
//線程對象的可移動屬性很有意思。
Thread::Thread(const ThreadFunc& func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(new pid_t(0)),
func_(func),
name_(n)
{
setDefaultName();
}

ifdef GXX_EXPERIMENTAL_CXX0X

Thread::Thread(ThreadFunc&& func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(new pid_t(0)),
func_(std::move(func)),
name_(n)
{
setDefaultName();
}

endif

Thread::~Thread()
{
if (started_ && !joined_) //將該線程設置為分離屬性
{
pthread_detach(pthreadId_); //線程結束將自動回收資源
}
}

void Thread::setDefaultName() //設置線程名字,比如Thread1,Thread2等
{
int num = numCreated_.incrementAndGet();
if (name_.empty())
{
char buf[32];
snprintf(buf, sizeof buf, "Thread%d", num);
name_ = buf;
}
}

void Thread::start()
{
assert(!started_); //斷言線程是否已經開始運行
started_ = true; //斷言失敗則設置線程開始運行的標誌
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_); //獲得線程運行的所需要的參數
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))//線程開始運行並且線程的控制流停止再在此。
{ //線程運行結束,線程自行運行結束並且自己做日誌記錄
started_ = false;
printf("blockDim.x: %d\n",blockDim.x);
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
}

int Thread::join()
{
assert(started_); //斷言線程是否正在運行
assert(!joined_); //斷言線程是否已經被終止
joined_ = true;
return pthread_join(pthreadId_, NULL); //等待線程結束
}

在線程的析構函數中只設置線程的分離屬性,即等待線程運行結束後自動回收線程資源,不強行終止線程。

struct ThreadData //作為線程數據使用,將線程運行有關的數據保存到該結構體中,有點抽象回調的意思
{
typedef muduo::Thread::ThreadFunc ThreadFunc;
ThreadFunc func_;
string name_;
boost::weak_ptr

ThreadData(const ThreadFunc& func,
const string& name,
const boost::shared_ptr

void runInThread() //核心函數
{
pid_t tid = muduo::CurrentThread::tid(); //得到當前的線程標誌

boost::shared_ptr<pid_t> ptid = wkTid_.lock();    //判斷保存在ThreadData中的線程是否存在
if (ptid)    //如果存在,ptid釋放之前指向的線程標識
{
  *ptid = tid;
  ptid.reset();
}

muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str(); //獲得當前線程名稱
::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
try
{
  func_(); //運行線程函數
  muduo::CurrentThread::t_threadName = "finished";
}
catch (const Exception& ex)    //異常捕捉部分
{
  muduo::CurrentThread::t_threadName = "crashed";
  fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
  fprintf(stderr, "reason: %s\n", ex.what());
  fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
  abort();
}
catch (const std::exception& ex)
{
  muduo::CurrentThread::t_threadName = "crashed";
  fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
  fprintf(stderr, "reason: %s\n", ex.what());
  abort();
}
catch (...)
{
  muduo::CurrentThread::t_threadName = "crashed";
  fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
  throw; // rethrow
}

}
};

void* startThread(void* obj) //這個函數最有意思
{
ThreadData* data = static_cast

將線程中的若幹數據保存到ThreadData中,然後將ThreadData作為傳遞給`pthread_create(...,void* arg)`中的最後一個數據參數傳遞給`void Thread(void* )`標準的線程啟動函數。然後在標準的線程啟動函數內將`void* arg`強行轉化為ThreadData,然後使用ThreadData啟動線程。
在使用muduo的接口時,使用bind將線程運行函數再打包,然後傳遞進Thread.
最後,向大家介紹muduo庫中對於線程池的封裝的理解。
1. 最重要的想法就是線程池將線程看為自己可執行的最小並且可隨時增加的單位。
2. 整個線程池對象維持兩個任務隊列,threads_表示目前正在運行中的線程池,queue_表示位於存儲隊列中的等待線程。
3. thread_在運行的過程中使用while循環+條件變量判斷當前的活動線程池中是否有空位,以及新的等待線程進入線程池。
4. 線程池從一開始就確定了自己將要運行的線程數目,不能在後面的運行中更改。

/ThreadPool.h/
class ThreadPool : boost::noncopyable
{
public:
typedef boost::function

explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool();

// Must be called before start().
// 設置線程池運行的最大的負載以及線程池中將要運行的線程
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }//
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }

void start(int numThreads);//啟動一定數量的線程
void stop();//線程池運算停止

const string& name() const
{ return name_; }

size_t queueSize() const;//返回正在排隊等待的線程任務

// Could block if maxQueueSize > 0
void run(const Task& f);//將一個想要運行的線程放入線程池的任務隊列

ifdef GXX_EXPERIMENTAL_CXX0X

void run(Task&& f);//C++11的移動方法,用於節省資源

endif

private:
bool isFull() const;//判斷線程隊列是否已經滿了
void runInThread();//真正讓線程跑起來的函數
Task take();//獲得任務隊列的首個線程

mutable MutexLock mutex_;//互斥鎖
Condition notEmpty_;//條件變量
Condition notFull_;
string name_;
Task threadInitCallback_;//線程池中執行的線程對象
boost::ptr_vector

5. 每一個加入線程池的線程都帶有一個while循環,保證線程等待隊列中的線程不會等待太久。即所有將加入線程池的線程都會進入線程等待隊列接受檢查。
6. start():線程池啟動函數保證在調用時啟動一定數量的線程。
7. stop():保證所有的正在運行的線程停止
8. queueSize():返回此時線程等待隊列中的個數,用於判斷線程等待隊列是否為空
9. run():如果線程池為空,直接跑傳入的線程。如果線程池等待隊列滿了,則當前控制流(線程)在notFull_上等待;否則將傳入的線程加入線程等待隊列,並且使用條件變量notEmpty_通知一條阻塞在該條件變量上的控制流(線程)。
10. take():如果當前線程等待隊列為空並且線程池正在跑,則控制流(線程)阻塞在notEmpty_條件變量上。當條件變量被激活時(有線程對象加入呆線程等待隊列),判斷是否可以從線程等待隊列中拿出一個線程對象,如果可以,則將使用條件變量notFull_通知run()中阻塞在--想加入隊列但是隊列沒有空余位置的變量上。
11. isFull():返回在線程等待隊列中的個數,用於判斷是否可以將想要運行的線程放到線程等待隊列中。
12. runInThread():如果線程啟動函數不為空,則在此將線程的控制流交給用於初始化線程池的線程對象。當此線程對象運行結束的時候,並且此時的線程池還在運行,則線程池離開初始化模式,進入線程池的循環線程補充模式。這種模式控制著線程池中的線程數量:當有新的線程對象進入線程池,則當前的線程控制流交給將要執行的線程對象。也就是說線程池中的線程對象要麽主動結束自己的‘life’,然後由線程池的線程補充模式決定將要進入線程池運行的線程對象。然後在後面的take()中使用條件變量完成新的線程進入線程池的同步。

/ThreadPool.cc/
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}

ThreadPool::~ThreadPool()
{
if (running_)
{
stop();
}
}

void ThreadPool::start(int numThreads)
{
assert(threads_.empty());//首次啟動,斷言線程池為空
running_ = true;
threads_.reserve(numThreads);//預分配空間,且分配的空間不可變。
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this), name_+id));
threads_[i].start();//直接啟動線程
}
if (numThreads == 0 && threadInitCallback_)//只啟動一條線程
{
threadInitCallback_();
}
}

void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();
}
for_each(threads_.begin(),
threads_.end(),
boost::bind(&muduo::Thread::join, _1));
}

size_t ThreadPool::queueSize() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

void ThreadPool::run(const Task& task)
{
if (threads_.empty())//如果線程池為空,直接跑這條線程
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())//如果線程池滿了,在notfull條件變量上等待
{
notFull_.wait();
}
assert(!isFull());

queue_.push_back(task);//現在線程池中有空位了
notEmpty_.notify();//notempty條件變量通知信息

}
}

ifdef GXX_EXPERIMENTAL_CXX0X

void ThreadPool::run(Task&& task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())
{
notFull_.wait();
}
assert(!isFull());

queue_.push_back(std::move(task));
notEmpty_.notify();

}
}

endif

ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)//如果線程隊列為空並且線程池正在跑
{//在notempty條件變量上等待
notEmpty_.wait();//當前線程停下來等待,當隊列不為空了繼續跑
}//然後獲得新任務
Task task;//創建一個新的任務
if (!queue_.empty())
{
task = queue_.front();//獲得隊列中的頭任務
queue_.pop_front();//彈出隊列中的頭任務
if (maxQueueSize_ > 0)//如果隊列最大長度大於0
{
notFull_.notify();//通知線程可以跑了
}
}
return task;//返回任務
}

bool ThreadPool::isFull() const
{//用來判斷線程隊列是否已經
mutex_.assertLocked();
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

void ThreadPool::runInThread()//生成一個threadFunc對象
{
try
{
if (threadInitCallback_)//如果線程啟動函數不為空,直接啟動
{
threadInitCallback_();//此處開啟新的線程,程序的運行流程在此停止;當線程運行完成則進入下面的while循環
}
while (running_)//該循環保證當上面的線程運行完成或者沒有初始化線程,則進入線程池的循環模式
{
Task task(take());
if (task)
{
task();
}
}
}
catch (const Exception& ex) //異常捕捉過程
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
```

淺析muduo庫中的線程設施