1. 程式人生 > >高效能伺服器(libevent網路庫)

高效能伺服器(libevent網路庫)

簡介
libevent是一個事件出發的網路庫,使用與 windows,linux,bad, mac os 等的高效能跨平臺網路庫 ,

支援的多種I/O 網路模型

linux(epoll),poll dev/poll select freebsd(kqueue ),windows(iocp)


libevent是一個事件觸發的網路庫,適用於windows、linux、bsd等多種平臺,
內部使用select、epoll、kqueue等系統呼叫管理事件機制。
著名分散式快取軟體memcached也是libevent based,
而且libevent在使用上可以做到跨平臺,而且根據libevent官方網站上公佈的資料統計,似乎也有著非凡的效能。

編譯庫程式碼,編譯指令碼會判斷OS支援哪種型別的事件機制(select、epoll或kqueue),
然後條件編譯相應程式碼,供上層使用的介面仍然是保持統一的(否則也不能所謂的跨平臺了)。
在linux redhat as 4 u 2 上編譯相當容易,configure以後make,make install就可以了,
windows上編譯似乎有點小麻煩,不過稍微改點東西也就通過了。
從程式碼中看,libevent支援使用者使用三種類型的事件,分別是網路IO、定時器、訊號三種,
在定時器的實現上使用了RB tree的資料結構,以達到高效查詢、排序、刪除定時器的目的,
網路IO上,主要關注了一下linux上的epoll(因為目前的開發主要在linux平臺),
結果發現libevent的epoll居然用的EPOLLLT,水平觸發的方式用起來比較方便,不容易出錯,
但是在效率上可能比EPOLLET要低一些。
跟網路無關的,libevent也有一些緩衝區管理的函式,而且是c風格的函式,實用性不是太大。
libevent沒有提供快取的函式。
雖然libevent實用上的價值不大,但它提供的介面形式還是不錯的,實現類似的lib的時候仍然是可以參考的。
Libevent定時器的資料結構自version 1.4起已由紅黑樹改為最小堆(Min Heap),以提高效率;
網路IO和訊號的資料結構採用了雙向佇列(TAILQ)。在實現上主要有3種連結串列:
EVLIST_INSERTED, EVLIST_ACTIVE, EVLIST_TIMEOUT,一個ev在這3種連結串列之間被插入或刪除,
處於EVLIST_ACTIVE連結串列中的ev最後將會被排程執行。
Libevent提供了DNS,HTTP Server,RPC等元件,HTTP Server可以說是Libevent的經典應用。
從http.c可看到Libevent的很多標準寫法。
寫非阻塞式的HTTP Server很容易將socket處理與HTTP協議處理糾纏在一起,Libevent在這點上似乎也有值得推敲的地方。

--------------------------------------------------------------------------------------------------------------------------


Libevent安裝過程
Version:libevent-2.0.16
#http://cloud.github.com/downloads/libevent/libevent/libevent-2.0.16-stable.tar.gz
#tar -xzvf libevent-2.0.16.tar.gz
#cd libevent-2.0.16
#./configure -prefix=/usr/local
#make
#sudo make install

原始碼檔案組織結構
標頭檔案:
1.libevent共用的標頭檔案都在event2目錄裡 2.正常標頭檔案後面沒有特效字尾
3.比如字尾“xx_struct.h”這種型別的檔案裡的任何結構體要是直接以來的話會破壞程式對其
他版本libevent的二進位制前榮幸,有時候是以非常難以除錯的方式出現
內部標頭檔案:
1.xxx-internal.h 後最的檔案內部使用的標頭檔案 2.目的是內部資料結構和函式,資訊隱藏
libevent框架
1.event.c裡有對event的整體框架實現
對系統I/O多路複用機制的封裝
1.epool.c : 對epoll的封裝
2.select.c : 對select的封裝
3.devpoll.c : 對dev/poll的封裝
4.kqueue.c : 對kqueue的封裝
定時事件管理
1.min-heap.h 定時器事件管理 堆結構
訊號事件管理
signal.c : 對新號事件的處理
補助功能函式
evutil.h 和 evutil.c util.h : 一些補助功能函式,
包括建立socket pair 和一些時間操作函式 加,減,等緩衝區管理
evbuffer.h bufferevent.h 對緩衝區的封裝
lievent 裡用到的基本資料結構
compat\sys 下的queue.h 檔案對 連結串列,雙向連結串列,

Libevent 庫的結構
Libevent_core
包含所有核心的事件和緩衝功能
event_base,evbuffer,bufferevent,和幾個附加補助功能
Libevent_extra
定義了程式可能需要,也可能不需要的協議特定功能,包括HTTP、DNS和RPC
Libevent
這個庫沒用以後版本會刪掉
Libevent_pthreads
pthread可一直執行緒庫的執行緒和鎖定實現
Libevent_openssl這個庫為使用bufferevent和OpenSSL進行加密的通訊提供支援。
注意bufferevent 目前只支援 tcp 不支援udp



Libevent主要功能主鍵
Evutil : 網路補助工具
Event,eventbase : Libevent核心事件和事件管理
Bufferevent :為libevent基於事件的核心提供使用方便的封裝除了通知程式套接字(注意 : 目前只針對於tcp udp不支援)
Evbuffer : 在bufferevent層之下實現了緩衝功能,並且提供了方便有效的訪問函式。

Libevent普通簡單例子實現Source code

int main( )
{
int listen_fd;
struct event ev_accept;
//--相當於建立一個事件堆 以後事件可以往這裡註冊了--
base = event_base_new();
listen_fd = socket( AF_INET,SOCK_STREAM,0 );
if( listen_fd <0 )
return;
int reuseaddr_on = 1;
if( setsockopt( listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr_on,sizeof(reuseaddr_on) ) == -1 )
{
cout<<"Error : Setsockopt failed" <<endl;
}
//--SetSocket_listenaddr--
struct sockaddr_in listen_addr;
memset( &listen_addr,0,sizeof( listen_addr ) );
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons( 80800 );
if( bind( listen_fd,(struct sockaddr*)&listen_addr,sizeof( listen_addr ) ) < 0)
{
cout<<"Error : Bind failed"<<endl;
}
if( listen( listen_fd,500 ) < 0 )
{
cout<<"Error : Listen failed"<<endl; return ;
}
if( SetNonBlock(listen_fd) < 0 )
{
cout<<"Error : SetNonBlock failed"<<endl; return;
}
//--初始化事件ev_accept 設定accept回撥函式和 和事件型別--
event_set( &ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL );
//--設定完的ev_accept事件註冊到 base裡--
event_base_set( base,&ev_accept );
//--正事新增事件 相當於註冊完的事件啟用--
event_add( &ev_accept,NULL );
//--事件堆run部分--
event_base_dispatch(base);
return 1;
}



accpet callback codeSource code

void on_accept(int fd,short ev,void* arg)
{
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof( client_addr );
client_fd = accept( fd,(struct sockaddr*)&client_addr,&client_len );
if( client_fd == -1 )
{
cout<<"Error : accept client failed"<<endl;
}
if (SetNonBlock(client_fd) < 0)
{
cout<<"Error : Set client socket nonblock"<<endl;
}
static int index = 0;
cout << "客戶端 "<< index <<"-----" << inet_ntoa( client_addr.sin_addr ) <<" 已連結 ~~~~~" <<endl;
index++;
//--accept到的 新客戶端 註冊一個recv 事件--
stMyClient* pClient = new stMyClient();
//--使用剛連線客戶端的檔案描述符 監視 recv 訊息--
event_set( &pClient->ev_read,client_fd,EV_READ|EV_PERSIST,on_read,pClient );
event_base_set( base,&pClient->ev_read );
event_add( &pClient->ev_read,NULL );
}

recv callback codeSource code

void on_read( int fd,short ev,void* arg )
{
struct stMyClient* client = (stMyClient*)arg;
char buff[65535];
memset( buff,0x00,sizeof( buff ) );
int nSize = read( fd,buff,65535 );
if( nSize == 0 )
{
cout<<"Client disconnected "<<endl;
close(fd);
event_del( &client->ev_read );
delete client;
return;
}
else if( nSize < 0 )
{
cout<<"Socket failed disconnected "<<endl;
close(fd);
event_del( &client->ev_read );
delete client;
return;
}
cout<<"Read :"<<buff<<endl;
}

關於使用Bufferevent 和 多執行緒用法Source code

//--執行緒回撥--
void* ProcessThread( void* pthread )
{
CVitNetThread* p = (CVitNetThread*)pthread;
//--p->GetEvQueue() 這裡獲取到的是event_base物件--
//--這裡相當於把事件堆繫結在當前執行緒裡--
event_base_dispatch(p->GetEvQueue());
return NULL;
}
void main()
{
//----listen bind 部分同上---
//
//----------------------------
m_pEvQueue = event_base_new();
event_assign( &m_incEvent, m_pEvQueue,fd, evType, accept_cb, this);
event_base_set( m_pEvQueue,&m_incEvent );
event_add( &m_incEvent,NULL );
//--注意這裡event_base堆裡一個事件都沒註冊情況下 不能建立執行緒--
//--因為event_base_dispatch();此函式判斷沒有事件註冊了的就會退出執行緒的--
int ret;
if ((ret = pthread_create(&m_incThreadID, NULL, ProcessThread,this)) != 0)
{
s_pLog->Log(LOG_ERROR, "%s %s => CreateThread failed",__FILE__,__FUNCTION__) ;return false;
}
}Source code

void accept_cb( int fd,short ev,void* arg )
{
CVitNetThread *p = (CVitNetThread*)arg;
int client_fd = -1;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof( client_addr );
client_fd = accept( fd,(struct sockaddr*)&client_addr,&client_len );
if( client_fd == -1 )
{
s_pLog->Log(LOG_ERROR, "%s %s => accept client failed fd = [%d]",__FILE__,__FUNCTION__,client_fd) ;return;
}
if (evutil_make_socket_nonblocking(client_fd) < 0)
{
s_pLog->Log(LOG_ERROR, "%s %s => Set client socket nonblock is failed",__FILE__,__FUNCTION__,client_fd) ;return;
}
//--這裡建立一個快取事件 物件 設定 recv write error 回撥函式--
//--這裡系統要是有recv到得資訊的時候會自動呼叫recv_cb回撥函式的
//--這裡系統要是有send資訊的時候自動呼叫write_cb回撥函式
//--這裡系統要是有出錯或延遲的時候回撥用此error_cb回撥函式
bufferevent* bufferev = bufferevent_new( client_fd,recv_cb,write_cb,error_cb,p );
if( bufferev == NULL )
{
s_pLog->Log(LOG_ERROR, "%s %s => bufferev bis NULL",__FILE__,__FUNCTION__) ;return;
}
//--bufferev 事件 註冊到 訊息堆裡 event_base 這裡( p->GetEvQueue()返回一個event_base物件)
bufferevent_base_set( p->GetEvQueue(),bufferev );
bufferevent_enable( bufferev, EV_READ | EV_WRITE );
}

bufferevent recv,errorSource code

void recv_cb( struct bufferevent *ev, void *arg )
{
CVitNetThread *p = (CVitNetThread*)arg;
if( p == NULL )
{
s_pLog->Log(LOG_ERROR, "%s %s => argument is NULL",__FILE__,__FUNCTION__) ;return;
}
int fd = bufferevent_getfd(ev);
char buffer[READ_MAX]; memset( buffer, 0x00,sizeof( buffer ) );
int ret = bufferevent_read(ev, &buffer,evbuffer_get_length( ev->input ));
cout<<buffer<<endl;
}
void error_cb( struct bufferevent *ev, short events ,void *arg )
{
CVitNetThread *p = (CVitNetThread*)arg;
int fd = bufferevent_getfd(ev);
if( events & BEV_EVENT_CONNECTED )
cout<<"Connect ok"<<endl;
else if( events & (BEV_EVENT_ERROR | BEV_EVENT_EOF) )
cout<<"disconnect"<<endl;
else if( events & BEV_EVENT_TIMEOUT )
cout<<"TimeOut"<<endl;
else if( events & BEV_EVENT_WRITING )
cout<<"Wrting"<<endl;
bufferevent_free(ev);
}