Libevent原始碼分析-----連線監聽器evconnlistener
使用evconnlistener:
基於event和event_base已經可以寫一個CS模型了。但是對於伺服器端來說,仍然需要使用者自行呼叫socket、bind、listen、accept等步驟。這個過程有點繁瑣,為此在2.0.2-alpha版本的Libevent推出了一些對應的封裝函式。
使用者只需初始化struct sockaddr_in結構體變數,然後把它作為引數傳給函式evconnlistener_new_bind即可。該函式會完成上面說到的那4個過程。下面的程式碼是一個使用例子。
#include<netinet/in.h> #include<sys/socket.h> #include<unistd.h> #include<stdio.h> #include<string.h> #include<event.h> #include<listener.h> #include<bufferevent.h> #include<thread.h> void listener_cb(evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *arg); void socket_read_cb(bufferevent *bev, void *arg); void socket_error_cb(bufferevent *bev, short events, void *arg); int main() { evthread_use_pthreads();//enable threads struct sockaddr_in sin; memset(&sin, 0, sizeof(struct sockaddr_in)); sin.sin_family = AF_INET; sin.sin_port = htons(8989); event_base *base = event_base_new(); evconnlistener *listener = evconnlistener_new_bind(base, listener_cb, base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, 10, (struct sockaddr*)&sin, sizeof(struct sockaddr_in)); event_base_dispatch(base); evconnlistener_free(listener); event_base_free(base); return 0; } //有新的客戶端連線到伺服器 //當此函式被呼叫時,libevent已經幫我們accept了這個客戶端。該客戶端的 //檔案描述符為fd void listener_cb(evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *arg) { event_base *base = (event_base*)arg; //下面程式碼是為這個fd建立一個bufferevent bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, socket_read_cb, NULL, socket_error_cb, NULL); bufferevent_enable(bev, EV_READ | EV_PERSIST); } void socket_read_cb(bufferevent *bev, void *arg) { char msg[4096]; size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 ); msg[len] = '\0'; printf("server read the data %s\n", msg); char reply[] = "I has read your data"; bufferevent_write(bev, reply, strlen(reply) ); } void socket_error_cb(bufferevent *bev, short events, void *arg) { if (events & BEV_EVENT_EOF) printf("connection closed\n"); else if (events & BEV_EVENT_ERROR) printf("some other error\n"); //這將自動close套接字和free讀寫緩衝區 bufferevent_free(bev); }
上面的程式碼是一個伺服器端的例子,客戶端程式碼可以使用《Libevent使用例子,從簡單到複雜》博文中的客戶端。這裡就不貼客戶端程式碼了。
從上面程式碼可以看到,當伺服器端監聽到一個客戶端的連線請求後,就會呼叫listener_cb這個回撥函式。這個回撥函式是在evconnlistener_new_bind函式中設定的。現在來看一下這個函式的引數有哪些,下面是其函式原型。
//listener.h檔案 typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *); struct evconnlistener *evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, const struct sockaddr *sa, int socklen);
第一個引數是很熟悉的event_base,無論怎麼樣都是離不開event_base這個發動機的。
第二個引數是一個函式指標,該函式指標的格式如程式碼所示。當有新的客戶端請求連線時,該函式就會呼叫。要注意的是:當這個回撥函式被呼叫時,Libevent已經幫我們accept了這個客戶端。所以,該回調函式有一個引數是檔案描述符fd。我們直接使用這個fd即可。真是方便。這個引數是可以為NULL的,此時使用者並不能接收到客戶端。當用戶呼叫evconnlistener_set_cb函式設定回撥函式後,就可以了。
第三個引數是傳給回撥函式的使用者引數,作用就像event_new函式的最後一個引數。
引數flags是一些標誌值,有下面這些:
- LEV_OPT_LEAVE_SOCKETS_BLOCKING:預設情況下,當連線監聽器接收到新的客戶端socket連線後,會把該socket設定為非阻塞的。如果設定該選項,那麼就把之客戶端socket保留為阻塞的
- LEV_OPT_CLOSE_ON_FREE:當連線監聽器釋放時,會自動關閉底層的socket
- LEV_OPT_CLOSE_ON_EXEC:為底層的socket設定close-on-exec標誌
- LEV_OPT_REUSEABLE: 在某些平臺,預設情況下當一個監聽socket被關閉時,其他socket不能馬上繫結到同一個埠,要等一會兒才行。設定該標誌後,Libevent會把該socket設定成reuseable。這樣,關閉該socket後,其他socket就能馬上使用同一個埠
- LEV_OPT_THREADSAFE:為連線監聽器分配鎖。這樣可以確保執行緒安全
引數backlog是系統呼叫listen的第二個引數。最後兩個引數就不多說了。
evconnlistener的封裝:
接下來看一下Libevent是怎麼封裝evconnlistener的。
用到的結構體:
//listener.c檔案
struct evconnlistener_ops {//一系列的工作函式
int (*enable)(struct evconnlistener *);
int (*disable)(struct evconnlistener *);
void (*destroy)(struct evconnlistener *);
void (*shutdown)(struct evconnlistener *);
evutil_socket_t (*getfd)(struct evconnlistener *);
struct event_base *(*getbase)(struct evconnlistener *);
};
struct evconnlistener {
const struct evconnlistener_ops *ops;//操作函式
void *lock; //鎖變數,用於執行緒安全
evconnlistener_cb cb;//使用者的回撥函式
evconnlistener_errorcb errorcb;//發生錯誤時的回撥函式
void *user_data;//回撥函式的引數
unsigned flags;//屬性標誌
short refcnt;//引用計數
unsigned enabled : 1;//位域為1.即只需一個位元位來儲存這個成員
};
struct evconnlistener_event {
struct evconnlistener base;
struct event listener; //內部event,插入到event_base
};
在evconnlistener_event結構體有一個event結構體。可以想象,在實現時必然是將伺服器端的socket fd賦值給struct event 型別變數listener的fd成員。然後將listener加入到event_base,這樣就完成了自動監聽工作。這也迴歸到之前學過的內容。
下面看一下具體是怎麼實現的。
初始化伺服器socket:
//listener.c檔案
struct evconnlistener *
evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,
int socklen)
{
struct evconnlistener *listener;
evutil_socket_t fd;
int on = 1;
int family = sa ? sa->sa_family : AF_UNSPEC;
//監聽個數不能為0
if (backlog == 0)
return NULL;
fd = socket(family, SOCK_STREAM, 0);
if (fd == -1)
return NULL;
//LEV_OPT_LEAVE_SOCKETS_BLOCKING選項是應用於accept到的客戶端socket
//所以對於伺服器端的socket,直接將之設定為非阻塞的
if (evutil_make_socket_nonblocking(fd) < 0) {
evutil_closesocket(fd);
return NULL;
}
if (flags & LEV_OPT_CLOSE_ON_EXEC) {
if (evutil_make_socket_closeonexec(fd) < 0) {
evutil_closesocket(fd);
return NULL;
}
}
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0) {
evutil_closesocket(fd);
return NULL;
}
if (flags & LEV_OPT_REUSEABLE) {
if (evutil_make_listen_socket_reuseable(fd) < 0) {
evutil_closesocket(fd);
return NULL;
}
}
if (sa) {
if (bind(fd, sa, socklen)<0) {//繫結
evutil_closesocket(fd);
return NULL;
}
}
listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);
if (!listener) {
evutil_closesocket(fd);
return NULL;
}
return listener;
}
evconnlistener_new_bind函式申請一個socket,然後對之進行一些有關非阻塞、重用、保持連線的處理、繫結到特定的IP和埠。最後把業務邏輯交給evconnlistener_new處理。
//listener.c檔案
static const struct evconnlistener_ops evconnlistener_event_ops = {
event_listener_enable,
event_listener_disable,
event_listener_destroy,
NULL, /* shutdown */
event_listener_getfd,
event_listener_getbase
};
struct evconnlistener *
evconnlistener_new(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
evutil_socket_t fd)
{
struct evconnlistener_event *lev;
if (backlog > 0) {
if (listen(fd, backlog) < 0)
return NULL;
} else if (backlog < 0) {
if (listen(fd, 128) < 0)
return NULL;
}
lev = mm_calloc(1, sizeof(struct evconnlistener_event));
if (!lev)
return NULL;
//賦值
lev->base.ops = &evconnlistener_event_ops;
lev->base.cb = cb;
lev->base.user_data = ptr;
lev->base.flags = flags;
lev->base.refcnt = 1;
if (flags & LEV_OPT_THREADSAFE) {//執行緒安全就需要分配鎖
EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
}
//在多路IO複用函式中,新客戶端的連線請求也被當作讀事件
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
listener_read_cb, lev);
//會呼叫event_add,把event加入到event_base中
evconnlistener_enable(&lev->base);
return &lev->base;
}
int
evconnlistener_enable(struct evconnlistener *lev)
{
int r;
LOCK(lev);
lev->enabled = 1;
if (lev->cb)
r = lev->ops->enable(lev);//實際上是呼叫下面的event_listener_enable函式
else
r = 0;
UNLOCK(lev);
return r;
}
static int
event_listener_enable(struct evconnlistener *lev)
{
struct evconnlistener_event *lev_e =
EVUTIL_UPCAST(lev, struct evconnlistener_event, base);
//加入到event_base,完成監聽工作。
return event_add(&lev_e->listener, NULL);
}
幾個函式的一路呼叫,思路還是挺清晰的。就是申請一個socket,進行一些處理,然後用之賦值給event。最後把之add到event_base中。event_base會對新客戶端的請求連線進行監聽。
在evconnlistener_enable函式裡面,如果使用者沒有設定回撥函式,那麼就不會呼叫event_listener_enable。也就是說並不會add到event_base中。
event_listener_enable函式裡面的巨集EVUTIL_UPCAST可以根據結構體成員變數的地址推算出結構體的起始地址。有關這個巨集,可以檢視”結構體偏移量”。
處理客戶端的連線請求:
現在來看一下event的回撥函式listener_read_cb。
//listener.c檔案
static void
listener_read_cb(evutil_socket_t fd, short what, void *p)
{
struct evconnlistener *lev = p;
int err;
evconnlistener_cb cb;
evconnlistener_errorcb errorcb;
void *user_data;
LOCK(lev);
while (1) { //可能有多個客戶端同時請求連線
struct sockaddr_storage ss;
#ifdef WIN32
int socklen = sizeof(ss);
#else
socklen_t socklen = sizeof(ss);
#endif
evutil_socket_t new_fd = accept(fd, (struct sockaddr*)&ss, &socklen);
if (new_fd < 0)
break;
if (socklen == 0) {
/* This can happen with some older linux kernels in
* response to nmap. */
evutil_closesocket(new_fd);
continue;
}
if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
evutil_make_socket_nonblocking(new_fd);
//使用者還沒設定連線監聽器的回撥函式
if (lev->cb == NULL) {
UNLOCK(lev);
return;
}
//由於refcnt被初始化為1.這裡有++了,所以一般情況下並不會進入下面的
//if判斷裡面。但如果程在下面UNLOCK之後,第二個線呼叫evconnlistener_free
//釋放這個evconnlistener時,就有可能使得refcnt為1了。即進入那個判斷體裡
//執行listener_decref_and_unlock。在下面會討論這個問題。
++lev->refcnt;
cb = lev->cb;
user_data = lev->user_data;
UNLOCK(lev);
cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
user_data);//呼叫使用者設定的回撥函式,讓使用者處理這個fd
LOCK(lev);
if (lev->refcnt == 1) {
int freed = listener_decref_and_unlock(lev);
EVUTIL_ASSERT(freed);
return;
}
--lev->refcnt;
}
err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {//還可以accept
UNLOCK(lev);
return;
}
//當有錯誤發生時才會執行到這裡
if (lev->errorcb != NULL) {
++lev->refcnt;
errorcb = lev->errorcb;
user_data = lev->user_data;
UNLOCK(lev);
errorcb(lev, user_data);//呼叫使用者設定的錯誤回撥函式
LOCK(lev);
listener_decref_and_unlock(lev);
}
}
這個函式所做的工作也比較簡單,就是accept客戶端,然後呼叫使用者設定的回撥函式。所以,使用者回撥函式的引數fd是一個已經連線好了的socket。
上面函式說到了錯誤回撥函式,可以通過下面的函式設定連線監聽器的錯誤監聽函式。
//listener.h檔案
typedef void (*evconnlistener_errorcb)(struct evconnlistener *, void *);
//listener.c檔案
void
evconnlistener_set_error_cb(struct evconnlistener *lev,
evconnlistener_errorcb errorcb)
{
LOCK(lev);
lev->errorcb = errorcb;
UNLOCK(lev);
}
釋放evconnlistener:
呼叫evconnlistener_free可以釋放一個evconnlistener。由於evconnlistener擁有一些系統資源,在釋放evconnlistener_free的時候會釋放這些系統資源。
//listener.c檔案
void
evconnlistener_free(struct evconnlistener *lev)
{
LOCK(lev);
lev->cb = NULL;
lev->errorcb = NULL;
if (lev->ops->shutdown)//這裡的shutdown為NULL
lev->ops->shutdown(lev);
//引用次數減一,並解鎖
listener_decref_and_unlock(lev);
}
static int
listener_decref_and_unlock(struct evconnlistener *listener)
{
int refcnt = --listener->refcnt;
if (refcnt == 0) {
//實際呼叫event_listener_destroy
listener->ops->destroy(listener);
UNLOCK(listener);
//釋放鎖
EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
mm_free(listener);
return 1;
} else {
UNLOCK(listener);
return 0;
}
}
static void
event_listener_destroy(struct evconnlistener *lev)
{
struct evconnlistener_event *lev_e =
EVUTIL_UPCAST(lev, struct evconnlistener_event, base);
//把event從event_base中刪除
event_del(&lev_e->listener);
if (lev->flags & LEV_OPT_CLOSE_ON_FREE)//如果使用者設定了這個選項,那麼要關閉socket
evutil_closesocket(event_get_fd(&lev_e->listener));
}
要注意一點,LEV_OPT_CLOSE_ON_FREE選項關閉的是伺服器端的監聽socket,而非那些連線客戶端的socket。
現在來說一下那個listener_decref_and_unlock。前面註釋說到,在函式listener_read_cb中,一般情況下是不會呼叫listener_decref_and_unlock,但在多執行緒的時候可能會呼叫。這種特殊情況是:當主執行緒accept到一個新客戶端時,會解鎖,並呼叫使用者設定的回撥函式。此時,引用計數等於2。就在這個時候,第二個執行緒執行evconnlistener_free函式。該函式會執行listener_decref_and_unlock。明顯主執行緒還在用這個evconnlistener,肯定不能刪除。此時引用計數也等於2也不會刪除。但使用者已經呼叫了evconnlistener_free。Libevent必須要響應。當第二個執行緒執行完後,主執行緒搶到CPU,此時引用計數就變成1了,也就進入到if判斷裡面了。在判斷體裡面執行函式listener_decref_and_unlock,並且完成刪除工作。
總得來說,Libevent封裝的這個evconnlistener和一系列操作函式,還是比較簡單的。思路也比較清晰。
參考:
http://www.wangafu.net/~nickm/libevent-book/Ref8_listener.html
本文來自 luotuo44 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/luotuo44/article/details/38800363?utm_source=copy