1. 程式人生 > 實用技巧 >muduo原始碼解析24-網路庫2:Reactor關鍵結構channel,poller和eventloop

muduo原始碼解析24-網路庫2:Reactor關鍵結構channel,poller和eventloop

簡介:

用eventloop,poller和channel共同完成一個最簡單的reactor模型。

注意本文超級長(介紹了三個類,channel,poller和eventloop,用他們實現一個最基本的reactor模型

//這是一個正常的IO複用模型結構,以poll為例子

while
(1) { poll(); //等待網路事件的發生 handleEvent();   //處理各個網路事件 }

根據上面的小例子來簡單說一下這三個類都是幹嘛的:

eventloop之前說過了,是封裝這整個迴圈體while(1){},而poller看名字就猜得出來是對 poll/epoll的封裝,channel則是對handleEvent()的封裝。

雖然說得很精簡,不過的確是這幾個類最核心的功能。

下面來具體看看各個類的實現:

channel類:

說明:

正常IO複用流程如下:(利用poll)

while(1)
{
poll();    //此時等待,有網路事件發生了就返回
handleEvent();   //這一步通過channel來實現
}

  當呼叫 ::poll() 獲取到發生網路事件的套接字時,channel的任務就是負責處理這些網路事件,為每一個套接字建立一個channel用於處理這個套接字上發生的網路事件。

根據muduo書上內容,有以下的原則:

每個channel物件自始至終只屬於一個eventloop,因此每個channel物件都只屬於某一個IO執行緒

每個channel物件自始至終只負責一個檔案描述符(fd)的IO事件分發,channel不擁有這個fd,析構也不關閉這個fd
channel會把不同的IO事件分發成不同的回撥,例如ReadCallback,WriteCallback等
muduo使用者一般不直接使用channel,而會使用更上層的封裝,例如TcpConnection(以後瞭解)
channel的生命期由其owner class 負責管理,一般是其他class直接或間接成員
channel的成員函式都只能在IO執行緒呼叫,因此更新其資料成員都不必加鎖

使用者一般只用set*Callback(),和enable*這幾個函式用來設定回撥函式和註冊網路事件

瞭解channel作用是處理網路事件之後,下面是具體的channel實現:

channel.h

#ifndef CHANNEL_H
#define CHANNEL_H

#include"base/noncopyable.h"
#include<functional>

namespace mymuduo {

namespace net {

class eventloop;

class channel:noncopyable
{
public:
  typedef std::function<void()> EventCallback;
    channel(eventloop* loop,int fd);

    void handleEvent();
    //設定回撥函式,read,write,error Callback函式,在處理event時被呼叫
    void setReadCallback(EventCallback cb)
    { m_readCallback = std::move(cb); }
    void setWriteCallback(EventCallback cb)
    { m_writeCallback = std::move(cb); }
    void setErrorCallback(EventCallback cb)
    { m_errorCallback = std::move(cb); }

    int fd()const {return m_fd;}    //channel所負責IO事件的那個fd
    //返回當前channel所註冊的網路事件
    int events() const{return m_events;}
    void set_revents(int revt){m_revents=revt;} //設定網路事件
    //判斷當前channel是否註冊了事件
    bool isNoneEvent() const{return m_events==kNoneEvent;}

    //在m_event上註冊讀/寫事件
    void enableReading(){m_events|=kReadEvent;update();}
    void enableWriting(){m_events|=kWriteEvent;update();}
    //在m_event上取消讀/寫事件
    void disableReading(){m_events&=~kReadEvent;update();}
    void disableWriting(){m_events&=~kWriteEvent;update();}

    //取消m_event所有事件
    void disableAll(){m_events=kNoneEvent;update();}

    //判斷m_event是否註冊了讀/寫事件
    bool isWriting() const{return m_events & kWriteEvent;}
    bool isReading() const{return m_events & kWriteEvent;}

    //for poller
    int index(){return m_index;}
    void set_index(int idx){m_index=idx;}

    //返回當前channel所在的那個eventloop
    eventloop* ownerLoop(){return m_loop;}

private:

//讓本channel 所屬於的那個eventloop回撥channel::update()完成channel的更新
    void update();
    //這三個靜態常量分別表示:無網路事件,讀網路事件,寫網路事件
    static const int kNoneEvent;
    static const int kReadEvent;
    static const int kWriteEvent;

    eventloop* m_loop;  //channel所屬的那個eventloop
    const int m_fd;     //每個channel負責處理一個sockfd上的網路事件
    int m_events;       //channel註冊(要監聽)的網路事件
    int m_revents;      //poll()返回的網路事件,具體發生的事件
    int m_index; //這個channel在poller中m_pollfds中的序號,預設-1表示不在其中

    //當發生了讀/寫/錯誤網路事件時,下面三個函式會被呼叫
    EventCallback m_readCallback;
    EventCallback m_writeCallback;
    EventCallback m_errorCallback;

};

}//namespace net

}//namespace mymuduo

#endif // CHANNEL_H

channel.cpp

#include "channel.h"

#include"base/logging.h"
#include"net/channel.h"
#include"net/eventloop.h"

#include<sstream>

#include<poll.h>


namespace mymuduo {

namespace net {

const int channel::kNoneEvent=0;
const int channel::kReadEvent=POLLIN|POLLPRI;
const int channel::kWriteEvent=POLLOUT;

channel::channel(eventloop* loop,int fd)
    :m_loop(loop),m_fd(fd),m_events(0),m_revents(0),m_index(-1)
{

}

//此時eventloop::loop()中poll函式返回,說明有網路事件發生了,
//針對網路事件型別進行相應的處理
void channel::handleEvent()
{
    //處理 網路事件 POLLNVAL
    if(m_revents & POLLNVAL)
        LOG_WARN<<"channel::handle_event() POLLNVAL";
    //處理 網路事件 錯誤
    if(m_revents & (POLLERR|POLLNVAL))
        if(m_errorCallback)
            m_errorCallback();
    //處理 網路事件 read
    if(m_revents & (POLLIN |POLLPRI|POLLRDHUP))
        if(m_readCallback)
            m_readCallback();
    //處理 網路事件 write
    if(m_revents & POLLOUT)
        if(m_writeCallback)
            m_writeCallback();
}

//讓本channel 所屬於的那個eventloop回撥channel::update()完成channel的更新
void channel::update()
{
    m_loop->updateChannel(this);
}


}//namespace net

}//namespace mymuduo

時序流程:

eventloop::loop()

  {  在loop()迴圈中

   ::poll(); 獲取網路事件,併為每一個發生網路事件的套接字建立一個channel。

  使用channel來處理具體的網路事件:

  channel::handleEvent()--->

    {

    讀網路事件:處理讀;

    寫網路事件:處理寫;

    錯誤事件:  處理錯誤;

    }

  }

poller類:

說明:

  是對IO複用模型的封裝例如poll和epoll,poller是屬於eventloop的,因此會在eventloop::loop()中呼叫

poller::poll(),來獲取發生了網路事件的套接字。

  應當注意,poller內部擁有一個pollfd集合,用於儲存poll()要監聽的那個套接字集合,另外有一個m_channels用於儲存每個套接字fd到其所對應的channel之間的對映。可以說,關於套接字集合和channel集合的一些資料都儲存在poller中,他們的更新也通過poller來管理。

poller.h:

#ifndef POLLER_H
#define POLLER_H

#include<map>
#include<vector>

#include"base/timestamp.h"
#include"net/eventloop.h"

struct pollfd;  //在.cpp檔案包含<poll.h>

namespace mymuduo {

namespace net {

class channel;

class poller:noncopyable
{
public:
    typedef std::vector<channel*> ChannelList;
    //建構函式,指定poller所屬的那個eventloop物件
    poller(eventloop* loop):m_ownerloop(loop){}
    ~poller()=default;

    //作為poller函式的核心,eventloop在loop()中呼叫poll()函式獲得當前活動的IO事件
    //然後填充eventloop生成的所有channel到ChannelList中,儲存所有的channel資訊
    timestamp poll(int timeoutMs,ChannelList* activeChannels);

    //負責更新把channel更新到m_pollfds
    void updateChannel(channel* ch);

    //保證eventloop所在的執行緒為當前執行緒
    void assertInLoopThread() const
    {
        m_ownerloop->assertInLoopThread();
    }

private:

    void fillActiveChannels(int numEvents,ChannelList* activeChannels)const;

    typedef std::vector<struct pollfd> PollFdList;
    typedef std::map<int,channel*> ChannelMap;
    ChannelMap m_channels;  //儲存從fd到Channel*的對映.
    PollFdList m_pollfds;   //pollfd集合,使用者儲存所有客戶機套接字資訊
    eventloop* m_ownerloop; //poller所屬於的那個eventloop物件
};

}//namespace net

}//namespace mymuduo

#endif // POLLER_H

poller.cpp:

#include "poller.h"

#include"base/logging.h"
#include"net/channel.h"

#include<poll.h>

namespace mymuduo{

namespace net {

//呼叫::poll()獲取發生了網路事件的套接字,併為每一個網路事件分配一個channel用於處理
//函式返回當前::poll()返回的時間戳
timestamp poller::poll(int timeoutMs, ChannelList *activeChannels)
{
    int numEvents=::poll(&*m_pollfds.begin(),m_pollfds.size(),timeoutMs);
    timestamp now(timestamp::now());

    //此時poll返回,numEvents個套接字上發生了網路事件
    if(numEvents>0)
    {
        LOG_TRACE<<numEvents<<" events happended";
        //在acticeChannels中新增numThreads個channel,每個事件分配一個channel
        fillActiveChannels(numEvents,activeChannels);
    }else if(numEvents==0)
    {
        LOG_TRACE<<"nothing happended";
    }
    else
        LOG_SYSERR<<"poller::poll()";

    return now;
}

//主要功能是負責維護和更新pollfd陣列,
//傳入一個channel進來,我們得到ch->index()判斷它在m_pollfds中的位置,如果不存在
//需要在m_pollfds中新加一個pollfd
//如果存在直接更新即可
void poller::updateChannel(channel *ch)
{
    //保證eventloop所線上程就是其所屬的執行緒
    assertInLoopThread();
    LOG_TRACE<<"fd= "<<ch->fd()<<" events= "<<ch->events();

    //更新m_pollfds陣列,程式碼很長是因為加了很多斷言
    //先判斷ch這個channel是否已經在m_channels中,如果不在 index()<0
    if(ch->index()<0)
    {
        //ch->index()==-1,說明這個channel對應的套接字不在m_pollfds中,新增
        assert(m_channels.find(ch->fd())==m_channels.end());
        //新建一個pollfd
        struct pollfd pfd;
        pfd.fd=ch->fd();
        pfd.events=static_cast<short>(ch->events());
        pfd.revents=0;
        //加入到m_pollfds中
        m_pollfds.push_back(pfd);
        int idx=static_cast<int>(m_pollfds.size())-1;
        ch->set_index(idx);
        m_channels[pfd.fd]=ch;
    }else
    {
        //channel對應的套接字在m_pollfds中,修改
        assert(m_channels.find(ch->fd())!=m_channels.end());
        assert(m_channels[ch->fd()]==ch);
        int idx=ch->index();
        assert(0<=idx && idx<static_cast<int>(m_pollfds.size()));
        //修改m_pollfds中的這個pollfd
        struct pollfd pfd=m_pollfds[idx];
        assert(pfd.fd==ch->fd() || pfd.fd == -1);
        pfd.events=ch->events();
        pfd.revents=0;
        if(ch->isNoneEvent())
            pfd.fd=-1;
    }
}

//便利,_pollfds,找出當前活動(發生網路事件)的套接字,把它相對應的channel加入到
//activeChannels.
//這個時候poll和epoll的區別就體現出來了
//poll需要輪尋pollfd陣列,而epoll直接返回發生了網路事件的epollfd陣列,不需要輪尋
void poller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
{
    //輪尋pollfd陣列,找到發生網路事件的套接字,為其分配channel加入到activeChannel
    for(PollFdList::const_iterator pfd=m_pollfds.begin();
        pfd!=m_pollfds.end() && numEvents>0;pfd++)
    {
        numEvents--;    //總共就numEvents個套接字,為0就提前退出
        ChannelMap::const_iterator ch=m_channels.find(pfd->fd);
        //保證在m_channel中能夠找到這個套接字對應的channel
        assert(ch!=m_channels.end());
        channel* chan=ch->second;
        assert(chan->fd()==pfd->fd);
        chan->set_revents(pfd->revents);    //設定channel發生的網路事件
        activeChannels->push_back(chan);    //新增到acticeChannels
    }
}

}//namespace net

}//namespace mymuduo

eventloop類:

  之前說過這個類了,現在需要對這個類新增一些功能,首先eventloop中要有一個poller物件來實現poller::poll()操作,還要有一個channel的集合用於呼叫channel::handleEvent()處理網路事件。

eventloop.h

#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include<pthread.h>

#include"base/noncopyable.h"

#include<vector>
#include<memory>

namespace mymuduo
{
namespace net {

class channel;
class poller;

class eventloop:noncopyable
{
public:
    eventloop();
    ~eventloop();
    //eventloop的核心函式,用於不斷迴圈,在其中呼叫poller::poll()用於獲取發生的網路事件
    void loop();
    //停止loop()
    void quit();


    //更新channel,實際上呼叫了poller::updatechannel,更新poller的m_pollfds陣列
    void updateChannel(channel* ch);

    //斷言,eventloop所屬於的執行緒ID就是當前執行緒的ID
    void assertInLoopThread();
    //判斷是否eventloop所屬於的執行緒ID就是當先執行緒的ID
    bool isInLoopThread() const;

    //獲得當前執行緒的那個eventloop*
    eventloop* getEventLoopOfCurrentThread();

private:

    //LOG_FATAL
    void abortNotInLoopThread();
    typedef std::vector<channel*> ChannelList;

    bool m_looping;     //eventloop是否正在loop
    bool m_quit;        //是否退出
    const pid_t m_threadId; //eventloop所在的那個執行緒ID,要求one eventloop one thread
    std::unique_ptr<poller> m_poller;   //用於在loop()中呼叫poller::poll()
    //在poller::poll()中返回的activeChannels,也就是每個網路事件對應的channel
    ChannelList m_activeChannels;

};

}//namespace net

}//namespace mymuduo



#endif // EVENTLOOP_H

eventloop.cpp

#include "eventloop.h"

#include"base/currentthread.h"
#include"base/logging.h"
#include"net/poller.h"
#include"net/channel.h"


#include<poll.h>


namespace mymuduo {

namespace net {

//每個執行緒都有一個t_loopInThisThread,表示當前執行緒擁有的那個eventloop*
__thread eventloop* t_loopInThisThread=0;
const int kPollTimeMs = 10*1000;    //預設poller->poll()阻塞時間

//建構函式,初始化成員,設定當前執行緒的t_loopInThisThread為this
eventloop::eventloop()
    :m_looping(false),m_threadId(currentthread::tid()),
      m_poller(new poller(this)),m_quit(false),m_activeChannels()
{
    LOG_TRACE<<"eventloop created "<<this<<" in thread "<<m_threadId;
    //判斷當前執行緒是否已經存在eventloop了,保證最多隻有一個
    if(t_loopInThisThread)  //多建立eventloop報錯
        LOG_FATAL<<"another eventloop "<<this<<" exits in this thread "<<m_threadId;
    else
        t_loopInThisThread=this;
}

eventloop::~eventloop()
{
    assert(!m_looping);
    t_loopInThisThread=NULL;    //刪除當前執行緒的那個eventlloop*,指向NULL
}

void eventloop::loop()
{
    assert(!m_looping);
    assertInLoopThread();
    m_looping=true;
    m_quit=false;

    while(!m_quit)
    {
        //poller::poll()得到發生的網路事件,為每個網路事件分配一個channel去處理
        //最後把所有的channel都加入到m_activeChannel中
        m_activeChannels.clear();
        m_poller->poll(kPollTimeMs,&m_activeChannels);

        //對每一個channel,讓其呼叫handleEvent()去處理網路事件
        for(ChannelList::const_iterator it=m_activeChannels.begin();
            it!=m_activeChannels.end();it++)
            (*it)->handleEvent();
    }
    LOG_TRACE<<"eventloop "<<this<<" stop looping";
    m_looping=false;
}

//停止eventloop::loop()
void eventloop::quit()
{
    m_quit=true;
}

//更新channel,實際上呼叫了poller::updatechannel,更新poller的m_pollfds陣列
void eventloop::updateChannel(channel *ch)
{
    m_poller->updateChannel(ch);
}

//斷言,當前執行緒就是eventloop所在的執行緒
void eventloop::assertInLoopThread()
{
    if(!isInLoopThread())
        abortNotInLoopThread();
}
//...
bool eventloop::isInLoopThread() const
{
    return m_threadId==currentthread::tid();
}

//獲取當前執行緒的那個eventloop*
eventloop* eventloop::getEventLoopOfCurrentThread()
{
    return t_loopInThisThread;
}

//LOG_FATAL,錯誤:eventloop所屬執行緒不是當前執行緒
void eventloop::abortNotInLoopThread()
{
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
              << " was created in threadId_ = " << m_threadId
              << ", current thread id = " <<  currentthread::tid();
}


}//namespace net

}//namespace mymuduo

測試:

已經完成了這三個類最基本的功能了,我們就來實現一個最精簡的Reactor模型。

#include "net/eventloop.h"
#include"net/channel.h"
#include"base/logging.h"
#include"base/thread.h"

#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include<sys/timerfd.h>


using namespace mymuduo;
using namespace mymuduo::net;

eventloop* g_loop;

void timeout()
{
    printf("timeout! poller::poll()返回\n");
    g_loop->quit(); //停止eventloop::loop()
}

int main()
{
    mymuduo::logger::setLogLevel(mymuduo::logger::TRACE);

    eventloop loop;
    g_loop=&loop;

    //設定一個定時器fd,定時器觸發式會在timerfd上傳送一個數據
    int timerfd=::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK|TFD_CLOEXEC);
    channel ch(g_loop,timerfd); //把timerfd繫結到ch上
    ch.setReadCallback(timeout);  //處理讀事件時呼叫timeout函式
    ch.enableReading(); //ch註冊讀事件,並用update把ch更新到poller中的m_pollfds

    //設定定時器fd在5s後觸發,此時m_pollfds中有timerfd,
    //之後呼叫poller::poll(),這個事件會被poll()捕捉到返回,
    //然後呼叫channel::handleEvent()處理該事件
    //由於是讀事件,Channel會呼叫readCallback也就是timeout來處理
    struct itimerspec howlong;
    bzero(&howlong,sizeof howlong);
    howlong.it_value.tv_sec=5;
    ::timerfd_settime(timerfd,0,&howlong,NULL);

    loop.loop();

    ::close(timerfd);


}

列印結果:

2020年09月01日 星期2 17:50:42.1598953842 105123 TRACE eventloop eventloop created 0x7FFF5DE109C0 in thread 105123 - eventloop.cpp:25
2020年09月01日 星期2 17:50:42.1598953842 105123 TRACE updateChannel fd= 3 events= 3 - poller.cpp:44
2020年09月01日 星期2 17:50:47.1598953847 105123 TRACE poll 1 events happended - poller.cpp:22
timeout! poller::poll()返回
2020年09月01日 星期2 17:50:47.1598953847 105123 TRACE loop eventloop 0x7FFF5DE109C0 stop looping - eventloop.cpp:58