Muduo網路庫原始碼分析(四)EventLoopThread和EventLoopThreadPool的封裝
muduo的併發模型為one loop per thread+ threadpool。為了方便使用,muduo封裝了EventLoop和Thread為EventLoopThread,為了方便使用執行緒池,又把EventLoopThread封裝為EventLoopThreadPool。所以這篇博文並沒有涉及到新鮮的技術,但是也有一些封裝和邏輯方面的注意點需要我們去分析和理解。
EventLoopThread
任何一個執行緒,只要建立並運行了EventLoop,就是一個IO執行緒。 EventLoopThread類就是一個封裝好了的IO執行緒。
EventLoopThread的工作流程為:
1、在主執行緒建立EventLoopThread物件。
2、主執行緒呼叫EventLoopThread.start(),啟動EventLoopThread中的執行緒(稱為IO執行緒),並且主執行緒要等待IO執行緒建立完成EventLoop物件。
3、IO執行緒呼叫threadFunc建立EventLoop物件。通知主執行緒已經建立完成。
4、主執行緒返回建立的EventLoop物件。
EventLoopThread.h
EventLoopThread.ccclass EventLoopThread : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback()); ~EventLoopThread(); EventLoop* startLoop(); // 啟動執行緒,該執行緒就成為了IO執行緒 private: void threadFunc(); // 執行緒函式 EventLoop* loop_; // loop_指標指向一個EventLoop物件 bool exiting_; Thread thread_; MutexLock mutex_; Condition cond_; ThreadInitCallback callback_; // 回撥函式在EventLoop::loop事件迴圈之前被呼叫 };
測試程式:EventLoopThread::EventLoopThread(const ThreadInitCallback& cb) : loop_(NULL), exiting_(false), thread_(boost::bind(&EventLoopThread::threadFunc, this)), mutex_(), cond_(mutex_), callback_(cb) { } EventLoopThread::~EventLoopThread() { exiting_ = true; loop_->quit(); // 退出IO執行緒,讓IO執行緒的loop迴圈退出,從而退出了IO執行緒 thread_.join(); //等待執行緒退出 } EventLoop* EventLoopThread::startLoop() { assert(!thread_.started()); thread_.start();//執行緒啟動,呼叫threadFunc() { MutexLockGuard lock(mutex_); while (loop_ == NULL) { cond_.wait();//需要等待EventLoop物件的建立 } } return loop_; } void EventLoopThread::threadFunc() { EventLoop loop; if (callback_) { callback_(&loop); } { MutexLockGuard lock(mutex_); // loop_指標指向了一個棧上的物件,threadFunc函式退出之後,這個指標就失效了 // threadFunc函式退出,就意味著執行緒退出了,EventLoopThread物件也就沒有存在的價值了。 // 因而不會有什麼大的問題 loop_ = &loop; cond_.notify(); //建立好,傳送通知 } loop.loop();// 會在這裡迴圈,直到EventLoopThread析構。此後不再使用loop_訪問EventLoop了 //assert(exiting_); }
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
EventLoopThread loopThread;
EventLoop* loop = loopThread.startLoop();
// 非同步呼叫runInThread,即將runInThread新增到loop物件所在IO執行緒,讓該IO執行緒執行
loop->runInLoop(runInThread);
sleep(1);
// runAfter內部也呼叫了runInLoop,所以這裡也是非同步呼叫
loop->runAfter(2, runInThread);
sleep(3);
loop->quit();
printf("exit main().\n");
}
對呼叫過程進行分析:(檢視日誌)
主執行緒呼叫 loop->runInLoop(runInThread); 由於主執行緒(不是IO執行緒)呼叫runInLoop, 故呼叫queueInLoop() 將runInThead 新增到佇列,然後wakeup() IO執行緒,IO執行緒在doPendingFunctors() 中取loop->runAfter() 要喚醒一下,此時只是執行runAfter() 添加了一個2s的定時器, 2s超時,timerfd_ 可讀,先handleRead()一下然後執行回撥函式runInThread()。
那為什麼exit main() 之後wakeupFd_ 還會有可讀事件呢?那是因為EventLoopThead 棧上物件析構,在解構函式內 loop_ ->quit(), 由於不是在IO執行緒呼叫quit(),故也需要喚醒一下,IO執行緒才能從poll 返回,這樣再次迴圈判斷 while (!quit_) 就能退出IO執行緒。
EventLoopThreadPool
muduo的執行緒模型:muduo的思想時eventLoop+thread pool,為了更方便使用,將EventLoopThread做了封裝。main reactor可以建立sub reactor,併發一些任務分發到sub reactor中去。EventLoopThreadPool的思想比較簡單,用一個main reactor建立EventLoopThreadPool。在EventLoopThreadPool中將EventLoop和Thread繫結,可以返回EventLoop物件來使用EventLoopThreadPool中的Thread。
EventLoopThreadPool.hclass EventLoopThreadPool : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
EventLoop* getNextLoop();
private:
EventLoop* baseLoop_; // 與Acceptor所屬EventLoop相同
bool started_;
int numThreads_; // 執行緒數
int next_; // 新連線到來,所選擇的EventLoop物件下標
boost::ptr_vector<EventLoopThread> threads_; // IO執行緒列表
std::vector<EventLoop*> loops_; // EventLoop列表
};
EventLoopThreadPool.cc
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
: baseLoop_(baseLoop),
started_(false),
numThreads_(0),
next_(0)
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
EventLoopThread* t = new EventLoopThread(cb);
threads_.push_back(t);
loops_.push_back(t->startLoop()); // 啟動EventLoopThread執行緒,在進入事件迴圈之前,會呼叫cb
}
if (numThreads_ == 0 && cb)
{
// 只有一個EventLoop,在這個EventLoop進入事件迴圈之前,呼叫cb
cb(baseLoop_);
}
}
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_;
// 如果loops_為空,則loop指向baseLoop_
// 如果不為空,按照round-robin(RR,輪叫)的排程方式選擇一個EventLoop
if (!loops_.empty())
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
mainReactor關注監聽事件,已連線套接字事件輪詢給執行緒池中的subReactors 處理,一個新的連線對應一個subReactor
我們採用round-robin(RR,輪叫)的排程方式選擇一個EventLoop,也就是getNextLoop函式。極端情況下,執行緒池中個數為0時,那麼新的連線交給mainReactor,這樣就退化成單執行緒的模式。