1. 程式人生 > >reactor模式C++實現

reactor模式C++實現

以下程式碼複製自 GitHub 上的一份實現(原始連結已經找不到了)。
* epoll主要負責fd到event型別的對映
* EventDemultiplexer管理fd <-> event型別 <-> eventhandler的對應關係,並決定每種event要呼叫eventhandler的哪個回撥方法,從而間接實現fd <–event型別–> eventhandler具體回撥方法的分派
* Reactor負責註冊、管理、分配

核心程式碼

  • reactor.h
#include "event_handler.h"
#include "event.h"
#include "reactor_impl.h"
class ReactorImpl; // 為了隱藏具體實現麼。。 class Reactor { public: static Reactor& get_instance(); int regist(EventHandler* handler, Event evt); void remove(EventHandler* handler); void dispatch(int timeout = 0); private: Reactor(); ~Reactor(); Reactor(const Reactor&); Reactor& operator
=(const Reactor&); private: ReactorImpl* _reactor_impl; static Reactor reactor; };
  • reactor.cpp
#include "reactor.h"
#include <assert.h>
#include <new>

// Storage for the one Reactor object declared as a static member of the class.
Reactor Reactor::reactor;

// Gives callers access to the single Reactor object defined above.
Reactor& Reactor::get_instance() {
    return Reactor::reactor;
}

// Allocates the implementation object. A failed allocation is detected only by
// the assert below.
Reactor::Reactor()
    : _reactor_impl(new (std::nothrow) ReactorImpl()) {
    assert(_reactor_impl != NULL);
}

// Destroys the implementation object.
Reactor::~Reactor() {
    // Deleting a null pointer is a no-op, so no explicit check is required.
    delete _reactor_impl;
    _reactor_impl = NULL;
}

// Forwards the registration to the implementation and returns its result.
int Reactor::regist(EventHandler* handler, Event evt) {
    const int result = _reactor_impl->regist(handler, evt);
    return result;
}

// Forwards the removal to the implementation.
void Reactor::remove(EventHandler* handler) {
    _reactor_impl->remove(handler);
}

// Forwards one dispatch round to the implementation.
void Reactor::dispatch(int timeout) {
    _reactor_impl->dispatch(timeout);
}
  • reactor_impl.h
#include <map>
#include "event.h"
#include "event_handler.h"
#include "event_demultiplexer.h"

// Concrete reactor. Holds the event demultiplexer and the table that maps a
// handle to the EventHandler registered for it. The destructor deletes the
// demultiplexer and every handler still present in the table.
class ReactorImpl {
public:
    ReactorImpl();
    ~ReactorImpl();

    // Stores `handler` under its handle (if that handle is not already present)
    // and registers the handle for `evt` with the demultiplexer.
    int regist(EventHandler* handler, Event evt);

    // Unregisters the handle of `handler` and deletes the stored handler.
    void remove(EventHandler* handler);

    // Runs one wait/dispatch round on the demultiplexer.
    void dispatch(int timeout = 0);

private:
    // A copy would make two objects delete the same demultiplexer and handlers,
    // so copying is disabled: declared private and never defined.
    ReactorImpl(const ReactorImpl&);
    ReactorImpl& operator=(const ReactorImpl&);

    EventDemultiplexer* _demultiplexer;
    std::map<Handle, EventHandler*> _handlers;
};
  • reactor_impl.cpp
#include "reactor_impl.h"
#include <new>
#include <assert.h>
#include "epoll_demultiplexer.h"

// Creates the epoll-based demultiplexer. A failed allocation is caught only by
// the assert.
ReactorImpl::ReactorImpl()
    : _demultiplexer(new (std::nothrow) EpollDemultiplexer()) {
    assert(_demultiplexer != NULL);
}

// Deletes every handler still stored in the table, then the demultiplexer.
ReactorImpl::~ReactorImpl() {
    typedef std::map<Handle, EventHandler*>::iterator HandlerIter;

    HandlerIter it = _handlers.begin();
    while (it != _handlers.end()) {
        delete it->second;
        ++it;
    }

    // Deleting a null pointer is a no-op, so the explicit null test is unnecessary.
    delete _demultiplexer;
}

// Records `handler` under its handle and registers that handle for `evt` with
// the demultiplexer. Returns whatever the demultiplexer returns.
int ReactorImpl::regist(EventHandler* handler, Event evt) {
    const Handle fd = handler->get_handle();

    // map::insert leaves an existing entry untouched, so a handle that is
    // already present keeps its original handler.
    _handlers.insert(std::make_pair(fd, handler));

    return _demultiplexer->regist(fd, evt);
}

// Unregisters the handle of `handler` from the demultiplexer, erases it from
// the handler table and deletes the stored handler object.
//
// A null `handler`, or one whose handle is not in the table, is ignored.
// Previously that case dereferenced and erased the end() iterator.
//
// Handlers call this on themselves (e.g. from handle_error), so the stored
// object may be the caller: once this returns the handler no longer exists.
void ReactorImpl::remove(EventHandler* handler) {
    if (handler == NULL) {
        return;
    }

    const Handle handle = handler->get_handle();
    std::map<Handle, EventHandler*>::iterator iter = _handlers.find(handle);
    if (iter == _handlers.end()) {
        // Never registered or already removed: nothing to undo.
        return;
    }

    // The demultiplexer reports its own failures; the table entry is dropped
    // regardless so the handler is not leaked.
    _demultiplexer->remove(handle);

    // Erase before deleting so the table never holds a dangling pointer while
    // the handler's destructor runs.
    EventHandler* stored = iter->second;
    _handlers.erase(iter);
    delete stored;
}

// Runs one wait/dispatch round on the demultiplexer, passing it the handler
// table. The value returned by wait_event is discarded because dispatch
// returns void.
void ReactorImpl::dispatch(int timeout) {
    (void)_demultiplexer->wait_event(_handlers, timeout);
}
  • event.h
// Include guard: this header is pulled in by several other headers, and
// without a guard the enum below is redefined in any translation unit that
// includes more than one of them.
#ifndef REACTOR_EVENT_H
#define REACTOR_EVENT_H

// A set of event flags; values are combinations of the enumerators below.
typedef unsigned int Event;

enum EventMask {
    ReadEvent  = 0x01,  // translated to EPOLLIN by EpollDemultiplexer::regist
    WriteEvent = 0x02,  // translated to EPOLLOUT by EpollDemultiplexer::regist
    ErrorEvent = 0x04,
    // Covers the low eight bits. NOTE: this enumerator has the same name as
    // the enum type and hides it, so the type must be spelled `enum EventMask`.
    EventMask  = 0xff
};

#endif // REACTOR_EVENT_H
  • event_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"

// Include guard: this header is included both by reactor_impl.h and by
// epoll_demultiplexer.h, so without a guard the class is redefined in any
// translation unit that includes both.
#ifndef REACTOR_EVENT_DEMULTIPLEXER_H
#define REACTOR_EVENT_DEMULTIPLEXER_H

// Interface of an I/O event demultiplexer: it tracks which handles are
// watched for which events, waits for activity and invokes the callbacks of
// the handlers passed to wait_event.
class EventDemultiplexer {
public:
    EventDemultiplexer() {}
    virtual ~EventDemultiplexer()  {}

    // Waits up to `timeout` for events and dispatches them to the matching
    // entries of `handlers`.
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0) = 0;

    // Starts watching `handle` for the events in `evt`.
    virtual int regist(Handle handle, Event evt) = 0;

    // Stops watching `handle`.
    virtual int remove(Handle handle) = 0;
};

#endif // REACTOR_EVENT_DEMULTIPLEXER_H
  • epoll_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"
#include "event_demultiplexer.h"

// EventDemultiplexer implemented on top of Linux epoll. Owns the epoll file
// descriptor, which the destructor closes.
class EpollDemultiplexer : public EventDemultiplexer {
public:
    EpollDemultiplexer();
    virtual ~EpollDemultiplexer();
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0);
    virtual int regist(Handle handle, Event evt);
    virtual int remove(Handle handle);

private:
    // A copy would close the same epoll descriptor twice, so copying is
    // disabled: declared private and never defined.
    EpollDemultiplexer(const EpollDemultiplexer&);
    EpollDemultiplexer& operator=(const EpollDemultiplexer&);

    // Despite the name this is a counter: regist/remove adjust it and
    // wait_event uses it as the capacity of its event buffer.
    int _max_fd;
    int _epoll_fd;
};
  • epoll_demultiplexer.cpp
#include <vector>
#include <sys/epoll.h>
#include <iostream>
#include <errno.h>
#include <unistd.h>

#include "epoll_demultiplexer.h"

// Creates the epoll instance. On failure `_epoll_fd` stays negative, every
// later epoll call on it fails, and the error is reported here instead of
// being silently ignored.
EpollDemultiplexer::EpollDemultiplexer()
    // The size argument is only a hint (ignored by current kernels) but must
    // be greater than zero.
    : _max_fd(0), _epoll_fd(epoll_create(1024)) {
    if (_epoll_fd < 0) {
        const int err = errno;  // capture before stream output can change it
        std::cerr << "epoll_create error " << err << std::endl;
    }
}

// Releases the epoll descriptor obtained in the constructor. The result of
// close() is not inspected.
EpollDemultiplexer::~EpollDemultiplexer() {
    ::close(_epoll_fd);
}

// Waits up to `timeout` milliseconds for events on the registered handles and
// invokes the callbacks of the matching entries in `handlers`.
//
// EPOLLHUP/EPOLLERR trigger handle_error() only; otherwise EPOLLIN triggers
// handle_read() and EPOLLOUT triggers handle_write().
//
// Returns the number of events reported by epoll_wait, or its negative return
// value on failure (this includes interruption by a signal).
int EpollDemultiplexer::wait_event(std::map<Handle, EventHandler*>& handlers, int timeout) {
    // epoll_wait rejects maxevents <= 0 and &events[0] is invalid for an
    // empty vector, so always provide room for at least one event.
    const int capacity = (_max_fd > 0) ? _max_fd : 1;
    std::vector<struct epoll_event> events(capacity);

    const int num = epoll_wait(_epoll_fd, &events[0], capacity, timeout);
    if (num < 0) {
        return num;
    }

    typedef std::map<Handle, EventHandler*>::iterator HandlerIter;

    for (int i = 0; i < num; ++i) {
        const Handle handle = events[i].data.fd;
        const unsigned int fired = events[i].events;

        // Use find() rather than operator[]: the latter would insert a NULL
        // entry for a handle that was removed by an earlier callback.
        HandlerIter it = handlers.find(handle);
        if (it == handlers.end() || it->second == NULL) {
            continue;
        }

        if (fired & (EPOLLHUP | EPOLLERR)) {
            it->second->handle_error();
            continue;
        }

        if (fired & EPOLLIN) {
            it->second->handle_read();

            // Callbacks may unregister (and thereby delete) their own handler,
            // so look the handle up again before touching it a second time.
            it = handlers.find(handle);
            if (it == handlers.end() || it->second == NULL) {
                continue;
            }
        }

        if (fired & EPOLLOUT) {
            it->second->handle_write();
        }
    }
    return num;
}

// Starts watching `handle` for the events in `evt` (edge-triggered).
// ReadEvent maps to EPOLLIN and WriteEvent to EPOLLOUT. If the handle is
// already watched, its event set is replaced instead.
//
// Returns 0 on success, or the negated errno of the failing epoll_ctl call.
int EpollDemultiplexer::regist(Handle handle, Event evt) {
    // Value-initialise so no indeterminate bits end up in the event mask.
    struct epoll_event ev = epoll_event();
    ev.data.fd = handle;
    ev.events = EPOLLET;
    if (evt & ReadEvent) {
        ev.events |= EPOLLIN;
    }
    if (evt & WriteEvent) {
        ev.events |= EPOLLOUT;
    }

    if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) == 0) {
        // Only a successful ADD grows the set of watched handles.
        ++_max_fd;
        return 0;
    }

    if (errno == EEXIST) {
        // Already registered: update the event mask, count stays unchanged.
        if (epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, handle, &ev) == 0) {
            return 0;
        }
        const int mod_err = errno;  // capture before stream output can change it
        std::cerr << "epoll_ctl mod error " << mod_err << std::endl;
        return -mod_err;
    }

    const int add_err = errno;  // capture before stream output can change it
    std::cerr << "epoll_ctl add error " << add_err << std::endl;
    return -add_err;
}

// Stops watching `handle`.
// Returns 0 on success, or the negated errno of the failing epoll_ctl call.
int EpollDemultiplexer::remove(Handle handle) {
    // Old kernels require a non-null event pointer for EPOLL_CTL_DEL even
    // though it is ignored; pass a zeroed one.
    struct epoll_event ev = epoll_event();
    if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) {
        const int err = errno;  // capture before stream output can change it
        std::cerr << "epoll_ctl del error " << err << std::endl;
        return -err;
    }

    // Never let the counter go negative: wait_event uses it as a buffer size.
    if (_max_fd > 0) {
        --_max_fd;
    }
    return 0;
}
  • event_handler.h
// Include guard: this header is included by most other headers of the
// example, and without a guard EventHandler is redefined in any translation
// unit that includes more than one of them.
#ifndef REACTOR_EVENT_HANDLER_H
#define REACTOR_EVENT_HANDLER_H

// Identifier of an event source; the epoll demultiplexer uses it as a file
// descriptor.
typedef int Handle;

// Callback interface invoked by the demultiplexer when its handle has events.
class EventHandler {
public:
    EventHandler() {}
    virtual ~EventHandler() {}

    // The handle this handler is registered under.
    virtual Handle get_handle() const = 0;

    // Called when the handle is readable.
    virtual void handle_read() = 0;

    // Called when the handle is writable.
    virtual void handle_write() = 0;

    // Called when the handle reports an error or hang-up.
    virtual void handle_error() = 0;
};

#endif // REACTOR_EVENT_HANDLER_H
  • listen_handler.h
#include "event_handler.h"
#include "event.h"

// Handler for a listening socket: a read event means a connection can be
// accepted. Owns the descriptor passed to the constructor and closes it in
// the destructor.
class ListenHandler : public EventHandler {
public:
    // `explicit` prevents an int from silently converting into a handler
    // that would close that descriptor when destroyed.
    explicit ListenHandler(Handle fd);
    virtual ~ListenHandler();
    virtual Handle get_handle() const {
        return _listen_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // A copy would close the same descriptor twice, so copying is disabled:
    // declared private and never defined.
    ListenHandler(const ListenHandler&);
    ListenHandler& operator=(const ListenHandler&);

    Handle _listen_fd;
};
  • listen_handler.cpp
#include "listen_handler.h"
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include <assert.h>
#include "event_handler.h"
#include "reactor.h"
#include "socket_handler.h"

// Stores the listening descriptor; no other setup is performed.
ListenHandler::ListenHandler(Handle fd)
    : _listen_fd(fd) {
}

// Closes the listening descriptor held by this handler.
ListenHandler::~ListenHandler() {
    ::close(_listen_fd);
}

// Accepts one pending connection and registers a SocketHandler for it with
// the reactor, watching for read events.
void ListenHandler::handle_read() {
    const int fd = accept(_listen_fd, NULL, NULL);
    if (fd < 0) {
        // Nothing was accepted; do not build a handler around an invalid fd.
        perror("accept");
        return;
    }

    EventHandler* h = new (std::nothrow) SocketHandler(fd);
    if (h == NULL) {
        // No handler took ownership of the descriptor, so release it here.
        close(fd);
        return;
    }

    Reactor& r = Reactor::get_instance();
    if (r.regist(h, ReadEvent) != 0) {
        // NOTE(review): ReactorImpl::regist stores the handler before calling
        // the demultiplexer, so `h` is presumably still owned by the reactor
        // here; confirm before adding cleanup.
        fprintf(stderr, "failed to register accepted socket %d\n", fd);
    }
}

// No-op: this handler takes no action when its socket becomes writable.
void ListenHandler::handle_write() {
}

// No-op: this handler takes no action when an error or hang-up is reported.
void ListenHandler::handle_error() {
}
  • socket_handler.h
#include "event_handler.h"
#include "event.h"

// Handler for one accepted connection. Owns the socket descriptor and a
// MAX_SIZE-byte buffer; the destructor closes the descriptor and frees the
// buffer.
class SocketHandler : public EventHandler {
public:
    // `explicit` prevents an int from silently converting into a handler
    // that would close that descriptor when destroyed.
    explicit SocketHandler(Handle fd);
    virtual ~SocketHandler();
    virtual Handle get_handle() const {
        return _socket_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // A copy would close the descriptor and free the buffer twice, so copying
    // is disabled: declared private and never defined.
    SocketHandler(const SocketHandler&);
    SocketHandler& operator=(const SocketHandler&);

    Handle _socket_fd;
    char* _buf;
    static const int MAX_SIZE = 1024;
};
  • socket_handler.cpp
#include "socket_handler.h"
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include "reactor.h"

// Takes the connection descriptor and allocates a zero-filled buffer of
// MAX_SIZE bytes. A failed allocation is caught only by the assert.
SocketHandler::SocketHandler(Handle fd)
    : _socket_fd(fd),
      _buf(new (std::nothrow) char[MAX_SIZE]) {
    assert(_buf != NULL);
    memset(_buf, 0, MAX_SIZE);
}

// Closes the connection descriptor, then frees the buffer.
SocketHandler::~SocketHandler() {
    ::close(_socket_fd);
    delete[] _buf;
}

// Reads one chunk from the socket, echoes exactly those bytes back, then
// unregisters this handler (one request per connection).
void SocketHandler::handle_read() {
    const ssize_t received = read(_socket_fd, _buf, MAX_SIZE);
    if (received > 0) {
        // Echo the number of bytes actually read. strlen() would stop at an
        // embedded NUL and run past the buffer when all MAX_SIZE bytes are
        // filled, because nothing terminates the data.
        ssize_t sent = 0;
        while (sent < received) {
            const ssize_t n = write(_socket_fd, _buf + sent, received - sent);
            if (n <= 0) {
                break;  // peer gone or write error: give up on the echo
            }
            sent += n;
        }
    }

    // handle_error() removes this handler from the reactor, which deletes it.
    // No member may be touched after this call.
    handle_error();
}

// No-op: this handler takes no action when its socket becomes writable.
void SocketHandler::handle_write() {
}

void SocketHandler::handle_error() {
    Reactor& r = Reactor::get_instance();
    r.remove(this);
}

demo

  • client.cpp
#include "sys/socket.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <iostream>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

// Demo client: connects to 127.0.0.1:53031, sends "hello world", prints the
// reply and exits.
//
// Exit status: 0 on success, -1 socket failure, -2 connect failure,
// -3 read failure, -4 write failure.
int main() {
    const int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd < 0) {
        const int err = errno;
        std::cerr << "socket error " << err << std::endl;
        return -1;
    }

    struct sockaddr_in seraddr;
    memset(&seraddr, 0, sizeof(seraddr));  // do not pass uninitialised padding to connect()
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO

    if (connect(socketfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        const int err = errno;
        std::cerr << "connect error " << err << std::endl;
        close(socketfd);
        return -2;
    }

    char wbuf[64] = {0};
    strcpy(wbuf, "hello world");
    if (write(socketfd, wbuf, strlen(wbuf)) < 0) {
        const int err = errno;
        std::cerr << "write error " << err << std::endl;
        close(socketfd);
        return -4;
    }

    // Leave the last byte untouched so rbuf is always NUL-terminated when printed.
    char rbuf[64] = {0};
    const ssize_t n = read(socketfd, rbuf, sizeof(rbuf) - 1);
    if (n < 0) {
        const int err = errno;
        std::cerr << "read error " << err << std::endl;
        close(socketfd);
        return -3;
    }

    std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl;

    close(socketfd);
    return 0;
}
  • server.cpp
#include <sys/socket.h>
#include <arpa/inet.h>
#include <iostream>
#include <errno.h>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

// Demo server: listens on port 53031 on all interfaces and runs the reactor
// loop forever, with a ListenHandler accepting connections.
//
// Exit status on start-up failure: -1 socket, -2 bind, -3 listen,
// -4 reactor registration.
int main() {
    const int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        const int err = errno;
        std::cerr << "socket error " << err << std::endl;
        return -1;
    }

    // Value-initialise so no indeterminate padding bytes are handed to bind().
    struct sockaddr_in seraddr = sockaddr_in();
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        const int err = errno;
        std::cerr << "bind error " << err << std::endl;
        return -2;
    }

    if (listen(listenfd, 5) < 0) {
        const int err = errno;
        std::cerr << "listen error " << err << std::endl;
        return -3;
    }

    Reactor& actor = Reactor::get_instance();
    // The handler is handed to the reactor, whose implementation deletes
    // registered handlers when it is destroyed.
    EventHandler* handler = new ListenHandler(listenfd);
    const int rc = actor.regist(handler, ReadEvent);
    if (rc != 0) {
        std::cerr << "regist error " << rc << std::endl;
        return -4;
    }

    while (true) {
        actor.dispatch(100);
        //std::cout << "one loop" << std::endl;
    }

    return 0;
}