reactor模式C++實現
阿新 • • 發佈:2019-01-03
程式碼取自 GitHub 上的一份實現(原始連結已找不到)。
* epoll主要負責fd到event型別的對映
* EventDemultiplexer管理 fd <-> event型別 <-> EventHandler 之間的對應關係(EventHandler定義了各種event的具體回撥方法),
從而間接實現由 fd 經 event型別 找到 EventHandler 的具體回撥方法
* Reactor負責註冊、管理、分配
核心程式碼
- reactor.h
#include "event_handler.h"
#include "event.h"
#include "reactor_impl.h"
class ReactorImpl;  // forward declaration; presumably meant to hide the concrete implementation

// Front end of the reactor. Every public operation is forwarded to the
// ReactorImpl object held in _reactor_impl. A single instance exists as a
// static member and is reached through get_instance().
class Reactor {
public:
    // Returns the single static Reactor instance.
    static Reactor& get_instance();

    // Registers `handler` for the events in `evt`; returns the result of
    // ReactorImpl::regist.
    int regist(EventHandler* handler, Event evt);

    // Unregisters `handler` via ReactorImpl::remove.
    void remove(EventHandler* handler);

    // Runs one wait/dispatch round via ReactorImpl::dispatch; `timeout` is
    // passed through unchanged.
    void dispatch(int timeout = 0);

private:
    // Construction, destruction and copying are private so that client code
    // cannot create or duplicate a Reactor.
    Reactor();
    ~Reactor();
    Reactor(const Reactor&);
    Reactor& operator=(const Reactor&);

    ReactorImpl* _reactor_impl;  // allocated in the constructor, freed in the destructor
    static Reactor reactor;      // the instance returned by get_instance()
};
- reactor.cpp
#include "reactor.h"
#include <assert.h>
#include <new>
// Definition of the single static Reactor instance.
Reactor Reactor::reactor;

// Gives access to the single static Reactor instance.
Reactor& Reactor::get_instance() {
    return Reactor::reactor;
}
// Allocates the implementation object. The nothrow form of new is used, so an
// allocation failure shows up as a null pointer, which the assert catches in
// builds where assertions are enabled.
Reactor::Reactor()
    : _reactor_impl(new (std::nothrow) ReactorImpl()) {
    assert(_reactor_impl != NULL);
}
// Releases the implementation object and clears the pointer.
Reactor::~Reactor() {
    delete _reactor_impl;  // deleting a null pointer is a no-op, so no test is needed
    _reactor_impl = NULL;
}
// Forwards the registration to the implementation and hands its result back
// to the caller unchanged.
int Reactor::regist(EventHandler* handler, Event evt) {
    const int status = _reactor_impl->regist(handler, evt);
    return status;
}
// Forwards the removal of `handler` to the implementation.
void Reactor::remove(EventHandler* handler) {
    _reactor_impl->remove(handler);
}
// Forwards one wait/dispatch round to the implementation; `timeout` is passed
// through unchanged.
void Reactor::dispatch(int timeout) {
    _reactor_impl->dispatch(timeout);
}
- reactor_impl.h
#include <map>
#include "event.h"
#include "event_handler.h"
#include "event_demultiplexer.h"
// Holds the registered event handlers, keyed by their handle, together with
// the demultiplexer that waits for events on those handles.
class ReactorImpl {
public:
    ReactorImpl();
    ~ReactorImpl();

    // Stores `handler` under its handle and registers the handle for `evt`
    // with the demultiplexer; returns the demultiplexer's result.
    int regist(EventHandler* handler, Event evt);

    // Unregisters the handler's handle and deletes the stored handler.
    void remove(EventHandler* handler);

    // Waits for events (at most `timeout`, passed through to the
    // demultiplexer) and lets the demultiplexer invoke the handlers.
    void dispatch(int timeout = 0);

private:
    // Not copyable: the destructor deletes every stored handler and the
    // demultiplexer, so a copy would delete them a second time.
    // Declared but intentionally never defined.
    ReactorImpl(const ReactorImpl&);
    ReactorImpl& operator=(const ReactorImpl&);

    EventDemultiplexer* _demultiplexer;         // deleted in the destructor
    std::map<Handle, EventHandler*> _handlers;  // values are deleted by remove() and the destructor
};
- reactor_impl.cpp
#include "reactor_impl.h"
#include <new>
#include <assert.h>
#include "epoll_demultiplexer.h"
// Creates the epoll-based demultiplexer. The nothrow form of new is used, so
// a failed allocation yields NULL, which the assert catches in builds where
// assertions are enabled.
ReactorImpl::ReactorImpl()
    : _demultiplexer(new (std::nothrow) EpollDemultiplexer()) {
    assert(_demultiplexer != NULL);
}
// Deletes every handler still stored in the map, then the demultiplexer.
ReactorImpl::~ReactorImpl() {
    typedef std::map<Handle, EventHandler*>::iterator HandlerIter;
    for (HandlerIter it = _handlers.begin(); it != _handlers.end(); ++it) {
        delete it->second;
    }
    delete _demultiplexer;  // deleting a null pointer is a no-op
}
// Stores `handler` under its handle (unless a handler is already stored for
// that handle) and registers the handle for `evt` with the demultiplexer.
// Returns whatever the demultiplexer's regist() returns.
int ReactorImpl::regist(EventHandler* handler, Event evt) {
    const Handle fd = handler->get_handle();
    // map::insert leaves an existing entry for the same key untouched.
    _handlers.insert(std::make_pair(fd, handler));
    return _demultiplexer->regist(fd, evt);
}
// Unregisters the handle of `handler` from the demultiplexer and deletes the
// handler stored for that handle.
//
// A null `handler`, or a handler whose handle is not in the map, is ignored
// instead of dereferencing an end() iterator (undefined behaviour).
//
// Note: the stored handler is deleted here, so a handler that calls this on
// itself must not touch its members afterwards.
void ReactorImpl::remove(EventHandler* handler) {
    if (handler == NULL) {
        return;
    }
    const Handle handle = handler->get_handle();
    _demultiplexer->remove(handle);

    std::map<Handle, EventHandler*>::iterator iter = _handlers.find(handle);
    if (iter == _handlers.end()) {
        return;  // nothing stored for this handle
    }
    EventHandler* stored = iter->second;
    // Erase first so the map never holds a pointer to a destroyed object.
    _handlers.erase(iter);
    delete stored;
}
// Runs one wait/dispatch round on the demultiplexer, handing it the handler
// map so it can invoke the callbacks. The demultiplexer's return value is
// deliberately discarded.
void ReactorImpl::dispatch(int timeout) {
    static_cast<void>(_demultiplexer->wait_event(_handlers, timeout));
}
- event.h
// A set of event flags, built by OR-ing the enumerators below.
typedef unsigned int Event;

// Individual event flags (one bit each) plus a mask covering the low 8 bits.
// Note: the last enumerator has the same name as the enumeration itself and
// therefore hides the type name; the type has to be written `enum EventMask`.
enum EventMask {
    ReadEvent  = 1 << 0,  // 0x01
    WriteEvent = 1 << 1,  // 0x02
    ErrorEvent = 1 << 2,  // 0x04
    EventMask  = 0xff
};
- event_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"
// Abstract interface of an event demultiplexer: something that can watch
// handles for events and invoke the matching handlers.
class EventDemultiplexer {
public:
    EventDemultiplexer() {}
    virtual ~EventDemultiplexer() {}

    // Waits for events (at most `timeout`) and calls back into the handlers
    // found in `handlers`.
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0) = 0;

    // Starts watching `handle` for the events in `evt`.
    virtual int regist(Handle handle, Event evt) = 0;

    // Stops watching `handle`.
    virtual int remove(Handle handle) = 0;
};
- epoll_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"
#include "event_demultiplexer.h"
// EventDemultiplexer implementation on top of Linux epoll.
class EpollDemultiplexer : public EventDemultiplexer {
public:
    EpollDemultiplexer();
    virtual ~EpollDemultiplexer();

    // Waits on the epoll instance and invokes the handlers of ready handles.
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0);

    // Adds `handle` to the epoll instance for the events in `evt`.
    virtual int regist(Handle handle, Event evt);

    // Removes `handle` from the epoll instance.
    virtual int remove(Handle handle);

private:
    // Not copyable: the destructor closes _epoll_fd, so a copy would close
    // the same descriptor a second time. Declared but intentionally never
    // defined.
    EpollDemultiplexer(const EpollDemultiplexer&);
    EpollDemultiplexer& operator=(const EpollDemultiplexer&);

    // Despite its name this is a counter: regist() increments it, remove()
    // decrements it, and wait_event() uses it to size its event buffer.
    int _max_fd;
    int _epoll_fd;  // descriptor returned by epoll_create, closed in the destructor
};
- epoll_demultiplexer.cpp
#include <vector>
#include <sys/epoll.h>
#include <iostream>
#include <errno.h>
#include <unistd.h>
#include "epoll_demultiplexer.h"
// Creates the epoll instance. A failure of epoll_create is reported on
// std::cerr (in the same style as regist/remove) instead of being silently
// ignored; _epoll_fd is then negative and later epoll calls will fail.
EpollDemultiplexer::EpollDemultiplexer()
    : _max_fd(0), _epoll_fd(epoll_create(1024)) {
    if (_epoll_fd < 0) {
        const int err = errno;  // capture before stream output can overwrite it
        std::cerr << "epoll_create error " << err << std::endl;
    }
}
// Closes the epoll descriptor obtained in the constructor.
EpollDemultiplexer::~EpollDemultiplexer() {
    ::close(_epoll_fd);
}
int EpollDemultiplexer::wait_event(std::map<Handle, EventHandler*>& handlers, int timeout) {
std::vector<struct epoll_event> events(_max_fd);
int num = epoll_wait(_epoll_fd, &events[0], _max_fd, timeout);
if (num < 0) {
//std::cerr << "WARNING: epoll_wait error " << errno << std::endl;
return num;
}
for (int i = 0; i < num; ++i) {
Handle handle = events[i].data.fd;
if ((EPOLLHUP|EPOLLERR) & events[i].events) {
assert(handlers[handle] != NULL);
(handlers[handle])->handle_error();
} else {
if ((EPOLLIN) & events[i].events) {
assert(handlers[handle] != NULL);
(handlers[handle])->handle_read();
}
if (EPOLLOUT & events[i].events) {
(handlers[handle])->handle_write();
}
}
}
return num;
}
// Starts watching `handle` for the events in `evt` (edge-triggered).
//
// handle: descriptor to watch.
// evt:    combination of ReadEvent / WriteEvent.
// Returns 0 on success, or -errno of the failing epoll_ctl call.
//
// The descriptor is added with EPOLL_CTL_ADD; if it is already part of the
// epoll set (EEXIST) its event mask is updated with EPOLL_CTL_MOD instead.
// The registration counter used to size wait_event()'s buffer is incremented
// only when a descriptor was actually added.
//
// NOTE(review): EPOLLET means a handler is notified once per readiness
// change; handlers that consume only part of the pending data/connections
// per callback may not be notified again for the rest.
int EpollDemultiplexer::regist(Handle handle, Event evt) {
    struct epoll_event ev = epoll_event();  // value-initialized: no indeterminate bits
    ev.data.fd = handle;
    ev.events = EPOLLET;
    if (evt & ReadEvent) {
        ev.events |= EPOLLIN;
    }
    if (evt & WriteEvent) {
        ev.events |= EPOLLOUT;
    }

    if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) == 0) {
        ++_max_fd;
        return 0;
    }
    if (errno == EEXIST) {
        if (epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, handle, &ev) == 0) {
            return 0;
        }
        const int mod_err = errno;  // capture before stream output can overwrite it
        std::cerr << "epoll_ctl mod error " << mod_err << std::endl;
        return -mod_err;
    }
    const int add_err = errno;  // capture before stream output can overwrite it
    std::cerr << "epoll_ctl add error " << add_err << std::endl;
    return -add_err;
}
// Stops watching `handle`.
//
// Returns 0 on success, or -errno when epoll_ctl(EPOLL_CTL_DEL) fails; the
// failure is also reported on std::cerr.
int EpollDemultiplexer::remove(Handle handle) {
    // The event argument is ignored for EPOLL_CTL_DEL, but a non-null pointer
    // is passed for portability to old kernels; keep it zeroed.
    struct epoll_event ev = epoll_event();
    if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) {
        const int err = errno;  // capture before stream output can overwrite it
        std::cerr << "epoll_ctl del error " << err << std::endl;
        return -err;
    }
    // Never let the registration counter drop below zero: wait_event() uses
    // it as a buffer size.
    if (_max_fd > 0) {
        --_max_fd;
    }
    return 0;
}
- event_handler.h
// Identifier of an event source; an int (file descriptors in the handlers of
// this project).
typedef int Handle;

// Interface of an object that reacts to events on one handle.
class EventHandler {
public:
    EventHandler() {}
    virtual ~EventHandler() {}

    // The handle this handler is responsible for.
    virtual Handle get_handle() const = 0;

    // Callbacks for the read, write and error events on that handle.
    virtual void handle_read() = 0;
    virtual void handle_write() = 0;
    virtual void handle_error() = 0;
};
- listen_handler.h
#include "event_handler.h"
#include "event.h"
// Event handler for a listening socket: its read callback accepts a
// connection and registers a handler for it.
class ListenHandler : public EventHandler {
public:
    // Takes over `fd`; the destructor closes it.
    ListenHandler(Handle fd);
    virtual ~ListenHandler();

    virtual Handle get_handle() const {
        return _listen_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // Not copyable: the destructor closes _listen_fd, so a copy would close
    // the same descriptor a second time. Declared but intentionally never
    // defined.
    ListenHandler(const ListenHandler&);
    ListenHandler& operator=(const ListenHandler&);

    Handle _listen_fd;
};
- listen_handler.cpp
#include "listen_handler.h"
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include <assert.h>
#include "event_handler.h"
#include "reactor.h"
#include "socket_handler.h"
// Remembers the listening descriptor; no other setup is performed.
ListenHandler::ListenHandler(Handle fd)
    : _listen_fd(fd) {
}
// Closes the listening descriptor that was handed to the constructor.
ListenHandler::~ListenHandler() {
    ::close(_listen_fd);
}
// Read callback of the listening socket: accepts one pending connection and
// registers a SocketHandler for it (read events only) with the reactor.
//
// Failure handling:
//  - accept() failed: nothing is created or registered.
//  - the handler could not be allocated: the accepted descriptor is closed.
//  - registration failed: the reactor is asked to remove the handler again,
//    which releases the handler stored for that descriptor.
void ListenHandler::handle_read() {
    const int fd = accept(_listen_fd, NULL, NULL);
    if (fd < 0) {
        return;  // no connection was obtained; there is no descriptor to wrap
    }

    EventHandler* h = new (std::nothrow) SocketHandler(fd);
    if (h == NULL) {
        close(fd);  // no handler owns the descriptor, so close it here
        return;
    }

    Reactor& r = Reactor::get_instance();
    if (r.regist(h, ReadEvent) != 0) {
        r.remove(h);
    }
}
// Write callback of the listening socket. Intentionally empty: this handler
// performs no action on write events.
void ListenHandler::handle_write() {
// intentionally empty
}
// Error callback of the listening socket. Intentionally empty: errors on the
// listening descriptor are ignored and the handler stays registered.
void ListenHandler::handle_error() {
// intentionally empty
}
- socket_handler.h
#include "event_handler.h"
#include "event.h"
// Event handler for one connected socket. Its read callback echoes the
// received data back and then removes the handler from the reactor.
class SocketHandler : public EventHandler {
public:
    // Takes over `fd` (closed in the destructor) and allocates the buffer.
    SocketHandler(Handle fd);
    virtual ~SocketHandler();

    virtual Handle get_handle() const {
        return _socket_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // Not copyable: the destructor closes _socket_fd and frees _buf, so a
    // copy would close/free them a second time. Declared but intentionally
    // never defined.
    SocketHandler(const SocketHandler&);
    SocketHandler& operator=(const SocketHandler&);

    Handle _socket_fd;
    char* _buf;                        // MAX_SIZE bytes, allocated in the constructor
    static const int MAX_SIZE = 1024;  // size of _buf in bytes
};
- socket_handler.cpp
#include "socket_handler.h"
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include "reactor.h"
// Stores the descriptor and allocates a zero-filled buffer of MAX_SIZE
// bytes. The nothrow form of new is used, so a failed allocation yields NULL,
// which the assert catches in builds where assertions are enabled.
SocketHandler::SocketHandler(Handle fd)
    : _socket_fd(fd),
      _buf(new (std::nothrow) char[MAX_SIZE]) {
    assert(_buf != NULL);
    memset(_buf, 0, MAX_SIZE);
}
// Closes the socket descriptor and frees the buffer.
SocketHandler::~SocketHandler() {
    ::close(_socket_fd);
    delete[] _buf;
}
// Read callback: reads up to MAX_SIZE bytes, echoes exactly the bytes that
// were received back to the peer, then removes this handler from the reactor.
//
// The echo length is the count returned by read(), not strlen(_buf): the
// buffer is not NUL-terminated when read() fills it completely, and the data
// may itself contain NUL bytes.
void SocketHandler::handle_read() {
    const ssize_t received = read(_socket_fd, _buf, MAX_SIZE);

    // write() may accept fewer bytes than requested; continue until all
    // received bytes are sent or a write fails. Nothing is sent when read()
    // returned 0 or an error.
    ssize_t sent = 0;
    while (sent < received) {
        const ssize_t n = write(_socket_fd, _buf + sent, received - sent);
        if (n <= 0) {
            break;
        }
        sent += n;
    }

    // Must stay the last statement: it unregisters this handler from the
    // reactor, which deletes the stored handler object.
    handle_error();
}
// Write callback of a connected socket. Intentionally empty: this handler
// performs no action on write events.
void SocketHandler::handle_write() {
// intentionally empty
}
void SocketHandler::handle_error() {
Reactor& r = Reactor::get_instance();
r.remove(this);
}
demo
- client.cpp
#include "sys/socket.h"
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include <errno.h>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"
// Demo client: connects to 127.0.0.1:53031, sends "hello world", reads the
// reply and prints both.
//
// Exit status: 0 on success, -1 socket() failed, -2 connect() failed,
// -3 read() failed, -4 write() failed.
int main() {
    const int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd < 0) {
        const int err = errno;  // capture before stream output can overwrite it
        std::cerr << "socket error " << err << std::endl;
        exit(-1);
    }

    // Value-initialize so that the padding bytes of the address are zero.
    struct sockaddr_in seraddr = sockaddr_in();
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO
    if (connect(socketfd, reinterpret_cast<struct sockaddr*>(&seraddr), sizeof(seraddr)) < 0) {
        const int err = errno;
        std::cerr << "connect error " << err << std::endl;
        close(socketfd);
        exit(-2);
    }

    const char wbuf[64] = "hello world";
    if (write(socketfd, wbuf, strlen(wbuf)) < 0) {
        const int err = errno;
        std::cerr << "write error " << err << std::endl;
        close(socketfd);
        exit(-4);
    }

    // Read at most sizeof(rbuf) - 1 bytes so the zero-filled buffer always
    // stays NUL-terminated for printing.
    char rbuf[64] = {0};
    const ssize_t n = read(socketfd, rbuf, sizeof(rbuf) - 1);
    if (n < 0) {
        const int err = errno;
        std::cerr << "read error " << err << std::endl;
        close(socketfd);
        exit(-3);
    }

    std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl;
    close(socketfd);
    return 0;
}
- server.cpp
#include <sys/socket.h>
#include <arpa/inet.h>
#include <iostream>
#include <errno.h>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"
// Demo server: listens on port 53031 (all interfaces), registers a
// ListenHandler for the listening socket with the reactor and then runs the
// dispatch loop forever.
//
// Exit status on startup failure: -1 socket(), -2 bind(), -3 listen(),
// -4 registration with the reactor.
int main() {
    const int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        const int err = errno;  // capture before stream output can overwrite it
        std::cerr << "socket error " << err << std::endl;
        exit(-1);
    }

    // Value-initialize so that the padding bytes of the address are zero.
    struct sockaddr_in seraddr = sockaddr_in();
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(listenfd, reinterpret_cast<struct sockaddr*>(&seraddr), sizeof(seraddr)) < 0) {
        const int err = errno;
        std::cerr << "bind error " << err << std::endl;
        close(listenfd);
        exit(-2);
    }
    if (listen(listenfd, 5) < 0) {
        const int err = errno;
        std::cerr << "listen error " << err << std::endl;
        close(listenfd);
        exit(-3);
    }

    Reactor& actor = Reactor::get_instance();
    // The handler takes over listenfd (its destructor closes it).
    EventHandler* handler = new ListenHandler(listenfd);
    if (actor.regist(handler, ReadEvent) != 0) {
        std::cerr << "regist error" << std::endl;
        exit(-4);
    }

    // Dispatch loop; 100 is the timeout passed to each wait. Never returns.
    while (true) {
        actor.dispatch(100);
        //std::cout << "one loop" << std::endl;
    }
}