muduo原始碼解析24-網路庫2:Reactor關鍵結構channel,poller和eventloop
簡介:
用eventloop,poller和channel共同完成一個最簡單的reactor模型。
注意本文超級長(介紹了三個類,channel,poller和eventloop,用他們實現一個最基本的reactor模型)
//這是一個正常的IO複用模型結構,以poll為例子
while(1) { poll(); //等待網路事件的發生 handleEvent(); //處理各個網路事件 }
根據上面的小例子來簡單說一下這三個類都是幹嘛的:
eventloop之前說過了,是封裝這整個迴圈體while(1){},而poller看名字就猜得出來是對 poll/epoll的封裝,channel則是對handleEvent()的封裝。
雖然說得很精簡,不過的確是這幾個類最核心的功能。
下面來具體看看各個類的實現:
channel類:
說明:
正常IO複用流程如下:(利用poll)
while(1) { poll(); //此時等待,有網路事件發生了就返回 handleEvent(); //這一步通過channel來實現 }
當呼叫 ::poll() 獲取到發生網路事件的套接字時,channel的任務就是負責處理這些網路事件,為每一個套接字建立一個channel用於處理這個套接字上發生的網路事件。
根據muduo書上內容,有以下的原則:
每個channel物件自始至終只屬於一個eventloop,因此每個channel物件都只屬於某一個IO執行緒
channel會把不同的IO事件分發成不同的回撥,例如ReadCallback,WriteCallback等
muduo使用者一般不直接使用channel,而會使用更上層的封裝,例如TcpConnection(以後瞭解)
channel的生命期由其owner class 負責管理,一般是其他class直接或間接成員
channel的成員函式都只能在IO執行緒呼叫,因此更新其資料成員都不必加鎖
使用者一般只用set*Callback(),和enable*這幾個函式用來設定回撥函式和註冊網路事件
瞭解channel作用是處理網路事件之後,下面是具體的channel實現:
channel.h
#ifndef CHANNEL_H #define CHANNEL_H #include"base/noncopyable.h" #include<functional> namespace mymuduo { namespace net { class eventloop; class channel:noncopyable { public: typedef std::function<void()> EventCallback; channel(eventloop* loop,int fd); void handleEvent(); //設定回撥函式,read,write,error Callback函式,在處理event時被呼叫 void setReadCallback(EventCallback cb) { m_readCallback = std::move(cb); } void setWriteCallback(EventCallback cb) { m_writeCallback = std::move(cb); } void setErrorCallback(EventCallback cb) { m_errorCallback = std::move(cb); } int fd()const {return m_fd;} //channel所負責IO事件的那個fd //返回當前channel所註冊的網路事件 int events() const{return m_events;} void set_revents(int revt){m_revents=revt;} //設定網路事件 //判斷當前channel是否註冊了事件 bool isNoneEvent() const{return m_events==kNoneEvent;} //在m_event上註冊讀/寫事件 void enableReading(){m_events|=kReadEvent;update();} void enableWriting(){m_events|=kWriteEvent;update();} //在m_event上取消讀/寫事件 void disableReading(){m_events&=~kReadEvent;update();} void disableWriting(){m_events&=~kWriteEvent;update();} //取消m_event所有事件 void disableAll(){m_events=kNoneEvent;update();} //判斷m_event是否註冊了讀/寫事件 bool isWriting() const{return m_events & kWriteEvent;} bool isReading() const{return m_events & kWriteEvent;} //for poller int index(){return m_index;} void set_index(int idx){m_index=idx;} //返回當前channel所在的那個eventloop eventloop* ownerLoop(){return m_loop;} private: //讓本channel 所屬於的那個eventloop回撥channel::update()完成channel的更新 void update(); //這三個靜態常量分別表示:無網路事件,讀網路事件,寫網路事件 static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; eventloop* m_loop; //channel所屬的那個eventloop const int m_fd; //每個channel負責處理一個sockfd上的網路事件 int m_events; //channel註冊(要監聽)的網路事件 int m_revents; //poll()返回的網路事件,具體發生的事件 int m_index; //這個channel在poller中m_pollfds中的序號,預設-1表示不在其中 //當發生了讀/寫/錯誤網路事件時,下面三個函式會被呼叫 EventCallback m_readCallback; EventCallback m_writeCallback; EventCallback m_errorCallback; }; }//namespace net }//namespace mymuduo #endif // CHANNEL_H
channel.cpp
#include "channel.h" #include"base/logging.h" #include"net/channel.h" #include"net/eventloop.h" #include<sstream> #include<poll.h> namespace mymuduo { namespace net { const int channel::kNoneEvent=0; const int channel::kReadEvent=POLLIN|POLLPRI; const int channel::kWriteEvent=POLLOUT; channel::channel(eventloop* loop,int fd) :m_loop(loop),m_fd(fd),m_events(0),m_revents(0),m_index(-1) { } //此時eventloop::loop()中poll函式返回,說明有網路事件發生了, //針對網路事件型別進行相應的處理 void channel::handleEvent() { //處理 網路事件 POLLNVAL if(m_revents & POLLNVAL) LOG_WARN<<"channel::handle_event() POLLNVAL"; //處理 網路事件 錯誤 if(m_revents & (POLLERR|POLLNVAL)) if(m_errorCallback) m_errorCallback(); //處理 網路事件 read if(m_revents & (POLLIN |POLLPRI|POLLRDHUP)) if(m_readCallback) m_readCallback(); //處理 網路事件 write if(m_revents & POLLOUT) if(m_writeCallback) m_writeCallback(); } //讓本channel 所屬於的那個eventloop回撥channel::update()完成channel的更新 void channel::update() { m_loop->updateChannel(this); } }//namespace net }//namespace mymuduo
時序流程:
eventloop::loop()
{ 在loop()迴圈中
::poll(); 獲取網路事件,併為每一個發生網路事件的套接字建立一個channel。
使用channel來處理具體的網路事件:
channel::handleEvent()--->
{
讀網路事件:處理讀;
寫網路事件:處理寫;
錯誤事件: 處理錯誤;
}
}
poller類:
說明:
是對IO複用模型的封裝例如poll和epoll,poller是屬於eventloop的,因此會在eventloop::loop()中呼叫
poller::poll(),來獲取發生了網路事件的套接字。
應當注意,poller內部擁有一個pollfd集合,用於儲存poll()要監聽的那個套接字集合,另外有一個m_channels用於儲存每個套接字fd到其所對應的channel之間的對映。可以說,關於套接字集合和channel集合的一些資料都儲存在poller中,他們的更新也通過poller來管理。
poller.h:
#ifndef POLLER_H #define POLLER_H #include<map> #include<vector> #include"base/timestamp.h" #include"net/eventloop.h" struct pollfd; //在.cpp檔案包含<poll.h> namespace mymuduo { namespace net { class channel; class poller:noncopyable { public: typedef std::vector<channel*> ChannelList; //建構函式,指定poller所屬的那個eventloop物件 poller(eventloop* loop):m_ownerloop(loop){} ~poller()=default; //作為poller函式的核心,eventloop在loop()中呼叫poll()函式獲得當前活動的IO事件 //然後填充eventloop生成的所有channel到ChannelList中,儲存所有的channel資訊 timestamp poll(int timeoutMs,ChannelList* activeChannels); //負責更新把channel更新到m_pollfds void updateChannel(channel* ch); //保證eventloop所在的執行緒為當前執行緒 void assertInLoopThread() const { m_ownerloop->assertInLoopThread(); } private: void fillActiveChannels(int numEvents,ChannelList* activeChannels)const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int,channel*> ChannelMap; ChannelMap m_channels; //儲存從fd到Channel*的對映. PollFdList m_pollfds; //pollfd集合,使用者儲存所有客戶機套接字資訊 eventloop* m_ownerloop; //poller所屬於的那個eventloop物件 }; }//namespace net }//namespace mymuduo #endif // POLLER_H
poller.cpp:
#include "poller.h" #include"base/logging.h" #include"net/channel.h" #include<poll.h> namespace mymuduo{ namespace net { //呼叫::poll()獲取發生了網路事件的套接字,併為每一個網路事件分配一個channel用於處理 //函式返回當前::poll()返回的時間戳 timestamp poller::poll(int timeoutMs, ChannelList *activeChannels) { int numEvents=::poll(&*m_pollfds.begin(),m_pollfds.size(),timeoutMs); timestamp now(timestamp::now()); //此時poll返回,numEvents個套接字上發生了網路事件 if(numEvents>0) { LOG_TRACE<<numEvents<<" events happended"; //在acticeChannels中新增numThreads個channel,每個事件分配一個channel fillActiveChannels(numEvents,activeChannels); }else if(numEvents==0) { LOG_TRACE<<"nothing happended"; } else LOG_SYSERR<<"poller::poll()"; return now; } //主要功能是負責維護和更新pollfd陣列, //傳入一個channel進來,我們得到ch->index()判斷它在m_pollfds中的位置,如果不存在 //需要在m_pollfds中新加一個pollfd //如果存在直接更新即可 void poller::updateChannel(channel *ch) { //保證eventloop所線上程就是其所屬的執行緒 assertInLoopThread(); LOG_TRACE<<"fd= "<<ch->fd()<<" events= "<<ch->events(); //更新m_pollfds陣列,程式碼很長是因為加了很多斷言 //先判斷ch這個channel是否已經在m_channels中,如果不在 index()<0 if(ch->index()<0) { //ch->index()==-1,說明這個channel對應的套接字不在m_pollfds中,新增 assert(m_channels.find(ch->fd())==m_channels.end()); //新建一個pollfd struct pollfd pfd; pfd.fd=ch->fd(); pfd.events=static_cast<short>(ch->events()); pfd.revents=0; //加入到m_pollfds中 m_pollfds.push_back(pfd); int idx=static_cast<int>(m_pollfds.size())-1; ch->set_index(idx); m_channels[pfd.fd]=ch; }else { //channel對應的套接字在m_pollfds中,修改 assert(m_channels.find(ch->fd())!=m_channels.end()); assert(m_channels[ch->fd()]==ch); int idx=ch->index(); assert(0<=idx && idx<static_cast<int>(m_pollfds.size())); //修改m_pollfds中的這個pollfd struct pollfd pfd=m_pollfds[idx]; assert(pfd.fd==ch->fd() || pfd.fd == -1); pfd.events=ch->events(); pfd.revents=0; if(ch->isNoneEvent()) pfd.fd=-1; } } //便利,_pollfds,找出當前活動(發生網路事件)的套接字,把它相對應的channel加入到 //activeChannels. //這個時候poll和epoll的區別就體現出來了 //poll需要輪尋pollfd陣列,而epoll直接返回發生了網路事件的epollfd陣列,不需要輪尋 void poller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const { //輪尋pollfd陣列,找到發生網路事件的套接字,為其分配channel加入到activeChannel for(PollFdList::const_iterator pfd=m_pollfds.begin(); pfd!=m_pollfds.end() && numEvents>0;pfd++) { numEvents--; //總共就numEvents個套接字,為0就提前退出 ChannelMap::const_iterator ch=m_channels.find(pfd->fd); //保證在m_channel中能夠找到這個套接字對應的channel assert(ch!=m_channels.end()); channel* chan=ch->second; assert(chan->fd()==pfd->fd); chan->set_revents(pfd->revents); //設定channel發生的網路事件 activeChannels->push_back(chan); //新增到acticeChannels } } }//namespace net }//namespace mymuduo
eventloop類:
之前說過這個類了,現在需要對這個類新增一些功能,首先eventloop中要有一個poller物件來實現poller::poll()操作,還要有一個channel的集合用於呼叫channel::handleEvent()處理網路事件。
eventloop.h
#ifndef EVENTLOOP_H #define EVENTLOOP_H #include<pthread.h> #include"base/noncopyable.h" #include<vector> #include<memory> namespace mymuduo { namespace net { class channel; class poller; class eventloop:noncopyable { public: eventloop(); ~eventloop(); //eventloop的核心函式,用於不斷迴圈,在其中呼叫poller::poll()用於獲取發生的網路事件 void loop(); //停止loop() void quit(); //更新channel,實際上呼叫了poller::updatechannel,更新poller的m_pollfds陣列 void updateChannel(channel* ch); //斷言,eventloop所屬於的執行緒ID就是當前執行緒的ID void assertInLoopThread(); //判斷是否eventloop所屬於的執行緒ID就是當先執行緒的ID bool isInLoopThread() const; //獲得當前執行緒的那個eventloop* eventloop* getEventLoopOfCurrentThread(); private: //LOG_FATAL void abortNotInLoopThread(); typedef std::vector<channel*> ChannelList; bool m_looping; //eventloop是否正在loop bool m_quit; //是否退出 const pid_t m_threadId; //eventloop所在的那個執行緒ID,要求one eventloop one thread std::unique_ptr<poller> m_poller; //用於在loop()中呼叫poller::poll() //在poller::poll()中返回的activeChannels,也就是每個網路事件對應的channel ChannelList m_activeChannels; }; }//namespace net }//namespace mymuduo #endif // EVENTLOOP_H
eventloop.cpp
#include "eventloop.h" #include"base/currentthread.h" #include"base/logging.h" #include"net/poller.h" #include"net/channel.h" #include<poll.h> namespace mymuduo { namespace net { //每個執行緒都有一個t_loopInThisThread,表示當前執行緒擁有的那個eventloop* __thread eventloop* t_loopInThisThread=0; const int kPollTimeMs = 10*1000; //預設poller->poll()阻塞時間 //建構函式,初始化成員,設定當前執行緒的t_loopInThisThread為this eventloop::eventloop() :m_looping(false),m_threadId(currentthread::tid()), m_poller(new poller(this)),m_quit(false),m_activeChannels() { LOG_TRACE<<"eventloop created "<<this<<" in thread "<<m_threadId; //判斷當前執行緒是否已經存在eventloop了,保證最多隻有一個 if(t_loopInThisThread) //多建立eventloop報錯 LOG_FATAL<<"another eventloop "<<this<<" exits in this thread "<<m_threadId; else t_loopInThisThread=this; } eventloop::~eventloop() { assert(!m_looping); t_loopInThisThread=NULL; //刪除當前執行緒的那個eventlloop*,指向NULL } void eventloop::loop() { assert(!m_looping); assertInLoopThread(); m_looping=true; m_quit=false; while(!m_quit) { //poller::poll()得到發生的網路事件,為每個網路事件分配一個channel去處理 //最後把所有的channel都加入到m_activeChannel中 m_activeChannels.clear(); m_poller->poll(kPollTimeMs,&m_activeChannels); //對每一個channel,讓其呼叫handleEvent()去處理網路事件 for(ChannelList::const_iterator it=m_activeChannels.begin(); it!=m_activeChannels.end();it++) (*it)->handleEvent(); } LOG_TRACE<<"eventloop "<<this<<" stop looping"; m_looping=false; } //停止eventloop::loop() void eventloop::quit() { m_quit=true; } //更新channel,實際上呼叫了poller::updatechannel,更新poller的m_pollfds陣列 void eventloop::updateChannel(channel *ch) { m_poller->updateChannel(ch); } //斷言,當前執行緒就是eventloop所在的執行緒 void eventloop::assertInLoopThread() { if(!isInLoopThread()) abortNotInLoopThread(); } //... bool eventloop::isInLoopThread() const { return m_threadId==currentthread::tid(); } //獲取當前執行緒的那個eventloop* eventloop* eventloop::getEventLoopOfCurrentThread() { return t_loopInThisThread; } //LOG_FATAL,錯誤:eventloop所屬執行緒不是當前執行緒 void eventloop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << m_threadId << ", current thread id = " << currentthread::tid(); } }//namespace net }//namespace mymuduo
測試:
已經完成了這三個類最基本的功能了,我們就來實現一個最精簡的Reactor模型。
#include "net/eventloop.h" #include"net/channel.h" #include"base/logging.h" #include"base/thread.h" #include <assert.h> #include <stdio.h> #include <unistd.h> #include<sys/timerfd.h> using namespace mymuduo; using namespace mymuduo::net; eventloop* g_loop; void timeout() { printf("timeout! poller::poll()返回\n"); g_loop->quit(); //停止eventloop::loop() } int main() { mymuduo::logger::setLogLevel(mymuduo::logger::TRACE); eventloop loop; g_loop=&loop; //設定一個定時器fd,定時器觸發式會在timerfd上傳送一個數據 int timerfd=::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK|TFD_CLOEXEC); channel ch(g_loop,timerfd); //把timerfd繫結到ch上 ch.setReadCallback(timeout); //處理讀事件時呼叫timeout函式 ch.enableReading(); //ch註冊讀事件,並用update把ch更新到poller中的m_pollfds //設定定時器fd在5s後觸發,此時m_pollfds中有timerfd, //之後呼叫poller::poll(),這個事件會被poll()捕捉到返回, //然後呼叫channel::handleEvent()處理該事件 //由於是讀事件,Channel會呼叫readCallback也就是timeout來處理 struct itimerspec howlong; bzero(&howlong,sizeof howlong); howlong.it_value.tv_sec=5; ::timerfd_settime(timerfd,0,&howlong,NULL); loop.loop(); ::close(timerfd); }
列印結果:
2020年09月01日 星期2 17:50:42.1598953842 105123 TRACE eventloop eventloop created 0x7FFF5DE109C0 in thread 105123 - eventloop.cpp:25
2020年09月01日 星期2 17:50:42.1598953842 105123 TRACE updateChannel fd= 3 events= 3 - poller.cpp:44
2020年09月01日 星期2 17:50:47.1598953847 105123 TRACE poll 1 events happended - poller.cpp:22
timeout! poller::poll()返回
2020年09月01日 星期2 17:50:47.1598953847 105123 TRACE loop eventloop 0x7FFF5DE109C0 stop looping - eventloop.cpp:58