
Learning the fflib Concurrency Framework, Part 1

I forked blogger Zhiran's code a long time ago but never got around to reading it. Recently I decided to set aside some time to work through it slowly and take notes along the way.

Zhiran's coding style is very nice, so let's just look at the code.

In a client-server model, the client and server obviously have to communicate, and games usually rely on long-lived TCP connections, so a concurrency framework ends up wrapping sockets, I/O multiplexing, an I/O loop, and so on. I have read the libev source before; Zhiran's code is even more compact.

Let's start with the simplest piece, the socket wrapper:

typedef int socket_fd_t;
class fd_i
{
public:
    virtual ~fd_i(){}

    virtual socket_fd_t socket() 	 = 0;
    virtual int handle_epoll_read()  = 0;
    virtual int handle_epoll_write() = 0;
    virtual int handle_epoll_del() 	 = 0;

    virtual void close() 			 = 0;
};

This is an abstraction layer over an fd (fd here is just another name for a socket). From the interface it is clear that only epoll is used, together with a handful of epoll-related operations; anyone who has worked with epoll will recognize them. Next, the socket layer built on top of it:
#ifndef _SOCKET_I_
#define _SOCKET_I_

#include <string>
#include <unistd.h>
using namespace std;

#include "netbase.h"

namespace ff {
    
class socket_i: public fd_i
{
public:
    socket_i():
        m_data(NULL) {}
    virtual ~socket_i(){}

    virtual void open() = 0;
    virtual void async_send(const string& buff_) = 0;
    virtual void async_recv() = 0;
    virtual void safe_delete() { delete this; }
    virtual void set_data(void* p) { m_data = p; }
    template<typename T>
    T* get_data() const { return (T*)m_data; }
    
private:
    void*   m_data;
};

typedef socket_i*  socket_ptr_t;

}

#endif
Interesting: there is even a safe_delete() function where the object deletes itself.

This is still an abstraction layer, so there must be a concrete implementation:

#ifndef _SOCKET_IMPL_H_
#define _SOCKET_IMPL_H_

#include <list>
#include <string>
using namespace std;

#include "net/socket_i.h"

namespace ff {

class epoll_i;
class socket_controller_i;
class task_queue_i;

#define  RECV_BUFFER_SIZE 8096

class socket_impl_t: public socket_i
{
public:
    typedef list<string>    send_buffer_t;

public:
    socket_impl_t(epoll_i*, socket_controller_i*, int fd, task_queue_i* tq_);
    ~socket_impl_t();

    virtual int socket() { return m_fd; }
    virtual void close();
    virtual void open();

    virtual int handle_epoll_read();
    virtual int handle_epoll_write();
    virtual int handle_epoll_del();

    virtual void async_send(const string& buff_);
    virtual void async_recv();
    virtual void safe_delete();
    
    int handle_epoll_read_impl();
    int handle_epoll_write_impl();
    int handle_epoll_error_impl();
    void send_impl(const string& buff_);
    void close_impl();
    
    socket_controller_i* get_sc() { return m_sc; }
private:
    bool is_open() { return m_fd > 0; }

    int do_send(const string& msg, string& left_);
private:
    epoll_i*                            m_epoll;
    socket_controller_i*                m_sc;
    int                                 m_fd;
    task_queue_i*                       m_tq;
    send_buffer_t                       m_send_buffer;
};

}

#endif
Let's look at socket_impl_t's data members first:

1. m_epoll is a pointer to the epoll_i abstract class, itself a thin layer over io_demultiplexer_i:

class io_demultiplexer_i
{
public:
    virtual ~io_demultiplexer_i(){}

    virtual int event_loop() 		    = 0;
    virtual int close() 				= 0;
    virtual int register_fd(fd_i*)      = 0;
    virtual int unregister_fd(fd_i*)  	= 0;
    virtual int mod_fd(fd_i*)           = 0;
};

fflib's naming convention is very consistent: since there is an epoll_i, there must be an epoll_impl_t. Let's take a look:
class epoll_impl_t: public epoll_i
{
public:
    epoll_impl_t();
    ~epoll_impl_t();

    virtual int event_loop();
    virtual int close();
    virtual int register_fd(fd_i*);
    virtual int unregister_fd(fd_i*);
    virtual int mod_fd(fd_i*);

    int interupt_loop();//! interrupt the event loop
protected:
    void fd_del_callback();

private:
    volatile bool            m_running;
    int                      m_efd;
    task_queue_i*            m_task_queue;
    int                      m_interupt_sockets[2];
    //! error sockets waiting to be destroyed
    list<fd_i*>   		     m_error_fd_set;
    mutex_t                  m_mutex;
};
1. The member m_running is simply a flag telling the event loop whether to keep running; close() sets it to false, which breaks out of the loop.

2. m_efd is the epoll instance fd returned by epoll_create();

3. m_task_queue is a task queue, which is also a key data structure:

class task_queue_i
{
public:
    typedef list<task_t> task_list_t;
public:
    virtual ~task_queue_i(){}
    virtual void close() = 0;
    virtual void produce(const task_t& task_) =0;
    virtual void multi_produce(const task_list_t& task_) =0;
    virtual int  consume(task_t& task_) = 0;
    virtual int  consume_all(task_list_t&) = 0;
    virtual int run() = 0;
    virtual int batch_run() = 0;
};

This is a producer-consumer style wrapper around tasks; the concrete implementation is task_queue_t:
class task_queue_t: public task_queue_i
{
public:
    task_queue_t():
        m_flag(true),
        m_cond(m_mutex)
    {
    }
    ~task_queue_t()
    {
    }
    void close()
    {
    	lock_guard_t lock(m_mutex);
    	m_flag = false;
    	m_cond.broadcast();
    }

    void multi_produce(const task_list_t& task_)
    {
        lock_guard_t lock(m_mutex);
        bool need_sig = m_tasklist.empty();

        for(task_list_t::const_iterator it = task_.begin(); it != task_.end(); ++it)
        {
            m_tasklist.push_back(*it);
        }

        if (need_sig)
        {
        	m_cond.signal();
        }
    }
    void produce(const task_t& task_)
    {        
        lock_guard_t lock(m_mutex);
        bool need_sig = m_tasklist.empty();

        m_tasklist.push_back(task_);
        if (need_sig)
		{
			m_cond.signal();
		}
    }

    int   consume(task_t& task_)
    {
        lock_guard_t lock(m_mutex);
        while (m_tasklist.empty())
        {
            if (false == m_flag)
            {
                return -1;
            }
            m_cond.wait();
        }

        task_ = m_tasklist.front();
        m_tasklist.pop_front();

        return 0;
    }

    int run()
    {
        task_t t;
        while (0 == consume(t))
        {
            t.run();
        }
        return 0;
    }

    int consume_all(task_list_t& tasks_)
    {
        lock_guard_t lock(m_mutex);

        while (m_tasklist.empty())
        {
            if (false == m_flag)
            {
                return -1;
            }
            m_cond.wait();
        }

        tasks_ = m_tasklist;
        m_tasklist.clear();

        return 0;
    }

    int batch_run()
	{
    	task_list_t tasks;
    	int ret = consume_all(tasks);
		while (0 == ret)
		{
			for (task_list_t::iterator it = tasks.begin(); it != tasks.end(); ++it)
			{
				(*it).run();
			}
			tasks.clear();
			ret = consume_all(tasks);
		}
		return 0;
	}
private:
    volatile bool                   m_flag;
    task_list_t                     m_tasklist;
    mutex_t                         m_mutex;
    condition_var_t                 m_cond;
};

Reading through it, this really is the classic producer-consumer pattern, so it makes for a nice refresher.

4. m_interupt_sockets: I don't know yet what it is for; we'll find out when we get to the implementation.

OK, now let's look at the concrete implementation:

#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>

#include "net/socket_op.h"
#include "net/netbase.h"
#include "net/epoll_impl.h"

using namespace ff;

epoll_impl_t::epoll_impl_t():
    m_running(true),
    m_efd(-1)
{
    m_efd = ::epoll_create(CREATE_EPOLL_SIZE);
    m_interupt_sockets[0] = -1;
    m_interupt_sockets[1] = -1;
    assert( 0 == ::socketpair(AF_LOCAL, SOCK_STREAM, 0, m_interupt_sockets));
    struct epoll_event ee = { 0, { 0 } };
	ee.data.ptr  = this;
	ee.events    = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;
	::epoll_ctl(m_efd, EPOLL_CTL_ADD, m_interupt_sockets[0], &ee);
}

epoll_impl_t::~epoll_impl_t()
{
    ::close(m_interupt_sockets[0]);
    ::close(m_interupt_sockets[1]);
    ::close(m_efd);
    m_efd = -1;
}

int epoll_impl_t::event_loop()
{
    int i = 0, nfds = 0;
    struct epoll_event ev_set[EPOLL_EVENTS_SIZE];

    do
    {
        nfds  = ::epoll_wait(m_efd, ev_set, EPOLL_EVENTS_SIZE, EPOLL_WAIT_TIME);

        if (nfds < 0 && EINTR == errno)
        {
            nfds = 0;
            continue;
        }
        for (i = 0; i < nfds; ++i)
        {
            epoll_event& cur_ev = ev_set[i];
            fd_i* fd_ptr	    = (fd_i*)cur_ev.data.ptr;
            if (cur_ev.data.ptr == this)//! interrupt event
            {
            	if (false == m_running)
				{
            		return 0;
				}

            	//! delete the socket objects that have already errored
            	fd_del_callback();
            	continue;
            }
    
            if (cur_ev.events & (EPOLLIN | EPOLLPRI))
            {
                fd_ptr->handle_epoll_read();
            }

            if(cur_ev.events & EPOLLOUT)
            {
                fd_ptr->handle_epoll_write();
            }

            if (cur_ev.events & (EPOLLERR | EPOLLHUP))
            {
                fd_ptr->close();
            }
        }
        
    }while(nfds >= 0);

    return 0;
}

int epoll_impl_t::close()
{
    m_running = false;

    interupt_loop();
    
    return 0;
}

int epoll_impl_t::register_fd(fd_i* fd_ptr_)
{
    struct epoll_event ee = { 0, { 0 } };

    ee.data.ptr  = fd_ptr_;
    ee.events    = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;

    return ::epoll_ctl(m_efd, EPOLL_CTL_ADD, fd_ptr_->socket(), &ee);
}

int epoll_impl_t::unregister_fd(fd_i* fd_ptr_)
{
	int ret = 0;
	if (fd_ptr_->socket() > 0)
	{
		struct epoll_event ee;

		ee.data.ptr  = (void*)0;
		ret = ::epoll_ctl(m_efd, EPOLL_CTL_DEL, fd_ptr_->socket(), &ee);
	}

	{
		lock_guard_t lock(m_mutex);
		m_error_fd_set.push_back(fd_ptr_);
	}
    interupt_loop();
    return ret;
}

int epoll_impl_t::mod_fd(fd_i* fd_ptr_)
{
    struct epoll_event ee = { 0, { 0 } };

    ee.data.ptr  = fd_ptr_;
    ee.events    = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;

    return ::epoll_ctl(m_efd, EPOLL_CTL_MOD, fd_ptr_->socket(), &ee);
}

void epoll_impl_t::fd_del_callback()
{
    lock_guard_t lock(m_mutex);
    list<fd_i*>::iterator it = m_error_fd_set.begin();
    for (; it != m_error_fd_set.end(); ++it)
    {
        (*it)->handle_epoll_del();
    }
    m_error_fd_set.clear();
}

int epoll_impl_t::interupt_loop()//! interrupt the event loop
{
	struct epoll_event ee = { 0, { 0 } };

	ee.data.ptr  = this;
	ee.events    = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;

	return ::epoll_ctl(m_efd, EPOLL_CTL_MOD, m_interupt_sockets[0], &ee);
}


register_fd simply registers the given fd with epoll.

event_loop is the most important part: when epoll events arrive, it calls the corresponding handler based on the event type. The event loop here has one single job, handling network events; unlike libev, it does not also implement timers.
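To make the dispatch concrete, here is a tiny illustration of my own (not fflib code): a trivial fd_i implementation that wraps the read end of a pipe and gets registered with epoll_impl_t. The class pipe_reader_t and the whole main() are made up for demonstration; only fd_i, epoll_impl_t and the header paths come from the code above.

#include <unistd.h>
#include <stdio.h>

#include "net/netbase.h"      //! fd_i / socket_fd_t
#include "net/epoll_impl.h"   //! same header paths as in the real sources

using namespace ff;

//! toy fd_i: drains the pipe and prints whatever was written into it
class pipe_reader_t: public fd_i
{
public:
    explicit pipe_reader_t(int fd): m_fd(fd) {}

    virtual socket_fd_t socket() { return m_fd; }
    virtual int handle_epoll_read()
    {
        char buf[128];
        ssize_t n = ::read(m_fd, buf, sizeof(buf) - 1);
        if (n > 0)
        {
            buf[n] = '\0';
            printf("event_loop dispatched EPOLLIN, read: %s\n", buf);
        }
        return 0;
    }
    virtual int handle_epoll_write() { return 0; }
    virtual int handle_epoll_del()   { return 0; }
    virtual void close()             { ::close(m_fd); m_fd = -1; }

private:
    int m_fd;
};

int main()
{
    int p[2];
    ::pipe(p);

    epoll_impl_t  ep;
    pipe_reader_t reader(p[0]);
    ep.register_fd(&reader);   //! EPOLL_CTL_ADD with ee.data.ptr = &reader

    ::write(p[1], "ping", 4);
    ep.event_loop();           //! blocks here; in real code this runs in its own
                               //! thread and ep.close() is what interrupts it
    return 0;
}

Storing the fd_i* in ee.data.ptr is what keeps the loop so small: epoll hands back exactly the object whose handler should run, so there is no fd-to-object lookup table anywhere.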

Good. Now that we roughly know what the data members in socket_impl_t are for, let's look at the .cpp implementation:

#include <errno.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "net/socket_impl.h"
#include "net/epoll_i.h"
#include "net/socket_controller_i.h"
#include "net/socket_op.h"
#include "base/singleton.h"
#include "base/lock.h"
#include "base/task_queue_i.h"

using namespace ff;

socket_impl_t::socket_impl_t(epoll_i* e_, socket_controller_i* seh_, int fd_, task_queue_i* tq_):
    m_epoll(e_),
    m_sc(seh_),
    m_fd(fd_),
    m_tq(tq_)
{
}

socket_impl_t::~socket_impl_t()
{
    delete m_sc;
}

void socket_impl_t::open()
{
    socket_op_t::set_nonblock(m_fd);
    m_sc->handle_open(this);
    async_recv();
}

void socket_impl_t::close()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::close_impl, this));
}

void socket_impl_t::close_impl()
{
    if (m_fd > 0)
    {
        m_epoll->unregister_fd(this);
        ::close(m_fd);
        m_fd = -1;
    }
}

int socket_impl_t::handle_epoll_read()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_read_impl, this));
    return 0;
}

int socket_impl_t::handle_epoll_read_impl()
{
    if (is_open())
    {
        int nread = 0;
        char recv_buffer[RECV_BUFFER_SIZE];
        do
        {
            nread = ::read(m_fd, recv_buffer, sizeof(recv_buffer) - 1);
            if (nread > 0)
            {
                recv_buffer[nread] = '\0';
                m_sc->handle_read(this, recv_buffer, size_t(nread));
                if (nread < int(sizeof(recv_buffer) - 1))
                {
                	break;//! equal EWOULDBLOCK
                }
            }
            else if (0 == nread) //! eof
            {
                this->close();
                return -1;
            }
            else
            {
                if (errno == EINTR)
                {
                    continue;
                }
                else if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else
                {
                    this->close();
                    return -1;
                }
            }
        } while(1);
    }
    return 0;
}

int socket_impl_t::handle_epoll_del()
{
    m_tq->produce(task_binder_t::gen(&socket_controller_i::handle_error, this->get_sc(), this));
    return 0;
}

int socket_impl_t::handle_epoll_write()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_write_impl, this));
    return 0;
}

int socket_impl_t::handle_epoll_write_impl()
{
    int ret = 0;
    string left_buff;

    if (false == is_open() || true == m_send_buffer.empty())
    {
        return 0;
    }

    do
    {
        const string& msg = m_send_buffer.front();
        ret = do_send(msg, left_buff);

        if (ret < 0)
        {
            this ->close();
            return -1;
        }
        else if (ret > 0)
        {
            m_send_buffer.pop_front();
            m_send_buffer.push_front(left_buff);
            return 0;
        }
        else
        {
            m_send_buffer.pop_front();
        }
    } while (false == m_send_buffer.empty());

    m_sc->handle_write_completed(this);
    return 0;
}

void socket_impl_t::async_send(const string& msg_)
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::send_impl, this, msg_));
}

void socket_impl_t::send_impl(const string& src_buff_)
{
    string buff_ = src_buff_;

    if (false == is_open() || m_sc->check_pre_send(this, buff_))
    {
        return;
    }
    //! socket buff is full, cache the data
    if (false == m_send_buffer.empty())
    {
        m_send_buffer.push_back(buff_);
        return;
    }

    string left_buff;
    int ret = do_send(buff_, left_buff);

    if (ret < 0)
    {
        this ->close();
    }
    else if (ret > 0)
    {
        m_send_buffer.push_back(left_buff);
    }
    else
    {
        //! send ok
        m_sc->handle_write_completed(this);
    }
}

int socket_impl_t::do_send(const string& buff_, string& left_buff_)
{
    size_t nleft             = buff_.size();
    const char* buffer       = buff_.data();
    ssize_t nwritten;

    while(nleft > 0)
    {
        if((nwritten = ::send(m_fd, buffer, nleft, MSG_NOSIGNAL)) <= 0)
        {
            if (EINTR == errno)
            {
                nwritten = 0;
            }
            else if (EWOULDBLOCK == errno)
            {
                left_buff_.assign(buff_.c_str() + (buff_.size() - nleft), nleft);
                return 1;
            }
            else
            {
                this->close();
                return -1;
            }
        }

        nleft    -= nwritten;
        buffer   += nwritten;
    }

    return 0;
}

void socket_impl_t::async_recv()
{
    m_epoll->register_fd(this);
}

void socket_impl_t::safe_delete()
{
    struct lambda_t
    {
        static void exe(void* p_)
        {
            delete ((socket_impl_t*)p_);
        }
    };
    m_tq->produce(task_t(&lambda_t::exe, this));
}

A socket_impl_t wraps a new client connection picked up by the event loop. When it is open()ed, the fd is set to non-blocking and the socket_controller_i gets its own open callback (handle_open); gateway_socket_controller_t, for example, uses it to start heartbeat timing.
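For context, here is a rough sketch (my own guess, not code from this post) of what the accept side probably looks like: the listening socket's read handler accepts the new fd, wraps it in a socket_impl_t, and calls open() on it. accept_new_client() and the ctrl parameter are hypothetical; the socket_impl_t constructor and open() are the real ones shown above.

#include <sys/socket.h>

#include "net/socket_impl.h"

using namespace ff;

//! hypothetical accept path, for illustration only
void accept_new_client(int listen_fd, epoll_i* ep, task_queue_i* tq,
                       socket_controller_i* ctrl /* e.g. a gateway controller */)
{
    int client_fd = ::accept(listen_fd, NULL, NULL);
    if (client_fd < 0)
    {
        return;
    }

    socket_ptr_t s = new socket_impl_t(ep, ctrl, client_fd, tq);
    s->open();   //! set_nonblock + handle_open() on the controller + async_recv(),
                 //! which registers the new fd with epoll
}

Note that ~socket_impl_t() deletes m_sc, so each connection has to own its own controller instance; sharing one ctrl across sockets would lead to a double delete.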

Several of the key functions simply post the work asynchronously; for example, handle_epoll_read is implemented like this:

int socket_impl_t::handle_epoll_read()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_read_impl, this));
    return 0;
}

Why asynchronous? Simple: task_queue_i's run() is a while loop, so you just spawn a thread with run() as its entry function.
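A minimal sketch of that threading model follows; this is my own illustration (fflib presumably has its own thread wrapper, I just use pthread directly), and the header layout is an assumption based on the includes in socket_impl.cpp.

#include <pthread.h>

#include "base/task_queue_i.h"   //! task_queue_t may live in another header in your tree

using namespace ff;

static void* worker_entry(void* p)
{
    ((task_queue_t*)p)->run();   //! loops in consume() until close() is called
    return NULL;
}

int main()
{
    task_queue_t tq;

    pthread_t worker;
    pthread_create(&worker, NULL, &worker_entry, &tq);

    //! the epoll thread now only ever calls tq.produce(...) with tasks built by
    //! task_binder_t::gen(...); the worker thread pops each task_t and run()s it

    tq.close();                  //! m_flag = false -> consume() returns -1 -> run() exits
    pthread_join(worker, NULL);
    return 0;
}

This is also what makes the epoll callbacks safe without extra locking: handle_epoll_read() and friends never touch m_send_buffer or the fd state directly, they only enqueue work, so all real socket state is mutated on the single worker thread.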

socket_impl_t is only responsible for low-level reads and writes; the application-level work, such as message framing, parsing, and dispatching, is handed off to socket_controller_i.
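The socket_controller_i interface itself is not shown in this post, but judging from the calls in socket_impl.cpp (handle_open, handle_read, check_pre_send, handle_write_completed, handle_error), a toy echo controller might look roughly like the sketch below. This is guesswork: the method names come from the calls above, while the exact return and parameter types of the real base class may differ.

#include <string>
using namespace std;

#include "net/socket_i.h"              //! for socket_i / socket_ptr_t
#include "net/socket_controller_i.h"   //! path as included by socket_impl.cpp

namespace ff {

//! sketch only: signatures guessed from how socket_impl_t uses its m_sc
class echo_controller_t: public socket_controller_i
{
public:
    virtual int handle_open(socket_ptr_t s_)
    {
        //! e.g. start a heartbeat timer here, like gateway_socket_controller_t
        return 0;
    }
    virtual int handle_read(socket_ptr_t s_, char* data_, size_t len_)
    {
        s_->async_send(string(data_, len_));   //! no framing: echo the bytes back
        return 0;
    }
    virtual bool check_pre_send(socket_ptr_t s_, string& buff_)
    {
        return false;                          //! false = let send_impl go ahead
    }
    virtual int handle_write_completed(socket_ptr_t s_) { return 0; }
    virtual int handle_error(socket_ptr_t s_)
    {
        s_->safe_delete();                     //! deferred delete on the worker thread
        return 0;
    }
};

}

A real controller would buffer partial packets in handle_read, split them at message boundaries, and dispatch complete messages to the upper layer; that is exactly the framing, parsing, and dispatching mentioned above.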

The whole implementation is very transparent and well worth a read.