#muduo學習筆記(二)Reactor關鍵結構
目錄
muduo學習筆記(二)Reactor關鍵結構
Reactor簡述
什麼是Reactor
Reactor
是一種基於事件驅動的設計模式,即通過回撥機制,我們將事件的介面註冊到Reactor上,當事件發生之後,就會回撥註冊的介面。
Reactor必要的幾個元件
:
Event Multiplexer事件分發器
:即一些I/O複用機制select、poll、epoll等.程式將事件源註冊到分發器上,等待事件的觸發,做相應處理.
Handle事件源
:用於標識一個事件,Linux上是檔案描述符.
Reactor反應器
:用於管理事件的排程及註冊刪除.當有啟用的事件時,則呼叫回撥函式處理,沒有則繼續事件迴圈.
event handler事件處理器
:管理已註冊事件和的排程,分成不同型別的事件(讀/寫,定時)當事件發生,呼叫對應的回撥函式處理.
Reactor模型的優缺點
優點
1)響應快,不必為單個同步時間所阻塞,雖然Reactor本身依然是同步的;
2)程式設計相對簡單,可以最大程度的避免複雜的多執行緒及同步問題,並且避免了多執行緒/程序的切換開銷;
3)可擴充套件性,可以方便的通過增加Reactor例項個數來充分利用CPU資源;
4)可複用性,reactor框架本身與具體事件處理邏輯無關,具有很高的複用性;
缺點
Reactor模式在IO讀寫資料時還是在同一個執行緒中實現的,即使使用多個Reactor機制的情況下,那些共享一個Reactor的Channel如果出現一個長時間的資料讀寫,會影響這個Reactor中其他Channel的相應時間,比如在大檔案傳輸時,IO操作就會影響其他Client的相應時間,因而對這種操作,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用Proactor模式。
poll簡述
poll的使用方法與select相似,輪詢多個檔案描述符,有讀寫時設定相應的狀態位,poll相比select優在沒有最大檔案描述符數量的限制.
# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
struct pollfd {
int fd; /* 檔案描述符 */
short events; /* 等待的事件 */
short revents; /* 實際發生了的事件 */
} ;
每一個pollfd結構體指定了一個被監視的檔案描述符,可以傳遞多個結構體,指示poll()監視多個檔案描述符。每個結構體的events域是監視該檔案描述符的事件掩碼,由使用者來設定這個域。revents域是檔案描述符的操作結果事件掩碼,核心在呼叫返回時設定這個域。events域中請求的任何事件都可能在revents域中返回。合法的事件如下:
POLLIN 有資料可讀。
POLLRDNORM 有普通資料可讀。
POLLRDBAND 有優先資料可讀。
POLLPRI 有緊迫資料可讀。
POLLOUT 寫資料不會導致阻塞。
POLLWRNORM 寫普通資料不會導致阻塞。
POLLWRBAND 寫優先資料不會導致阻塞。
POLLMSGSIGPOLL 訊息可用。
poll使用樣例
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <poll.h>
#define MAX_BUFFER_SIZE 1024
#define IN_FILES 1
#define MAX(a,b) ((a>b)?(a):(b))
int main(int argc ,char **argv)
{
struct pollfd fds[3];
char buf[1024];
int i,res,real_read, maxfd;
if((fds[0].fd=open("/dev/stdin",O_RDONLY|O_NONBLOCK)) < 0)
{
fprintf(stderr,"open data1 error:%s",strerror(errno));
return 1;
}
for (i = 0; i < IN_FILES; i++)
{
fds[i].events = POLLIN | POLLPRI;
}
while(1) //|| fds[1].events || fds[2].events)
{
int ret = poll(fds, 1, 1000);
if (ret < 0)
{
printf("Poll error : %s\n",strerror(errno));
return 1;
}
if(ret == 0){
printf("Poll timeout\n");
continue;
}
for (i = 0; i< 1; i++)
{
if (fds[i].revents)
{
memset(buf, 0, MAX_BUFFER_SIZE);
real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE);
if (real_read < 0)
{
if (errno != EAGAIN)
{
printf("read eror : %s\n",strerror(errno));
continue;
}
}
else if (!real_read)
{
close(fds[i].fd);
fds[i].events = 0;
}
else
{
if (i == 0)
{
buf[real_read] = '\0';
printf("%s", buf);
if ((buf[0] == 'q') || (buf[0] == 'Q'))
{
printf("quit\n");
return 1;
}
}
else
{
buf[real_read] = '\0';
printf("%s", buf);
}
}
}
}
}
exit(0);
}
muduo Reactor關鍵結構
muduo Reactor最核心的事件分發機制, 即將IO multiplexing拿到的IO事件分發給各個檔案描述符(fd)的事件處理函式。
Channel
Chanel目前我對它的理解是,它負責管理一個檔案描述符(file descript)IO事件.
Channel會封裝C的poll,把不同的IO事件分發到不同的回撥:ReadCallBack、WriteCallBack等
每個Channel物件自始至終只屬於一個EventLoop,因此每個Channel物件都只屬於某一個IO執行緒。 每個Channel物件自始至終只負責一個檔案描述符(fd) 的IO事件分發
#ifndef NET_CHANNEL_H
#define NET_CHANNEL_H
#include <functional>
#include "EventLoop.hh"
class Channel {
public:
typedef std::function<void()> EventCallBack;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent();
void setReadCallBack(const EventCallBack& cb) { m_readCallBack = cb; }
void setWriteCallBack(const EventCallBack& cb) { m_writeCallBack = cb; }
void setErrorCallBack(const EventCallBack& cb) { m_errorCallBack = cb; }
int fd() const { return m_fd; }
int events() const { return m_events; }
void set_revents(int revt) { m_revents = revt; }
bool isNoneEvent() const { return m_events == kNoneEvent; }
void eableReading() { m_events |= kReadEvent; update(); }
int index() { return m_index; }
void set_index(int idx) { m_index =idx; }
EventLoop* ownerLoop() { return m_pLoop; }
private:
Channel& operator=(const Channel&);
Channel(const Channel&);
void update();
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* m_pLoop;
const int m_fd;
int m_events; // 等待的事件
int m_revents; // 實際發生了的事件
int m_index;
EventCallBack m_readCallBack;
EventCallBack m_writeCallBack;
EventCallBack m_errorCallBack;
};
#endif
//Channel.cpp
#include <poll.h>
#include "Channel.hh"
#include "Logger.hh"
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
Channel::Channel(EventLoop* loop, int fd)
: m_pLoop(loop),
m_fd(fd),
m_events(0),
m_revents(0),
m_index(-1)
{
}
Channel::~Channel()
{
}
void Channel::update()
{
m_pLoop->updateChannel(this);
}
void Channel::handleEvent()
{
if(m_revents & POLLNVAL)
{
LOG_WARN << "Channel::handleEvent() POLLNVAL";
}
if(m_revents & (POLLERR | POLLNVAL)){
if(m_errorCallBack) m_errorCallBack();
}
if(m_revents & (POLLIN | POLLPRI | POLLRDHUP)){
if(m_readCallBack) m_readCallBack();
}
if(m_revents & POLLOUT){
if(m_writeCallBack) m_writeCallBack();
}
}
值得一提的就是 Channel::update()它會呼叫EventLoop::updateChannel(), 後者會轉而調
用Poller::updateChannel()。Poller物件下面會講,通過Poller::updateChannel()註冊IO事件(即file descript).
Channel::handleEvent()是Channel的核心, 它由EventLoop::loop()調
用, 它的功能是根據revents發生事件的的值分別呼叫不同的使用者回撥。 這個函式以後還會擴充。
Poller
Poller class是IO multiplexing的封裝。 它現在是個具體類,而在muduo中是個抽象基類,因為muduo同時支援poll(2)和epoll(4)兩種IOmultiplexing機制。
Poller是EventLoop的間接成員,只供其自己在EventLoop的IO執行緒中呼叫,因此無須加鎖。其生命期與EventLoop相等。
Poller並不擁有管理檔案描述符事件的Channel, Channel在析構之前必須自己
unregister(EventLoop::removeChannel()) , 避免空懸指標
#ifndef _NET_POLLER_HH
#define _NET_POLLER_HH
#include <vector>
#include <map>
#include "TimeStamp.hh"
#include "EventLoop.hh"
#include "Channel.hh"
struct pollfd;
class Poller{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
~Poller();
TimeStamp poll(int timeoutMs, ChannelList* activeChannels);
void updateChannel(Channel* channel);
void assertInLoopThread() { m_pOwerLoop->assertInLoopThread(); }
private:
Poller& operator=(const Poller&);
Poller(const Poller&);
void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel*> ChannelMap;
EventLoop* m_pOwerLoop;
PollFdList m_pollfds;
ChannelMap m_channels;
};
#endif
//Poller.cpp
#include "Poller.hh"
#include "Logger.hh"
#include <assert.h>
#include <poll.h>
#include <signal.h>
Poller::Poller(EventLoop* loop)
: m_pOwerLoop(loop)
{
}
Poller::~Poller()
{
}
TimeStamp Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "Poller::poll()";
int numEvents = ::poll(/*&*m_pollfds.begin()*/m_pollfds.data(), m_pollfds.size(), timeoutMs);
TimeStamp now(TimeStamp::now());
if(numEvents > 0){
LOG_TRACE << numEvents << " events happended";
fillActiveChannels(numEvents, activeChannels);
}
else if(numEvents == 0){
LOG_TRACE << " nothing happended";
}
else{
LOG_SYSERR << "Poller::poll()";
}
return now;
}
/*
*fillActiveChannels()遍歷m_pollfds, 找出有活動事件的fd, 把它對應
*的Channel填入activeChannels。
*/
void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
for(PollFdList::const_iterator pfd = m_pollfds.begin();
pfd != m_pollfds.end() && numEvents > 0; ++pfd)
{
if(pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = m_channels.find(pfd->fd);
assert(ch != m_channels.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
activeChannels->push_back(channel);
}
}
}
void Poller::updateChannel(Channel* channel)
{
assertInLoopThread();
LOG_TRACE << "fd= " << channel->fd() << " events" << channel->events();
if(channel->index() < 0){
//a new one , add to pollfds
assert(m_channels.find(channel->fd()) == m_channels.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
m_pollfds.push_back(pfd);
int idx = static_cast<int>(m_pollfds.size()) - 1;
channel->set_index(idx);
m_channels[pfd.fd] = channel;
}
else{
//update existing one
assert(m_channels.find(channel->fd()) != m_channels.end());
assert(m_channels[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(m_pollfds.size()));
struct pollfd& pfd = m_pollfds[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if(channel->isNoneEvent()){
//ignore this pollfd
pfd.fd = -1;
}
}
}
EventLoop
EventLopp在上一篇文章寫過,這裡給出改動.
EventLoop 新增了quit()成員函式, 還加了幾個資料成員,並在建構函式裡初始化它們。注意EventLoop通過智慧指標來間接持有poller.
+class Poller;
+class Channel;
class EventLoop
------------
bool isInloopThread() const {return m_threadId == CurrentThread::tid(); }
+void quit();
+void updateChannel(Channel* channel);
static EventLoop* getEventLoopOfCurrentThread();
private:
EventLoop& operator=(const EventLoop&);
EventLoop(const EventLoop&);
void abortNotInLoopThread();
+typedef std::vector<Channel*> ChannelList;
bool m_looping;
+bool m_quit;
const pid_t m_threadId;
+std::unique_ptr<Poller> m_poller;
+ChannelList m_activeChannels;
};
//EventLoop.cpp
m_threadId(CurrentThread::tid()),
+ m_poller(new Poller(this))
{
------
+void EventLoop::quit()
+{
+ m_quit = true;
+ //wakeup();
+}
+
+void EventLoop::updateChannel(Channel* channel)
+{
+ assert(channel->ownerLoop() == this);
+ assertInLoopThread();
+ m_poller->updateChannel(channel);
+}
上一篇文章的EventLoop->loop()什麼也沒做,現在它有了實實在在的使命,它呼叫Poller::poll()獲得當前活動事件的Chanel列表, 然後依次呼叫每個Channel的handleEvent()函式
void EventLoop::loop()
{
assert(!m_looping);
assertInLoopThread();
m_looping = true;
m_quit = false;
LOG_TRACE << "EventLoop " << this << " start loopig";
while(!m_quit)
{
m_activeChannels.clear();
m_poller->poll(1000, &m_activeChannels);
for(ChannelList::iterator it = m_activeChannels.begin();
it != m_activeChannels.end(); ++it)
{
(*it)->handleEvent();
}
}
LOG_TRACE << "EventLoop " << this << " stop loopig";
m_looping = false;
}
Reactor時序圖
測試程式-單次觸發的定時器
程式利用timerfd_create 把時間變成了一個檔案描述符,該“檔案”在定時器超時的那一刻變得可讀,這樣就能很方便地融入到 select/poll 框架中,用統一的方式來處理 IO 事件和超時事件,這也正是 Reactor 模式的長處。
#include <errno.h>
#include <thread>
#include <strings.h>
#include "EventLoop.hh"
#include "Channel.hh"
#include "Poller.hh"
//Reactor Test
//單次觸發定時器
#include <sys/timerfd.h>
EventLoop* g_loop;
void timeout()
{
printf("timeout!\n");
g_loop->quit();
}
int main()
{
EventLoop loop;
g_loop = &loop;
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK |TFD_CLOEXEC);
Channel channel(&loop, timerfd);
channel.setReadCallBack(timeout);
channel.eableReading();
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 3;
timerfd_settime(timerfd, 0, &howlong, NULL);
loop.loop();
close(timerfd);
}
./test.out
2018-10-31 22:25:54.532487 [TRACE] [EventLoop.cpp:16] [EventLoop] EventLoop Create 0x7FFEB9567CC0 in thread 3075
2018-10-31 22:25:54.533563 [TRACE] [Poller.cpp:64] [updateChannel] fd= 3 events3
2018-10-31 22:25:54.534000 [TRACE] [EventLoop.cpp:41] [loop] EventLoop 0x7FFEB9567CC0 start loopig
2018-10-31 22:25:54.534334 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:55.535827 [TRACE] [Poller.cpp:28] [poll] nothing happended
2018-10-31 22:25:55.536287 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:56.538334 [TRACE] [Poller.cpp:28] [poll] nothing happended
2018-10-31 22:25:56.538802 [TRACE] [Poller.cpp:20] [poll] Poller::poll()
2018-10-31 22:25:57.534175 [TRACE] [Poller.cpp:24] [poll] 1 events happended
timeout!
2018-10-31 22:25:57.534766 [TRACE] [EventLoop.cpp:55] [loop] EventLoop 0x7FFEB9567CC0 stop loopig