fflib concurrency framework study notes (1)
阿新 • Published: 2019-02-02
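I forked 知然's code a long time ago but never got around to reading it. I now plan to set aside time to go through it slowly and take notes along the way.
知然's coding style is very clean, so let's go straight to the code.
In a client/server model the client and server have to talk to each other, and games usually rely on long-lived TCP connections. A concurrency framework therefore wraps sockets, I/O multiplexing, an IO loop and so on. I had read the libev source before, and 知然's code is more compact.
Start with the simplest piece, the socket wrapper: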
typedef int socket_fd_t;
class fd_i
{
public:
    virtual ~fd_i(){}
    virtual socket_fd_t socket() = 0;
    virtual int handle_epoll_read() = 0;
    virtual int handle_epoll_write() = 0;
    virtual int handle_epoll_del() = 0;
    virtual void close() = 0;
};
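This is an abstraction layer over file descriptors: socket_fd_t is just an alias for the raw socket descriptor (an int). Judging from the interface, only epoll is used here, and the handlers correspond to the usual epoll operations (read, write, delete), which will look familiar to anyone who has worked with epoll. Next, the socket interface built on top of it: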
#ifndef _SOCKET_I_
#define _SOCKET_I_
#include <string>
#include <unistd.h>
using namespace std;
#include "netbase.h"
namespace ff {
class socket_i: public fd_i
{
public:
    socket_i(): m_data(NULL) {}
    virtual ~socket_i(){}
    virtual void open() = 0;
    virtual void async_send(const string& buff_) = 0;
    virtual void async_recv() = 0;
    virtual void safe_delete() { delete this; }
    virtual void set_data(void* p) { m_data = p; }
    template<typename T>
    T* get_data() const { return (T*)m_data; }
private:
    void* m_data;
};
typedef socket_i* socket_ptr_t;
}
#endif
Interesting: there is even a safe_delete function through which the object deletes itself.
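One plausible reason for routing deletion through a virtual safe_delete, judging from the socket_impl_t::safe_delete shown later in this post, is that the implementation can post the delete to its task queue, so tasks queued earlier for the same object still run against a live object. The sketch below illustrates that ordering with a single-threaded queue. The names task_fifo_t, conn_t and drain are hypothetical and are not part of fflib.

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical single-threaded FIFO of closures, used only for this illustration.
typedef std::deque<std::function<void()> > task_fifo_t;

// Hypothetical connection object whose deletion is deferred through the queue.
class conn_t {
public:
    conn_t(task_fifo_t& q, int& live, int& handled)
        : m_q(q), m_live(live), m_handled(handled) { ++m_live; }
    ~conn_t() { --m_live; }

    // Queue some work that touches this object.
    void post_read() { m_q.push_back([this] { ++m_handled; }); }

    // Queue the deletion behind any work that was posted earlier.
    void safe_delete() { m_q.push_back([this] { delete this; }); }

private:
    task_fifo_t& m_q;
    int& m_live;     // number of live conn_t objects, owned by the caller
    int& m_handled;  // number of read tasks executed, owned by the caller
};

// Run tasks in FIFO order until none remain.
void drain(task_fifo_t& q) {
    while (!q.empty()) {
        std::function<void()> t = std::move(q.front());
        q.pop_front();
        t();
    }
}
```

Because the queue is FIFO, the read task posted before safe_delete executes first, and the object is destroyed only afterwards. Calling delete directly from another thread would give no such guarantee.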
This is still an abstraction layer, so there has to be an implementation:
#ifndef _SOCKET_IMPL_H_
#define _SOCKET_IMPL_H_
#include <list>
#include <string>
using namespace std;
#include "net/socket_i.h"
namespace ff {
class epoll_i;
class socket_controller_i;
class task_queue_i;
#define RECV_BUFFER_SIZE 8096
class socket_impl_t: public socket_i
{
public:
    typedef list<string> send_buffer_t;
public:
    socket_impl_t(epoll_i*, socket_controller_i*, int fd, task_queue_i* tq_);
    ~socket_impl_t();
    virtual int socket() { return m_fd; }
    virtual void close();
    virtual void open();
    virtual int handle_epoll_read();
    virtual int handle_epoll_write();
    virtual int handle_epoll_del();
    virtual void async_send(const string& buff_);
    virtual void async_recv();
    virtual void safe_delete();
    int handle_epoll_read_impl();
    int handle_epoll_write_impl();
    int handle_epoll_error_impl();
    void send_impl(const string& buff_);
    void close_impl();
    socket_controller_i* get_sc() { return m_sc; }
private:
    bool is_open() { return m_fd > 0; }
    int do_send(const string& msg, string& left_);
private:
    epoll_i* m_epoll;
    socket_controller_i* m_sc;
    int m_fd;
    task_queue_i* m_tq;
    send_buffer_t m_send_buffer;
};
}
#endif
First, look at the data members of socket_impl_t:
1. m_epoll points to the abstract class epoll_i, which is a thin layer over io_demultiplexer_i:
class io_demultiplexer_i
{
public:
    virtual ~io_demultiplexer_i(){}
    virtual int event_loop() = 0;
    virtual int close() = 0;
    virtual int register_fd(fd_i*) = 0;
    virtual int unregister_fd(fd_i*) = 0;
    virtual int mod_fd(fd_i*) = 0;
};
fflib's naming is very consistent: since there is an epoll_i, there must be an epoll_impl_t. Here it is:
class epoll_impl_t: public epoll_i
{
public:
    epoll_impl_t();
    ~epoll_impl_t();
    virtual int event_loop();
    virtual int close();
    virtual int register_fd(fd_i*);
    virtual int unregister_fd(fd_i*);
    virtual int mod_fd(fd_i*);
    int interupt_loop();//! interrupt the event loop
protected:
    void fd_del_callback();
private:
    volatile bool m_running;
    int m_efd;
    task_queue_i* m_task_queue;
    int m_interupt_sockets[2];
    //! error sockets waiting to be destroyed
    list<fd_i*> m_error_fd_set;
    mutex_t m_mutex;
};
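1. The data member m_running flags whether the event loop should keep going. close() sets it to false and then interrupts the loop, so event_loop returns.
2. m_efd is the file descriptor of the epoll instance itself, the value returned by epoll_create in the constructor.
3. m_task_queue is a task queue, another key data structure: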
class task_queue_i
{
public:
    typedef list<task_t> task_list_t;
public:
    virtual ~task_queue_i(){}
    virtual void close() = 0;
    virtual void produce(const task_t& task_) =0;
    virtual void multi_produce(const task_list_t& task_) =0;
    virtual int consume(task_t& task_) = 0;
    virtual int consume_all(task_list_t&) = 0;
    virtual int run() = 0;
    virtual int batch_run() = 0;
};
This is a producer-consumer style interface for tasks. The concrete implementation is task_queue_t:
class task_queue_t: public task_queue_i
{
public:
    task_queue_t():
        m_flag(true),
        m_cond(m_mutex)
    {
    }
    ~task_queue_t()
    {
    }
    void close()
    {
        lock_guard_t lock(m_mutex);
        m_flag = false;
        m_cond.broadcast();
    }
    void multi_produce(const task_list_t& task_)
    {
        lock_guard_t lock(m_mutex);
        bool need_sig = m_tasklist.empty();
        for(task_list_t::const_iterator it = task_.begin(); it != task_.end(); ++it)
        {
            m_tasklist.push_back(*it);
        }
        if (need_sig)
        {
            m_cond.signal();
        }
    }
    void produce(const task_t& task_)
    {
        lock_guard_t lock(m_mutex);
        //! signal only when the list goes from empty to non-empty
        bool need_sig = m_tasklist.empty();
        m_tasklist.push_back(task_);
        if (need_sig)
        {
            m_cond.signal();
        }
    }
    int consume(task_t& task_)
    {
        lock_guard_t lock(m_mutex);
        while (m_tasklist.empty())
        {
            if (false == m_flag)
            {
                return -1;
            }
            m_cond.wait();
        }
        task_ = m_tasklist.front();
        m_tasklist.pop_front();
        return 0;
    }
    int run()
    {
        task_t t;
        while (0 == consume(t))
        {
            t.run();
        }
        return 0;
    }
    int consume_all(task_list_t& tasks_)
    {
        lock_guard_t lock(m_mutex);
        while (m_tasklist.empty())
        {
            if (false == m_flag)
            {
                return -1;
            }
            m_cond.wait();
        }
        tasks_ = m_tasklist;
        m_tasklist.clear();
        return 0;
    }
    int batch_run()
    {
        task_list_t tasks;
        int ret = consume_all(tasks);
        while (0 == ret)
        {
            for (task_list_t::iterator it = tasks.begin(); it != tasks.end(); ++it)
            {
                (*it).run();
            }
            tasks.clear();
            ret = consume_all(tasks);
        }
        return 0;
    }
private:
    volatile bool m_flag;
    task_list_t m_tasklist;
    mutex_t m_mutex;
    condition_var_t m_cond;
};
All in all it is a very traditional producer-consumer wrapper, and a good chance to review the pattern.
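For readers without fflib's mutex_t and condition_var_t at hand, the same producer-consumer shape can be sketched with the C++11 standard library. This is a minimal illustration, not fflib's code: the class name simple_task_queue and the use of std::function as the task type are assumptions made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>

// Hypothetical stand-in for task_queue_t, built on the standard library.
class simple_task_queue {
public:
    typedef std::function<void()> task;

    // Enqueue one task; wake a consumer on the empty -> non-empty transition.
    void produce(task t) {
        std::lock_guard<std::mutex> lock(m_mutex);
        bool need_sig = m_tasks.empty();
        m_tasks.push_back(std::move(t));
        if (need_sig) m_cond.notify_one();
    }

    // Block until a task is available; return -1 once closed and drained.
    int consume(task& out) {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (m_tasks.empty()) {
            if (!m_open) return -1;
            m_cond.wait(lock);
        }
        out = std::move(m_tasks.front());
        m_tasks.pop_front();
        return 0;
    }

    // Mark the queue closed and wake every waiting consumer.
    void close() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_open = false;
        m_cond.notify_all();
    }

    // Consumer loop: execute tasks until the queue is closed and empty.
    int run() {
        task t;
        while (consume(t) == 0) t();
        return 0;
    }

private:
    bool m_open = true;
    std::list<task> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};
```

As in the original, close() does not discard pending work: consume keeps returning tasks until the list is empty and only then reports -1, so run() drains the queue before it exits.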
4. I don't yet know what m_interupt_sockets is for; that can wait until we read the implementation.
Now for the concrete implementation:
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
#include "net/socket_op.h"
#include "net/netbase.h"
#include "net/epoll_impl.h"
using namespace ff;
epoll_impl_t::epoll_impl_t():
    m_running(true),
    m_efd(-1)
{
    m_efd = ::epoll_create(CREATE_EPOLL_SIZE);
    m_interupt_sockets[0] = -1;
    m_interupt_sockets[1] = -1;
    //! call socketpair outside assert so it still runs when NDEBUG is defined
    int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, m_interupt_sockets);
    assert(0 == rc);
    (void)rc;
    struct epoll_event ee = { 0, { 0 } };
    ee.data.ptr = this;
    ee.events = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;
    ::epoll_ctl(m_efd, EPOLL_CTL_ADD, m_interupt_sockets[0], &ee);
}
epoll_impl_t::~epoll_impl_t()
{
    ::close(m_interupt_sockets[0]);
    ::close(m_interupt_sockets[1]);
    ::close(m_efd);
    m_efd = -1;
}
int epoll_impl_t::event_loop()
{
    int i = 0, nfds = 0;
    struct epoll_event ev_set[EPOLL_EVENTS_SIZE];
    do
    {
        nfds = ::epoll_wait(m_efd, ev_set, EPOLL_EVENTS_SIZE, EPOLL_WAIT_TIME);
        if (nfds < 0 && EINTR == errno)
        {
            nfds = 0;
            continue;
        }
        for (i = 0; i < nfds; ++i)
        {
            epoll_event& cur_ev = ev_set[i];
            fd_i* fd_ptr = (fd_i*)cur_ev.data.ptr;
            if (cur_ev.data.ptr == this)//! interrupt event
            {
                if (false == m_running)
                {
                    return 0;
                }
                //! delete the socket objects that have already hit an error
                fd_del_callback();
                continue;
            }
            if (cur_ev.events & (EPOLLIN | EPOLLPRI))
            {
                fd_ptr->handle_epoll_read();
            }
            if(cur_ev.events & EPOLLOUT)
            {
                fd_ptr->handle_epoll_write();
            }
            if (cur_ev.events & (EPOLLERR | EPOLLHUP))
            {
                fd_ptr->close();
            }
        }
    }while(nfds >= 0);
    return 0;
}
int epoll_impl_t::close()
{
    m_running = false;
    interupt_loop();
    return 0;
}
int epoll_impl_t::register_fd(fd_i* fd_ptr_)
{
    struct epoll_event ee = { 0, { 0 } };
    ee.data.ptr = fd_ptr_;
    ee.events = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;
    return ::epoll_ctl(m_efd, EPOLL_CTL_ADD, fd_ptr_->socket(), &ee);
}
int epoll_impl_t::unregister_fd(fd_i* fd_ptr_)
{
    int ret = 0;
    if (fd_ptr_->socket() > 0)
    {
        struct epoll_event ee;
        ee.data.ptr = (void*)0;
        ret = ::epoll_ctl(m_efd, EPOLL_CTL_DEL, fd_ptr_->socket(), &ee);
    }
    {
        lock_guard_t lock(m_mutex);
        m_error_fd_set.push_back(fd_ptr_);
    }
    interupt_loop();
    return ret;
}
int epoll_impl_t::mod_fd(fd_i* fd_ptr_)
{
    struct epoll_event ee = { 0, { 0 } };
    ee.data.ptr = fd_ptr_;
    ee.events = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;
    return ::epoll_ctl(m_efd, EPOLL_CTL_MOD, fd_ptr_->socket(), &ee);
}
void epoll_impl_t::fd_del_callback()
{
    lock_guard_t lock(m_mutex);
    list<fd_i*>::iterator it = m_error_fd_set.begin();
    for (; it != m_error_fd_set.end(); ++it)
    {
        (*it)->handle_epoll_del();
    }
    m_error_fd_set.clear();
}
int epoll_impl_t::interupt_loop()//! interrupt the event loop
{
    struct epoll_event ee = { 0, { 0 } };
    ee.data.ptr = this;
    ee.events = EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLHUP | EPOLLET;
    return ::epoll_ctl(m_efd, EPOLL_CTL_MOD, m_interupt_sockets[0], &ee);
}
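register_fd simply registers the given fd with epoll.
event_loop is the most important function: when epoll reports events, it calls the matching handler according to the event type. This event loop is single-minded and handles only network events; unlike libev, it does not also provide timers.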
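The role of m_interupt_sockets can be read off the code above: the constructor registers one end of a socketpair with data.ptr set to this, interupt_loop re-issues EPOLL_CTL_MOD on it, and event_loop treats an event whose data.ptr equals this as a wake-up. The trick appears to rely on the socket always being writable, so re-arming an edge-triggered EPOLLOUT registration makes epoll_wait return right away. Below is a minimal Linux-only sketch of that idea. The waker_t type and its helper functions are hypothetical, and the sketch registers only EPOLLOUT | EPOLLET rather than fflib's full event mask.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper, not part of fflib: an epoll instance plus a wake-up channel.
struct waker_t {
    int efd;      // epoll instance
    int pair[2];  // socketpair; pair[0] is registered with epoll
};

// Create the epoll instance and register pair[0]. Returns 0 on success, -1 on failure.
int waker_open(waker_t& w) {
    w.pair[0] = w.pair[1] = -1;
    w.efd = ::epoll_create(1);
    if (w.efd < 0) return -1;
    if (::socketpair(AF_LOCAL, SOCK_STREAM, 0, w.pair) != 0) {
        ::close(w.efd);
        w.efd = -1;
        return -1;
    }
    struct epoll_event ee = {};
    ee.data.ptr = &w;  // marker used to recognise the wake-up event
    ee.events = EPOLLOUT | EPOLLET;
    return ::epoll_ctl(w.efd, EPOLL_CTL_ADD, w.pair[0], &ee);
}

// Re-arm the edge-triggered registration. The socket is writable, so the
// event is reported again and a blocked epoll_wait returns.
int waker_wake(waker_t& w) {
    struct epoll_event ee = {};
    ee.data.ptr = &w;
    ee.events = EPOLLOUT | EPOLLET;
    return ::epoll_ctl(w.efd, EPOLL_CTL_MOD, w.pair[0], &ee);
}

// Wait up to timeout_ms; return true if the wake-up channel fired.
bool waker_wait(waker_t& w, int timeout_ms) {
    struct epoll_event ev;
    int n = ::epoll_wait(w.efd, &ev, 1, timeout_ms);
    return n == 1 && ev.data.ptr == &w;
}

// Release all descriptors.
void waker_close(waker_t& w) {
    ::close(w.pair[0]);
    ::close(w.pair[1]);
    ::close(w.efd);
    w.efd = w.pair[0] = w.pair[1] = -1;
}
```

No byte is ever written to the socketpair in this scheme, so there is nothing to drain after a wake-up. That is presumably why fflib can call interupt_loop from unregister_fd and close without any matching read.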
Now that we have a rough idea of what the data members of socket_impl_t do, here is the .cpp implementation:
#include <errno.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "net/socket_impl.h"
#include "net/epoll_i.h"
#include "net/socket_controller_i.h"
#include "net/socket_op.h"
#include "base/singleton.h"
#include "base/lock.h"
#include "base/task_queue_i.h"
using namespace ff;
socket_impl_t::socket_impl_t(epoll_i* e_, socket_controller_i* seh_, int fd_, task_queue_i* tq_):
    m_epoll(e_),
    m_sc(seh_),
    m_fd(fd_),
    m_tq(tq_)
{
}
socket_impl_t::~socket_impl_t()
{
    delete m_sc;
}
void socket_impl_t::open()
{
    socket_op_t::set_nonblock(m_fd);
    m_sc->handle_open(this);
    async_recv();
}
void socket_impl_t::close()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::close_impl, this));
}
void socket_impl_t::close_impl()
{
    if (m_fd > 0)
    {
        m_epoll->unregister_fd(this);
        ::close(m_fd);
        m_fd = -1;
    }
}
int socket_impl_t::handle_epoll_read()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_read_impl, this));
    return 0;
}
int socket_impl_t::handle_epoll_read_impl()
{
    if (is_open())
    {
        int nread = 0;
        char recv_buffer[RECV_BUFFER_SIZE];
        do
        {
            nread = ::read(m_fd, recv_buffer, sizeof(recv_buffer) - 1);
            if (nread > 0)
            {
                recv_buffer[nread] = '\0';
                m_sc->handle_read(this, recv_buffer, size_t(nread));
                if (nread < int(sizeof(recv_buffer) - 1))
                {
                    break;//! equal EWOULDBLOCK
                }
            }
            else if (0 == nread) //! eof
            {
                this->close();
                return -1;
            }
            else
            {
                if (errno == EINTR)
                {
                    continue;
                }
                else if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else
                {
                    this->close();
                    return -1;
                }
            }
        } while(1);
    }
    return 0;
}
int socket_impl_t::handle_epoll_del()
{
    m_tq->produce(task_binder_t::gen(&socket_controller_i::handle_error, this->get_sc(), this));
    return 0;
}
int socket_impl_t::handle_epoll_write()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_write_impl, this));
    return 0;
}
int socket_impl_t::handle_epoll_write_impl()
{
    int ret = 0;
    string left_buff;
    if (false == is_open() || true == m_send_buffer.empty())
    {
        return 0;
    }
    do
    {
        const string& msg = m_send_buffer.front();
        ret = do_send(msg, left_buff);
        if (ret < 0)
        {
            this->close();
            return -1;
        }
        else if (ret > 0)
        {
            //! partial send: put the unsent remainder back at the front
            m_send_buffer.pop_front();
            m_send_buffer.push_front(left_buff);
            return 0;
        }
        else
        {
            m_send_buffer.pop_front();
        }
    } while (false == m_send_buffer.empty());
    m_sc->handle_write_completed(this);
    return 0;
}
void socket_impl_t::async_send(const string& msg_)
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::send_impl, this, msg_));
}
void socket_impl_t::send_impl(const string& src_buff_)
{
    string buff_ = src_buff_;
    if (false == is_open() || m_sc->check_pre_send(this, buff_))
    {
        return;
    }
    //! socket buff is full, cache the data
    if (false == m_send_buffer.empty())
    {
        m_send_buffer.push_back(buff_);
        return;
    }
    string left_buff;
    int ret = do_send(buff_, left_buff);
    if (ret < 0)
    {
        this->close();
    }
    else if (ret > 0)
    {
        m_send_buffer.push_back(left_buff);
    }
    else
    {
        //! send ok
        m_sc->handle_write_completed(this);
    }
}
int socket_impl_t::do_send(const string& buff_, string& left_buff_)
{
    size_t nleft = buff_.size();
    const char* buffer = buff_.data();
    ssize_t nwritten;
    while(nleft > 0)
    {
        if((nwritten = ::send(m_fd, buffer, nleft, MSG_NOSIGNAL)) <= 0)
        {
            if (EINTR == errno)
            {
                nwritten = 0;
            }
            else if (EWOULDBLOCK == errno)
            {
                left_buff_.assign(buff_.c_str() + (buff_.size() - nleft), nleft);
                return 1;
            }
            else
            {
                this->close();
                return -1;
            }
        }
        nleft -= nwritten;
        buffer += nwritten;
    }
    return 0;
}
void socket_impl_t::async_recv()
{
    m_epoll->register_fd(this);
}
void socket_impl_t::safe_delete()
{
    struct lambda_t
    {
        static void exe(void* p_)
        {
            delete ((socket_impl_t*)p_);
        }
    };
    m_tq->produce(task_t(&lambda_t::exe, this));
}
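A socket_impl_t represents a new client picked up through the event loop. When it is opened, the socket is set to non-blocking and the socket_controller_i receives its handle_open callback; gateway_socket_controller_t, for example, starts its heartbeat timer there.
The key functions all post their work asynchronously. handle_epoll_read, for example, is implemented like this: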
int socket_impl_t::handle_epoll_read()
{
    m_tq->produce(task_binder_t::gen(&socket_impl_t::handle_epoll_read_impl, this));
    return 0;
}
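Why asynchronous? The reason is simple: the run function of task_queue_i is a while loop, so all it takes is a thread started with run as its entry function, and every task posted from the event loop then executes on that thread.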
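The hand-off can be sketched with std::thread: the epoll-side handler only enqueues a closure and returns, while a dedicated thread executes the queued work in order. The names worker_t and echo_conn_t are hypothetical, and the string parameter stands in for bytes that fflib would read from the socket inside the queued task.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical worker: one thread draining a FIFO of closures, standing in
// for a thread whose entry function is task_queue_i::run().
class worker_t {
public:
    worker_t() : m_closed(false), m_thread(&worker_t::run, this) {}
    ~worker_t() { stop(); }

    // Called from the event-loop side: enqueue and return immediately.
    void post(std::function<void()> t) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back(std::move(t));
        }
        m_cond.notify_one();
    }

    // Let already queued tasks finish, then join the thread.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_cond.notify_all();
        if (m_thread.joinable()) m_thread.join();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> t;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cond.wait(lock, [this] { return m_closed || !m_tasks.empty(); });
                if (m_tasks.empty()) return;  // closed and drained
                t = std::move(m_tasks.front());
                m_tasks.pop_front();
            }
            t();  // run outside the lock so producers are never blocked by a task
        }
    }

    bool m_closed;
    std::deque<std::function<void()> > m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::thread m_thread;  // declared last so the other members exist before run() starts
};

// Hypothetical connection: the handler posts the real work to the worker.
class echo_conn_t {
public:
    explicit echo_conn_t(worker_t& w) : m_worker(w) {}
    int handle_epoll_read(const std::string& bytes) {
        m_worker.post([this, bytes] { m_log += bytes; });
        return 0;
    }
    const std::string& log() const { return m_log; }

private:
    worker_t& m_worker;
    std::string m_log;  // touched only on the worker thread until stop() returns
};
```

Since all tasks for a connection run on the same worker thread, state such as m_log (or m_send_buffer in socket_impl_t) needs no lock of its own, which may be part of the appeal of this design.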
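socket_impl_t is only responsible for low-level reading and writing. Application-level processing such as splitting packets, parsing and dispatching data is handed to socket_controller_i.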
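To make the packet-splitting job concrete, here is a sketch of what a controller's handle_read might do with the raw bytes it receives. The wire format is an assumption chosen for illustration (a 4-byte big-endian length followed by the body) and is not fflib's actual protocol; frame_splitter_t and make_frame are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical framing: 4-byte big-endian body length, then the body.
class frame_splitter_t {
public:
    // Append newly read bytes and return every message that is now complete.
    std::vector<std::string> feed(const char* data, std::size_t len) {
        m_buf.append(data, len);
        std::vector<std::string> out;
        while (m_buf.size() >= 4) {
            std::uint32_t body_len =
                (std::uint32_t(static_cast<unsigned char>(m_buf[0])) << 24) |
                (std::uint32_t(static_cast<unsigned char>(m_buf[1])) << 16) |
                (std::uint32_t(static_cast<unsigned char>(m_buf[2])) << 8) |
                 std::uint32_t(static_cast<unsigned char>(m_buf[3]));
            if (m_buf.size() - 4 < body_len) break;  // body not fully received yet
            out.push_back(m_buf.substr(4, body_len));
            m_buf.erase(0, 4 + static_cast<std::size_t>(body_len));
        }
        return out;
    }

    // Bytes buffered but not yet part of a complete message.
    std::size_t pending() const { return m_buf.size(); }

private:
    std::string m_buf;
};

// Build one frame for a body, using the same hypothetical format.
std::string make_frame(const std::string& body) {
    std::uint32_t n = static_cast<std::uint32_t>(body.size());
    std::string f;
    f.push_back(static_cast<char>((n >> 24) & 0xff));
    f.push_back(static_cast<char>((n >> 16) & 0xff));
    f.push_back(static_cast<char>((n >> 8) & 0xff));
    f.push_back(static_cast<char>(n & 0xff));
    f += body;
    return f;
}
```

A TCP read can end in the middle of a message or contain several messages at once, so the splitter buffers leftovers between calls and emits only complete bodies. Keeping this logic in the controller leaves socket_impl_t free of any protocol knowledge.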
The whole implementation is very transparent and well worth reading.