1. 程式人生 > >Libevent原始碼分析-----連線監聽器evconnlistener

Libevent原始碼分析-----連線監聽器evconnlistener

使用evconnlistener:

        基於event和event_base已經可以寫一個CS模型了。但是對於伺服器端來說,仍然需要使用者自行呼叫socket、bind、listen、accept等步驟。這個過程有點繁瑣,為此在2.0.2-alpha版本的Libevent推出了一些對應的封裝函式。

        使用者只需初始化struct sockaddr_in結構體變數,然後把它作為引數傳給函式evconnlistener_new_bind即可。該函式會完成上面說到的那4個過程。下面的程式碼是一個使用例子。

#include<netinet/in.h>

#include<sys/socket.h>

#include<unistd.h>


#include<stdio.h>

#include<string.h>


#include<event.h>

#include<listener.h>

#include<bufferevent.h>

#include<thread.h>



void listener_cb(evconnlistener *listener, evutil_socket_t fd,

struct sockaddr *sock, int socklen, void *arg);


void socket_read_cb(bufferevent *bev, void *arg);

void socket_error_cb(bufferevent *bev, short events, void *arg);


int main()

{

evthread_use_pthreads();//enable threads


struct sockaddr_in sin;

memset(&sin, 0, sizeof(struct sockaddr_in));

sin.sin_family = AF_INET;

sin.sin_port = htons(8989);


event_base *base = event_base_new();

evconnlistener *listener

= evconnlistener_new_bind(base, listener_cb, base,

LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE,

10, (struct sockaddr*)&sin,

sizeof(struct sockaddr_in));


event_base_dispatch(base);


evconnlistener_free(listener);

event_base_free(base);


return 0;

}



//有新的客戶端連線到伺服器

//當此函式被呼叫時,libevent已經幫我們accept了這個客戶端。該客戶端的

//檔案描述符為fd

void listener_cb(evconnlistener *listener, evutil_socket_t fd,

struct sockaddr *sock, int socklen, void *arg)

{

event_base *base = (event_base*)arg;


//下面程式碼是為這個fd建立一個bufferevent

bufferevent *bev = bufferevent_socket_new(base, fd,

BEV_OPT_CLOSE_ON_FREE);


bufferevent_setcb(bev, socket_read_cb, NULL, socket_error_cb, NULL);

bufferevent_enable(bev, EV_READ | EV_PERSIST);

}



void socket_read_cb(bufferevent *bev, void *arg)

{

char msg[4096];


size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );


msg[len] = '\0';

printf("server read the data %s\n", msg);


char reply[] = "I has read your data";

bufferevent_write(bev, reply, strlen(reply) );

}



void socket_error_cb(bufferevent *bev, short events, void *arg)

{

if (events & BEV_EVENT_EOF)

printf("connection closed\n");

else if (events & BEV_EVENT_ERROR)

printf("some other error\n");


//這將自動close套接字和free讀寫緩衝區

bufferevent_free(bev);

}



        上面的程式碼是一個伺服器端的例子,客戶端程式碼可以使用《Libevent使用例子,從簡單到複雜》博文中的客戶端。這裡就不貼客戶端程式碼了。

        從上面程式碼可以看到,當伺服器端監聽到一個客戶端的連線請求後,就會呼叫listener_cb這個回撥函式。這個回撥函式是在evconnlistener_new_bind函式中設定的。現在來看一下這個函式的引數有哪些,下面是其函式原型。

//listener.h檔案

typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);


struct evconnlistener *evconnlistener_new_bind(struct event_base *base,

evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,

const struct sockaddr *sa, int socklen);

        第一個引數是很熟悉的event_base,無論怎麼樣都是離不開event_base這個發動機的。

        第二個引數是一個函式指標,該函式指標的格式如程式碼所示。當有新的客戶端請求連線時,該函式就會呼叫。要注意的是:當這個回撥函式被呼叫時,Libevent已經幫我們accept了這個客戶端。所以,該回調函式有一個引數是檔案描述符fd。我們直接使用這個fd即可。真是方便。這個引數是可以為NULL的,此時使用者並不能接收到客戶端。當用戶呼叫evconnlistener_set_cb函式設定回撥函式後,就可以了。

        第三個引數是傳給回撥函式的使用者引數,作用就像event_new函式的最後一個引數。

        引數flags是一些標誌值,有下面這些:

  • LEV_OPT_LEAVE_SOCKETS_BLOCKING:預設情況下,當連線監聽器接收到新的客戶端socket連線後,會把該socket設定為非阻塞的。如果設定該選項,那麼就把之客戶端socket保留為阻塞的
  • LEV_OPT_CLOSE_ON_FREE:當連線監聽器釋放時,會自動關閉底層的socket
  • LEV_OPT_CLOSE_ON_EXEC:為底層的socket設定close-on-exec標誌
  • LEV_OPT_REUSEABLE: 在某些平臺,預設情況下當一個監聽socket被關閉時,其他socket不能馬上繫結到同一個埠,要等一會兒才行。設定該標誌後,Libevent會把該socket設定成reuseable。這樣,關閉該socket後,其他socket就能馬上使用同一個埠
  • LEV_OPT_THREADSAFE:為連線監聽器分配鎖。這樣可以確保執行緒安全

        引數backlog是系統呼叫listen的第二個引數。最後兩個引數就不多說了。

evconnlistener的封裝:

        接下來看一下Libevent是怎麼封裝evconnlistener的。

用到的結構體:

//listener.c檔案

struct evconnlistener_ops {//一系列的工作函式

int (*enable)(struct evconnlistener *);

int (*disable)(struct evconnlistener *);

void (*destroy)(struct evconnlistener *);

void (*shutdown)(struct evconnlistener *);

evutil_socket_t (*getfd)(struct evconnlistener *);

struct event_base *(*getbase)(struct evconnlistener *);

};


struct evconnlistener {

const struct evconnlistener_ops *ops;//操作函式

void *lock; //鎖變數,用於執行緒安全

evconnlistener_cb cb;//使用者的回撥函式

evconnlistener_errorcb errorcb;//發生錯誤時的回撥函式

void *user_data;//回撥函式的引數

unsigned flags;//屬性標誌

short refcnt;//引用計數

unsigned enabled : 1;//位域為1.即只需一個位元位來儲存這個成員

};


struct evconnlistener_event {

struct evconnlistener base;

struct event listener; //內部event,插入到event_base

};

        在evconnlistener_event結構體有一個event結構體。可以想象,在實現時必然是將伺服器端的socket fd賦值給struct event 型別變數listener的fd成員。然後將listener加入到event_base,這樣就完成了自動監聽工作。這也迴歸到之前學過的內容。

        下面看一下具體是怎麼實現的。

初始化伺服器socket:

//listener.c檔案

struct evconnlistener *

evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,

void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,

int socklen)

{

struct evconnlistener *listener;

evutil_socket_t fd;

int on = 1;

int family = sa ? sa->sa_family : AF_UNSPEC;


//監聽個數不能為0

if (backlog == 0)

return NULL;


fd = socket(family, SOCK_STREAM, 0);

if (fd == -1)

return NULL;


//LEV_OPT_LEAVE_SOCKETS_BLOCKING選項是應用於accept到的客戶端socket

//所以對於伺服器端的socket,直接將之設定為非阻塞的

if (evutil_make_socket_nonblocking(fd) < 0) {

evutil_closesocket(fd);

return NULL;

}


if (flags & LEV_OPT_CLOSE_ON_EXEC) {

if (evutil_make_socket_closeonexec(fd) < 0) {

evutil_closesocket(fd);

return NULL;

}

}


if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0) {

evutil_closesocket(fd);

return NULL;

}

if (flags & LEV_OPT_REUSEABLE) {

if (evutil_make_listen_socket_reuseable(fd) < 0) {

evutil_closesocket(fd);

return NULL;

}

}


if (sa) {

if (bind(fd, sa, socklen)<0) {//繫結

evutil_closesocket(fd);

return NULL;

}

}


listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);

if (!listener) {

evutil_closesocket(fd);

return NULL;

}


return listener;

}

        evconnlistener_new_bind函式申請一個socket,然後對之進行一些有關非阻塞、重用、保持連線的處理、繫結到特定的IP和埠。最後把業務邏輯交給evconnlistener_new處理。

//listener.c檔案

static const struct evconnlistener_ops evconnlistener_event_ops = {

event_listener_enable,

event_listener_disable,

event_listener_destroy,

NULL, /* shutdown */

event_listener_getfd,

event_listener_getbase

};



struct evconnlistener *

evconnlistener_new(struct event_base *base,

evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,

evutil_socket_t fd)

{

struct evconnlistener_event *lev;


if (backlog > 0) {

if (listen(fd, backlog) < 0)

return NULL;

} else if (backlog < 0) {

if (listen(fd, 128) < 0)

return NULL;

}


lev = mm_calloc(1, sizeof(struct evconnlistener_event));

if (!lev)

return NULL;


//賦值

lev->base.ops = &evconnlistener_event_ops;

lev->base.cb = cb;

lev->base.user_data = ptr;

lev->base.flags = flags;

lev->base.refcnt = 1;


if (flags & LEV_OPT_THREADSAFE) {//執行緒安全就需要分配鎖

EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);

}


//在多路IO複用函式中,新客戶端的連線請求也被當作讀事件

event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,

listener_read_cb, lev);


//會呼叫event_add,把event加入到event_base中

evconnlistener_enable(&lev->base);


return &lev->base;

}


int

evconnlistener_enable(struct evconnlistener *lev)

{

int r;

LOCK(lev);

lev->enabled = 1;

if (lev->cb)

r = lev->ops->enable(lev);//實際上是呼叫下面的event_listener_enable函式

else

r = 0;

UNLOCK(lev);

return r;

}


static int

event_listener_enable(struct evconnlistener *lev)

{

struct evconnlistener_event *lev_e =

EVUTIL_UPCAST(lev, struct evconnlistener_event, base);


//加入到event_base,完成監聽工作。

return event_add(&lev_e->listener, NULL);

}

        幾個函式的一路呼叫,思路還是挺清晰的。就是申請一個socket,進行一些處理,然後用之賦值給event。最後把之add到event_base中。event_base會對新客戶端的請求連線進行監聽。

        在evconnlistener_enable函式裡面,如果使用者沒有設定回撥函式,那麼就不會呼叫event_listener_enable。也就是說並不會add到event_base中。

        event_listener_enable函式裡面的巨集EVUTIL_UPCAST可以根據結構體成員變數的地址推算出結構體的起始地址。有關這個巨集,可以檢視”結構體偏移量”。

處理客戶端的連線請求:

        現在來看一下event的回撥函式listener_read_cb。

//listener.c檔案

static void

listener_read_cb(evutil_socket_t fd, short what, void *p)

{

struct evconnlistener *lev = p;

int err;

evconnlistener_cb cb;

evconnlistener_errorcb errorcb;

void *user_data;

LOCK(lev);

while (1) { //可能有多個客戶端同時請求連線

struct sockaddr_storage ss;

#ifdef WIN32

int socklen = sizeof(ss);

#else

socklen_t socklen = sizeof(ss);

#endif

evutil_socket_t new_fd = accept(fd, (struct sockaddr*)&ss, &socklen);

if (new_fd < 0)

break;

if (socklen == 0) {

/* This can happen with some older linux kernels in

* response to nmap. */

evutil_closesocket(new_fd);

continue;

}


if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))

evutil_make_socket_nonblocking(new_fd);


//使用者還沒設定連線監聽器的回撥函式

if (lev->cb == NULL) {

UNLOCK(lev);

return;

}


//由於refcnt被初始化為1.這裡有++了,所以一般情況下並不會進入下面的

//if判斷裡面。但如果程在下面UNLOCK之後,第二個線呼叫evconnlistener_free

//釋放這個evconnlistener時,就有可能使得refcnt為1了。即進入那個判斷體裡

//執行listener_decref_and_unlock。在下面會討論這個問題。

++lev->refcnt;

cb = lev->cb;

user_data = lev->user_data;

UNLOCK(lev);

cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,

user_data);//呼叫使用者設定的回撥函式,讓使用者處理這個fd

LOCK(lev);

if (lev->refcnt == 1) {

int freed = listener_decref_and_unlock(lev);

EVUTIL_ASSERT(freed);

return;

}

--lev->refcnt;

}


err = evutil_socket_geterror(fd);

if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {//還可以accept

UNLOCK(lev);

return;

}


//當有錯誤發生時才會執行到這裡

if (lev->errorcb != NULL) {

++lev->refcnt;

errorcb = lev->errorcb;

user_data = lev->user_data;

UNLOCK(lev);

errorcb(lev, user_data);//呼叫使用者設定的錯誤回撥函式

LOCK(lev);

listener_decref_and_unlock(lev);

}

}

        這個函式所做的工作也比較簡單,就是accept客戶端,然後呼叫使用者設定的回撥函式。所以,使用者回撥函式的引數fd是一個已經連線好了的socket。

        上面函式說到了錯誤回撥函式,可以通過下面的函式設定連線監聽器的錯誤監聽函式。

//listener.h檔案

typedef void (*evconnlistener_errorcb)(struct evconnlistener *, void *);


//listener.c檔案

void

evconnlistener_set_error_cb(struct evconnlistener *lev,

evconnlistener_errorcb errorcb)

{

LOCK(lev);

lev->errorcb = errorcb;

UNLOCK(lev);

}

釋放evconnlistener:

        呼叫evconnlistener_free可以釋放一個evconnlistener。由於evconnlistener擁有一些系統資源,在釋放evconnlistener_free的時候會釋放這些系統資源。

//listener.c檔案

void

evconnlistener_free(struct evconnlistener *lev)

{

LOCK(lev);

lev->cb = NULL;

lev->errorcb = NULL;

if (lev->ops->shutdown)//這裡的shutdown為NULL

lev->ops->shutdown(lev);


//引用次數減一,並解鎖

listener_decref_and_unlock(lev);

}


static int

listener_decref_and_unlock(struct evconnlistener *listener)

{

int refcnt = --listener->refcnt;

if (refcnt == 0) {

//實際呼叫event_listener_destroy

listener->ops->destroy(listener);

UNLOCK(listener);

//釋放鎖

EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

mm_free(listener);

return 1;

} else {

UNLOCK(listener);

return 0;

}

}


static void

event_listener_destroy(struct evconnlistener *lev)

{

struct evconnlistener_event *lev_e =

EVUTIL_UPCAST(lev, struct evconnlistener_event, base);


//把event從event_base中刪除

event_del(&lev_e->listener);

if (lev->flags & LEV_OPT_CLOSE_ON_FREE)//如果使用者設定了這個選項,那麼要關閉socket

evutil_closesocket(event_get_fd(&lev_e->listener));

}

        要注意一點,LEV_OPT_CLOSE_ON_FREE選項關閉的是伺服器端的監聽socket,而非那些連線客戶端的socket。

        現在來說一下那個listener_decref_and_unlock。前面註釋說到,在函式listener_read_cb中,一般情況下是不會呼叫listener_decref_and_unlock,但在多執行緒的時候可能會呼叫。這種特殊情況是:當主執行緒accept到一個新客戶端時,會解鎖,並呼叫使用者設定的回撥函式。此時,引用計數等於2。就在這個時候,第二個執行緒執行evconnlistener_free函式。該函式會執行listener_decref_and_unlock。明顯主執行緒還在用這個evconnlistener,肯定不能刪除。此時引用計數也等於2也不會刪除。但使用者已經呼叫了evconnlistener_free。Libevent必須要響應。當第二個執行緒執行完後,主執行緒搶到CPU,此時引用計數就變成1了,也就進入到if判斷裡面了。在判斷體裡面執行函式listener_decref_and_unlock,並且完成刪除工作。

        總得來說,Libevent封裝的這個evconnlistener和一系列操作函式,還是比較簡單的。思路也比較清晰。

參考:

        http://www.wangafu.net/~nickm/libevent-book/Ref8_listener.html

本文來自 luotuo44 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/luotuo44/article/details/38800363?utm_source=copy