Libevent原始碼分析-----bufferevent工作流程探究
和之前的《Libevent工作流程探究》一樣,這裡也是用一個例子來探究bufferevent的工作流程。具體的例子可以參考《》,這裡就不列出了。其實要做的例子也就是bufferevent_socket_new、bufferevent_setcb、bufferevent_enable這幾個函式。
因為本文會用到《Libevent工作流程探究》中提到的說法,比如將一個event插入到event_base中。所以讀者最好先讀一下那篇博文。此外,因為bufferevent結構體本身會使用evbuffer結構體和還會呼叫相應的一些操作,所以讀者還應該先閱讀《evbuffer結構與基本操作bufferevent結構體:
bufferevent其實也就是在event_base的基礎上再進行一層封裝,其本質還是離不開event和event_base,從bufferevent的結構體就可以看到這一點。
bufferevent結構體中有兩個event,分別用來監聽同一個fd的可讀事件和可寫事件。為什麼不用一個event同時監聽可讀和可寫呢?這是因為監聽可寫是困難的,下面會說到原因。讀者也可以自問一下,自己之前有沒有試過用最原始的event監聽一個fd的可寫。
由於socket 是全雙工的,所以在bufferevent結構體中,也有兩個evbuffer成員,分別是讀緩衝區和寫緩衝區。 bufferevent結構體定義如下:
//bufferevent_struct.h檔案 struct bufferevent { struct event_base *ev_base; //操作結構體,成員有一些函式指標。類似struct eventop結構體 const struct bufferevent_ops *be_ops; struct event ev_read;//讀事件event struct event ev_write;//寫事件event struct evbuffer *input;//讀緩衝區 struct evbuffer *output; //寫緩衝區 struct event_watermark wm_read;//讀水位 struct event_watermark wm_write;//寫水位 bufferevent_data_cb readcb;//可讀時的回撥函式指標 bufferevent_data_cb writecb;//可寫時的回撥函式指標 bufferevent_event_cb errorcb;//錯誤發生時的回撥函式指標 void *cbarg;//回撥函式的引數 struct timeval timeout_read;//讀事件event的超時值 struct timeval timeout_write;//寫事件event的超時值 /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */ short enabled; };
如果看過Libevent的參考手冊的話,應該還會知道bufferevent除了用於socket外,還可以用於socketpair 和 filter。如果用面向物件的思維,應從這個三個應用中抽出相同的部分作為父類,然後派生出三個子類。
Libevent雖然是用C語言寫的,不過它還是提取出一些公共部分,然後定義一個bufferevent_private結構體,用於儲存這些公共部分成員。從集合的角度來說,bufferevent_private應該是bufferevent的一個子集,即一部分。但在Libevent中,bufferevent確實bufferevent_private的一個成員。下面是bufferevent_private結構體。
//bufferevent-internal.h檔案
struct bufferevent_private {
struct bufferevent bev;
//設定input evbuffer的高水位時,需要一個evbuffer回撥函式配合工作
struct evbuffer_cb_entry *read_watermarks_cb;
/** If set, we should free the lock when we free the bufferevent. */
//鎖是Libevent自動分配的,還是使用者分配的
unsigned own_lock : 1;
...
//這個socket是否處理正在連線伺服器狀態
unsigned connecting : 1;
//標誌連線被拒絕
unsigned connection_refused : 1;
//標誌是什麼原因把 讀 掛起來
bufferevent_suspend_flags read_suspended;
//標誌是什麼原因把 寫 掛起來
bufferevent_suspend_flags write_suspended;
enum bufferevent_options options;
int refcnt;// bufferevent的引用計數
//鎖變數
void *lock;
};
新建一個bufferevent:
函式bufferevent_socket_new可以完成這個工作。
//bufferevent-internal.h檔案
struct bufferevent_ops {
const char *type;//型別名稱
off_t mem_offset;//成員bev的偏移量
//啟動。將event加入到event_base中
int (*enable)(struct bufferevent *, short);
//關閉。將event從event_base中刪除
int (*disable)(struct bufferevent *, short);
//銷燬
void (*destruct)(struct bufferevent *);
//調整event的超時值
int (*adj_timeouts)(struct bufferevent *);
/** Called to flush data. */
int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
//獲取成員的值。具體看實現
int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
};
//bufferevent_sock.c檔案
const struct bufferevent_ops bufferevent_ops_socket = {
"socket",
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
be_socket_ctrl,
};
//由於有幾個不同型別的bufferevent,而且它們的enable、disable等操作是不同的。所以
//需要的一些函式指標指明某個型別的bufferevent應該使用哪些操作函式。結構體bufferevent_ops_socket
//就應運而生。對於socket,其操作函式如上。
//bufferevent_sock.c檔案
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
...//win32
//結構體記憶體清零,所有成員都為0
if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
//如果options中需要執行緒安全,那麼就會申請鎖
//會新建一個輸入和輸出快取區
if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
//設定將evbuffer的資料向fd傳
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
//將fd與event相關聯。同一個fd關聯兩個event
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
//設定evbuffer的回撥函式,使得外界給寫緩衝區新增資料時,能觸發
//寫操作,這個回撥對於寫事件的監聽是很重要的
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
//凍結讀緩衝區的尾部,未解凍之前不能往讀緩衝區追加資料
//也就是說不能從socket fd中讀取資料
evbuffer_freeze(bufev->input, 0);
//凍結寫緩衝區的頭部,未解凍之前不能把寫緩衝區的頭部資料刪除
//也就是說不能把資料寫到socket fd
evbuffer_freeze(bufev->output, 1);
return bufev;
}
留意函式裡面的evbuffer_add_cb呼叫,後面會說到。
函式在最後面會凍結兩個緩衝區。其實,雖然這裡凍結了,但實際上Libevent在讀資料或者寫資料之前會解凍的讀完或者寫完資料後,又會馬上凍結。這主要防止資料被意外修改。使用者一般不會直接呼叫evbuffer_freeze或者evbuffer_unfreeze函式。一切的凍結和解凍操作都由Libevent內部完成。還有一點要注意,因為這裡只是把寫緩衝區的頭部凍結了。所以還是可以往寫緩衝區的尾部追加資料。同樣,此時也是可以從讀緩衝區讀取資料。這個是必須的。因為在Libevent內部不解凍的時候,使用者需要從讀緩衝區中獲取資料(這相當於從socket fd中讀取資料),使用者也需要把資料寫到寫緩衝區中(這相當於把資料寫入到socket fd中)。
在bufferevent_socket_new函式裡面會呼叫函式bufferevent_init_common完成公有部分的初始化。
//bufferevent.c檔案
int
bufferevent_init_common(struct bufferevent_private *bufev_private,
struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
{
struct bufferevent *bufev = &bufev_private->bev;
//分配輸入緩衝區
if (!bufev->input) {
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
}
//分配輸出緩衝區
if (!bufev->output) {
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
return -1;
}
}
bufev_private->refcnt = 1;//引用次數為1
bufev->ev_base = base;
/* Disable timeouts. */
//預設情況下,讀和寫event都是不支援超時的
evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);
bufev->be_ops = ops;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
//可寫是預設支援的
bufev->enabled = EV_WRITE;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
//申請鎖。
if (bufferevent_enable_locking(bufev, NULL) < 0) {
/* cleanup */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
bufev->input = NULL;
bufev->output = NULL;
return -1;
}
}
#endif
...//延遲呼叫的初始化,一般不需要用到
bufev_private->options = options;
//將evbuffer和bufferevent相關聯
evbuffer_set_parent(bufev->input, bufev);
evbuffer_set_parent(bufev->output, bufev);
return 0;
}
程式碼中可以看到,預設是enable EV_WRITE的。
設定回撥函式:
函式bufferevent_setcb完成這個工作。該函式相當簡單,也就是進行一些賦值操作。
//bufferevent.c檔案
void
bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg)
{
//bufferevent結構體內部有一個鎖變數
BEV_LOCK(bufev);
bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = eventcb;
bufev->cbarg = cbarg;
BEV_UNLOCK(bufev);
}
如果不想設定某個操作的回撥函式,直接設定為NULL即可。
令bufferevent可以工作:
相信讀者也知道,即使呼叫了bufferevent_socket_new和bufferevent_setcb,這個bufferevent還是不能工作,必須呼叫bufferevent_enable。為什麼會這樣的呢?
如果看過之前的那些博文,相信讀者知道,一個event能夠工作,不僅僅需要new出來,還要呼叫event_add函式,把這個event新增到event_base中。在本文前面的程式碼中,並沒有看到event_add函式的呼叫。所以還需要呼叫一個函式,把event新增到event_base中。函式bufferevent_enable就是完成這個工作的。
//bufferevent.c檔案
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;
//增加引用並加鎖
//增加引用是為了防止其他執行緒呼叫bufferevent_free,釋放了bufferevent
_bufferevent_incref_and_lock(bufev);
//掛起了讀,此時不能監聽讀事件
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
//掛起了寫,此時不能監聽寫事情
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
//呼叫對應型別的enbale函式。因為不同型別的bufferevent有不同的enable函式
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
//減少引用並解鎖
_bufferevent_decref_and_unlock(bufev);
return r;
}
上面程式碼可以看到,最終會呼叫對應bufferevent型別的enable函式,對於socket bufferevent,其enable函式是be_socket_enable,程式碼如下:
//bufferevent.c檔案
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
if (tv->tv_sec == 0 && tv->tv_usec == 0)
return event_add(ev, NULL);
else
return event_add(ev, tv);
}
//bufferevent_sock.c檔案
#define be_socket_add(ev, t) \
_bufferevent_add_event((ev), (t))
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
return -1;
}
if (event & EV_WRITE) {
if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
return -1;
}
return 0;
}
如果讀者熟悉Libevent的超時事件,那麼可以知道Libevent是在event_add函式裡面確定一個event的超時的。上面程式碼也展示了這一點,如果讀或者寫event設定了超時(即其超時值不為0),那麼就會作為引數傳給event_add函式。如果讀者不熟悉的Libevent的超時事件的話,可以參考《超時event的處理》。
使用者可以呼叫函式bufferevent_set_timeouts,設定讀或者寫事件的超時。程式碼如下:
//bufferevent.c檔案
int
bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *tv_read,
const struct timeval *tv_write)
{
int r = 0;
BEV_LOCK(bufev);
if (tv_read) {
bufev->timeout_read = *tv_read;
} else {
evutil_timerclear(&bufev->timeout_read);
}
if (tv_write) {
bufev->timeout_write = *tv_write;
} else {
evutil_timerclear(&bufev->timeout_write);
}
if (bufev->be_ops->adj_timeouts)
r = bufev->be_ops->adj_timeouts(bufev);
BEV_UNLOCK(bufev);
return r;
}
//bufferevent_sock.c檔案
static int
be_socket_adj_timeouts(struct bufferevent *bufev)
{
int r = 0;
//使用者監聽了讀事件
if (event_pending(&bufev->ev_read, EV_READ, NULL))
if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
r = -1;
//使用者監聽了寫事件
if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
r = -1;
}
return r;
}
從上面程式碼可以看到:使用者不僅僅可以設定超時值,還可以修改超時值,也是通過這個函式進行修的。當然也是可以刪除超時的,直接把超時引數設定成NULL即可。
至此,已經完成了bufferevent的初始化工作,只需呼叫event_base_dispatch函式,啟動發動機就可以工作了。處理讀事件:
接下來的任務:底層的socket fd接收資料後,bufferevent是怎麼工作的。
讀事件的水位:
在講解讀事件之前,先來看一下水位問題,函式bufferevent_setwatermark可以設定讀和寫的水位。這裡只講解讀事件的水位。
水位有兩個:低水位和高水位。
低水位比較容易懂,就是當可讀的資料量到達這個低水位後,才會呼叫使用者設定的回撥函式。比如使用者想每次讀取100位元組,那麼就可以把低水位設定為100。當可讀資料的位元組數小於100時,即使有資料都不會打擾使用者(即不會呼叫使用者設定的回撥函式)。可讀資料大於等於100位元組後,才會呼叫使用者的回撥函式。
高水位是什麼呢?其實,這和使用者的回撥函式沒有關係。它的意義是:把讀事件的evbuffer的資料量限制在高水位之下。比如,使用者認為讀緩衝區不能太大(太大的話,連結串列會很長)。那麼使用者就會設定讀事件的高水位。當讀緩衝區的資料量達到這個高水位後,即使socket fd還有資料沒有讀,也不會讀進這個讀緩衝區裡面。一句話說,就是控制evbuffer的大小。
雖然控制了evbuffer的大小,但socket fd可能還有資料。有資料就會觸發可讀事件,但處理可讀的時候,又會發現設定了高水位,不能讀取資料evbuffer。socket fd的資料沒有被讀完,又觸發……。這個貌似是一個死迴圈。實際上是不會出現這個死迴圈的,因為Libevent發現evbuffer的資料量到達高水位後,就會把可讀事件給掛起來,讓它不能再觸發了。Libevent使用函式bufferevent_wm_suspend_read把監聽讀事件的event掛起來。下面看一下Libevent是怎麼把一個event掛起來的。
//bufferevent-internal.h檔案
#define bufferevent_wm_suspend_read(b) \
bufferevent_suspend_read((b), BEV_SUSPEND_WM)
//bufferevent.c檔案
void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (!bufev_private->read_suspended)//不能掛多次
bufev->be_ops->disable(bufev, EV_READ);//實際呼叫be_socket_disable函式
bufev_private->read_suspended |= what;//因何而被掛起
BEV_UNLOCK(bufev);
}
//bufferevent_sock.c檔案
static int
be_socket_disable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (event & EV_READ) {
if (event_del(&bufev->ev_read) == -1)
return -1;
}
/* Don't actually disable the write if we are trying to connect. */
if ((event & EV_WRITE) && ! bufev_p->connecting) {
if (event_del(&bufev->ev_write) == -1)//刪掉這個event
return -1;
}
return 0;
}
居然是直接刪除這個監聽讀事件的event,真的是掛了!!!
看來不能隨便設定高水位,因為它會暫停讀。如果只想設定低水位而不想設定高水位,那麼在呼叫bufferevent_setwatermark函式時,高水位的引數設為0即可。
那麼什麼時候取消掛起,讓bufferevent可以繼續讀socket 資料呢?從高水位的意義來說,當然是當evbuffer裡面的資料量小於高水位時,就能再次讀取socket資料了。現在來看一下Libevent是怎麼恢復讀的。看一下設定水位的函式bufferevent_setwatermark吧,它進行了一些為高水位埋下了一個回撥函式。對,就是evbuffer的回撥函式。前一篇博文說到,當evbuffer裡面的資料新增或者刪除時,是會觸發一些回撥函式的。當用戶移除evbuffer的一些資料量時,Libevent就會檢查這個evbuffer的資料量是否小於高水位,如果小於的話,那麼就恢復 讀事件。
不說這麼多了,上程式碼。
//bufferevent.c檔案
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
if (highmark) {//高水位
/* There is now a new high-water mark for read.
enable the callback if needed, and see if we should
suspend/bufferevent_wm_unsuspend. */
//還沒設定高水位的回撥函式
if (bufev_private->read_watermarks_cb == NULL) {
bufev_private->read_watermarks_cb =
evbuffer_add_cb(bufev->input,
bufferevent_inbuf_wm_cb,
bufev);//添加回調函式
}
evbuffer_cb_set_flags(bufev->input,
bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
//設定(修改)高水位時,evbuffer的資料量已經超過了水位值
//可能是把之前的高水位調高或者調低
//掛起操作和取消掛起操作都是冪等的(即多次掛起的作用等同於掛起一次)
if (evbuffer_get_length(bufev->input) > highmark)
bufferevent_wm_suspend_read(bufev);
else if (evbuffer_get_length(bufev->input) < highmark)//調低了
bufferevent_wm_unsuspend_read(bufev);
} else {
//高水位值等於0,那麼就要取消掛起 讀事件
//取消掛起操作是冪等的
/* There is now no high-water mark for read. */
if (bufev_private->read_watermarks_cb)
evbuffer_cb_clear_flags(bufev->input,
bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED);
bufferevent_wm_unsuspend_read(bufev);
}
}
BEV_UNLOCK(bufev);
}
這個函式,不僅僅為高水位設定回撥函式,還會檢查當前evbuffer的資料量是否超過了高水位。因為這個設定水位函式可能是在bufferevent工作一段時間後才新增的,所以evbuffer是有可能已經有資料的了,因此需要檢查。如果超過了水位值,那麼就需要掛起讀。當然也存在另外一種可能:使用者之前設定過了一個比較大的高水位,掛起了讀。現在發現錯了,就把高水位調低一點,此時就需要恢復讀。
現在假設使用者移除了一些evbuffer的資料,進而觸發了evbuffer的回撥函式,當然也就呼叫了函式bufferevent_inbuf_wm_cb。下面看一下這個函式是怎麼恢復讀的。
//bufferevent.c檔案
static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
size_t size;
size = evbuffer_get_length(buf);
if (size >= bufev->wm_read.high)
bufferevent_wm_suspend_read(bufev);
else
bufferevent_wm_unsuspend_read(bufev);
}
//bufferevent-internal.h檔案
#define bufferevent_wm_unsuspend_read(b) \
bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)
//bufferevent.c檔案
void
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
bufev_private->read_suspended &= ~what;
if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
bufev->be_ops->enable(bufev, EV_READ);//重新把event插入到event_base中
BEV_UNLOCK(bufev);
}
因為使用者可以手動為這個evbuffer新增資料,此時也會呼叫bufferevent_inbuf_wm_cb函式。此時就要檢查evbuffer的資料量是否已經超過高水位了,而不能僅僅檢查是否低於高水位。
高水位導致讀的掛起和之後讀的恢復,一切工作都是由Libevent內部完成的,使用者不用做任何工作。
從socket中讀取資料:
從前面的一系列博文可以知道,如果一個socket可讀了,那麼監聽可讀事件的event的回撥函式就會被呼叫。這個回撥函式是在bufferevent_socket_new函式中被Libevent內部設定的,設定為bufferevent_readcb函式,使用者並不知情。
當socket有資料可讀時,Libevent就會監聽到,然後呼叫bufferevent_readcb函式處理。該函式會呼叫evbuffer_read函式,把資料從socket fd中讀取到evbuffer中。然後再呼叫使用者在bufferevent_setcb函式中設定的讀事件回撥函式。所以,當用戶的讀事件回撥函式被呼叫時,資料已經在evbuffer中了,使用者拿來就用,無需呼叫read這類會阻塞的函式。
下面看一下bufferevent_readcb函式的具體實現。static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
struct evbuffer *input;
int res = 0;
short what = BEV_EVENT_READING;
ev_ssize_t howmuch = -1, readmax=-1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_READ, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
input = bufev->input;
//使用者設定了高水位
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - evbuffer_get_length(input);
/* we somehow lowered the watermark, stop reading */
if (howmuch <= 0) {
bufferevent_wm_suspend_read(bufev);
goto done;
}
}
//因為使用者可以限速,所以這麼要檢測最大的可讀大小。
//如果沒有限速的話,那麼將返回16384位元組,即16K
//預設情況下是沒有限速的。
readmax = _bufferevent_get_read_max(bufev_p);
if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
* uglifies this code. XXXX */
howmuch = readmax;
//一些原因導致讀 被掛起,比如加鎖了。
if (bufev_p->read_suspended)
goto done;
//解凍,使得可以在input的後面追加資料
evbuffer_unfreeze(input, 0);
res = evbuffer_read(input, fd, (int)howmuch); //從socket fd中讀取資料
evbuffer_freeze(input, 0);//凍結
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))//EINTER or EAGAIN
goto reschedule;
//不是 EINTER or EAGAIN 這兩個可以重試的錯誤,那麼就應該是其他致命的錯誤
//此時,應該報告給使用者
what |= BEV_EVENT_ERROR;/**< unrecoverable error encountered */
} else if (res == 0) {//斷開了連線
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
//速率相關的操作
_bufferevent_decrement_read_buckets(bufev_p, res);
//evbuffer的資料量大於低水位值。
if (evbuffer_get_length(input) >= bufev->wm_read.low)
_bufferevent_run_readcb(bufev);//呼叫使用者設定的回撥函式
goto done;
reschedule:
goto done;
error:
//把監聽可讀事件的event從event_base的事件佇列中刪除掉.event_del
bufferevent_disable(bufev, EV_READ);//會呼叫be_socket_disable函式
_bufferevent_run_eventcb(bufev, what);//會呼叫使用者設定的錯誤處理函式
done:
_bufferevent_decref_and_unlock(bufev);
}
細心的讀者可能會發現:對使用者的讀事件回撥函式的觸發是邊緣觸發的。這也就要求,在回撥函式中,使用者應該儘可能地把evbuffer的所有資料都讀出來。如果想等到下一次回撥時再讀,那麼需要等到下一次socketfd接收到資料才會觸發使用者的回撥函式。如果之後socket fd一直收不到任何資料,那麼即使evbuffer還有資料,使用者的回撥函式也不會被呼叫了。
處理寫事件:
對一個可讀事件進行監聽是比較容易的,但對於一個可寫事件進行監聽則比較困難。為什麼呢?因為可讀監聽是監聽fd的讀緩衝區是否有資料了,如果沒有資料那麼就一直等待。對於可寫,首先要明白“什麼是可寫”,可寫就是fd的寫緩衝區(這個緩衝區在核心)還沒滿,可以往裡面放資料。這就有一個問題,如果寫緩衝區沒有滿,那麼就一直是可寫狀態。如果一個event監聽了可寫事件,那麼這個event就會一直被觸發(死迴圈)。因為一般情況下,如果不是發大量的資料這個寫緩衝區是不會滿的。
也就是說,不能監聽可寫事件。但我們確實要往fd中寫資料,那怎麼辦?Libevent的做法是:當我們確實要寫入資料時,才監聽可寫事件。也就是說我們呼叫bufferevent_write寫入資料時,Libevent才會把監聽可寫事件的那個event註冊到event_base中。當Libevent把資料都寫入到fd的緩衝區後,Libevent又會把這個event從event_base中刪除。比較煩瑣。
bufferevent_writecb函式不僅僅要處理上面說到的那個問題,還要處理另外一個坑爹的問題。那就是:判斷socket fd是不是已經連線上伺服器了。這是因為這個socket fd是非阻塞的,所以它呼叫connect時,可能還沒連線上就返回了。對於非阻塞socket fd,一般是通過判斷這個socket是否可寫,從而得知這個socket是否已經連線上伺服器。如果可寫,那麼它就已經成功連線上伺服器了。這個問題,這裡先提一下,後面會詳細講。
同前面的監聽可讀一樣,Libevent是在bufferevent_socket_new函式設定可寫的回撥函式,為bufferevent_writecb。
//bufferevent_sock.c檔案
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
int res = 0;
short what = BEV_EVENT_WRITING;
int connected = 0;
ev_ssize_t atmost = -1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
...//判斷這個socket是否已經連線上伺服器了
//使用者可能設定了限速,如果沒有限速,那麼atmost將返回16384(16K)
atmost = _bufferevent_get_write_max(bufev_p);
//一些原因導致寫被掛起來了
if (bufev_p->write_suspended)
goto done;
//如果evbuffer有資料可以寫到sockfd中
if (evbuffer_get_length(bufev->output)) {
//解凍連結串列頭
evbuffer_unfreeze(bufev->output, 1);
//將output這個evbuffer的資料寫到socket fd 的緩衝區中
//會把已經寫到socket fd緩衝區的資料,從evbuffer中刪除
res = evbuffer_write_atmost(bufev->output, fd, atmost);
evbuffer_freeze(bufev->output, 1);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢復的錯誤。一般是EINTR或者EAGAIN
goto reschedule;
what |= BEV_EVENT_ERROR;
} else if (res == 0) {//該socket已經斷開連線了
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
}
//如果把寫緩衝區的資料都寫完成了。為了防止event_base不斷地觸發可寫
//事件,此時要把這個監聽可寫的event刪除。
//前面的atmost限制了一次最大的可寫資料。如果還沒寫所有的資料
//那麼就不能delete這個event,而是要繼續監聽可寫事情,知道把所有的
//資料都寫到socket fd中。
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
//如果evbuffer裡面的資料量已經寫得七七八八了,小於設定的低水位值,那麼
//就會呼叫使用者設定的寫事件回撥函式
if ((res || !connected) &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
_bufferevent_run_writecb(bufev);
}
goto done;
reschedule:
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
goto done;
error:
bufferevent_disable(bufev, EV_WRITE);//有錯誤。把這個寫event刪除
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decref_and_unlock(bufev);
}
上面程式碼的邏輯比較清晰,呼叫evbuffer_write_atmost函式把資料從evbuffer中寫到evbuffer緩衝區中,此時要注意函式的返回值,因為可能寫的時候發生錯誤。如果發生了錯誤,就要呼叫使用者設定的event回撥函式(網上也有人稱其為錯誤處理函式)。
之後,還要判斷evbuffer的資料是否已經全部寫到socket 的緩衝區了。如果已經全部寫了,那麼就要把監聽寫事件的event從event_base的插入佇列中刪除。如果還沒寫完,那麼就不能刪除,因為還要繼續監聽可寫事件,下次接著寫。
現在來看一下,把監聽寫事件的event從event_base的插入佇列中刪除後,如果下次使用者有資料要寫的時候,怎麼把這個event新增到event_base的插入佇列。
使用者一般是通過bufferevent_write函式把資料寫入到evbuffer(寫入evbuffer後,接著就會被寫入socket,所以呼叫bufferevent_write就相當於把資料寫入到socket。)。而這個bufferevent_write函式是直接呼叫evbuffer_add函式的。函式evbuffer_add沒有呼叫什麼可疑的函式,能夠把監聽可寫的event新增到event_base中。唯一的可能就是那個回撥函式。對就是evbuffer的回撥函式。關於evbuffer的回撥函式,可以參考這裡。
//bufferevent.c檔案
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
if (evbuffer_add(bufev->output, data, size) == -1)
return (-1);
return 0;
}
//buffer.c檔案
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
...
out:
evbuffer_invoke_callbacks(buf);//呼叫回撥函式
result = 0;
done:
return result;
}
還記得本文前面的bufferevent_socket_new函式嗎?該函式裡面會有
evbuffer_add_cb(bufev->output,bufferevent_socket_outbuf_cb, bufev);
當bufferevent的寫緩衝區output的資料發生變化時,函式bufferevent_socket_outbuf_cb就會被呼叫。現在馬上飛到這個函式。
//bufferevent_sock.c檔案
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added && //evbuffer添加了資料
(bufev->enabled & EV_WRITE) && //預設情況下是enable EV_WRITE的
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&//這個event已經被踢出event_base了
!bufev_p->write_suspended) {//這個bufferevent的寫並沒有被掛起
//把這個event新增到event_base中
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
/* Should we log this? */
}
}
}
這個函式首先進行一些判斷,滿足條件後就會把這個監聽寫事件的event新增到event_base中。其中event_pending函式就是判斷這個bufev->ev_write是否已經被event_base刪除了。關於event_pending,可以參考這裡。
對於bufferevent_write,初次使用該函式的讀者可能會有疑問:呼叫該函式後,引數data指向的記憶體空間能不能馬上釋放,還是要等到Libevent把data指向的資料都寫到socket 快取區才能刪除?其實,從前一篇博文可以看到,evbuffer_add是直接複製一份使用者要傳送的資料到evbuffer快取區的。所以,呼叫完bufferevent_write,就可以馬上釋放參數data指向的記憶體空間。
網上的關於Libevent的一些使用例子,包括我寫的《 Libevent使用例子,從簡單到複雜》,都是在主執行緒中呼叫bufferevent_write函式寫入資料的。從上面的分析可以得知,是可以馬上把監聽可寫事件的event新增到event_base中。如果是在次執行緒呼叫該函式寫入資料呢?此時,主執行緒可能還睡眠在poll、epoll這類的多路IO複用函式上。這種情況下能不能及時喚醒主執行緒呢?其實是可以的,只要你的Libevent在一開始使用了執行緒功能。具體的分析過程可以參考《evthread_notify_base通知主執行緒》。上面程式碼中的be_socket_add會呼叫event_add,而在次執行緒呼叫event_add就會呼叫evthread_notify_base通知主執行緒。
bufferevent_socket_connect:
使用者可以在呼叫bufferevent_socket_new函式時,傳一個-1作為socket的檔案描述符,然後呼叫bufferevent_socket_connect函式連線伺服器,無需自己寫程式碼呼叫connect函式連線伺服器。
bufferevent_socket_connect函式會呼叫socket函式申請一個套接字fd,然後把這個fd設定成非阻塞的(這就導致了一些坑爹的事情)。接著就connect伺服器,因為該socket fd是非阻塞的,所以不會等待,而是馬上返回,連線這工作交給核心來完成。所以,返回後這個socket還沒有真正連線上伺服器。那麼什麼時候連線上呢?核心又是怎麼通知通知使用者呢?
一般來說,當可以往socket fd寫東西了,那就說明已經連線上了。也就是說這個socket fd變成可寫狀態,就連線上了。
所以,對於“非阻塞connect”比較流行的做法是:用select或者poll這類多路IO複用函式監聽該socket的可寫事件。當這個socket觸發了可寫事件,然後再對這個socket呼叫getsockopt函式,做進一步的判斷。
Libevent也是這樣實現的,下面來看一下bufferevent_socket_connect函式。//bufferevent_sock.c檔案
int
bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *sa, int socklen)
{
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
evutil_socket_t fd;
int r = 0;
int result=-1;
int ownfd = 0;
_bufferevent_incref_and_lock(bev);
if (!bufev_p)
goto done;
fd = bufferevent_getfd(bev);
if (fd < 0) {//該bufferevent還沒有設定fd
if (!sa)
goto done;
fd = socket(sa->sa_family, SOCK_STREAM, 0);
if (fd < 0)
goto done;
if (evutil_make_socket_nonblocking(fd)<0)//設定為非阻塞
goto done;
ownfd = 1;
}
if (sa) {
r = evutil_socket_connect(&fd, sa, socklen);//非阻塞connect
if (r < 0)
goto freesock;
}
...
//為bufferevent裡面的兩個event設定監聽的fd
//後面會呼叫bufferevent_enable
bufferevent_setfd(bev, fd);
if (r == 0) {//暫時還沒連線上,因為fd是非阻塞的
//此時需要監聽可寫事件,當可寫了,並且沒有錯誤的話,就成功連線上了
if (! be_socket_enable(bev, EV_WRITE)) {
bufev_p->connecting = 1;//標誌這個sockfd正在連線
result = 0;
goto done;
}
} else if (r == 1) {//已經連線上了
/* The connect succeeded already. How very BSD of it. */
result = 0;
bufev_p->connecting = 1;
event_active(&bev->ev_write, EV_WRITE, 1);//手動啟用這個event
} else {// connection refused
/* The connect failed already. How very BSD of it. */
bufev_p->connection_refused = 1;
bufev_p->connecting = 1;
result = 0;
event_active(&bev->ev_write, EV_WRITE, 1);//手動啟用這個event
}
goto done;
freesock:
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);//出現錯誤
if (ownfd)
evutil_closesocket(fd);
done:
_bufferevent_decref_and_unlock(bev);
return result;
}
這個函式比較多錯誤處理的程式碼,大致看一下就行了。有幾個地方要注意,即使connect的時候被拒絕,或者已經連線上了,都會手動啟用這個event。一個event即使沒有加入event_base,也是可以手動啟用的。具體原理參考這裡。
無論是手動啟用event,或者監聽到這個event可寫了,都是會呼叫bufferevent_writecb函式。現在再次看一下該函式,只看connect部分。
//bufferevent_sock.c檔案
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
int connected = 0;
_bufferevent_incref_and_lock(bufev);
...
//正在連線。因為這個sockfd可能是非阻塞的,所以可能之前的connect還沒
//連線上。而判斷該sockfd是否成功連線上了的一個方法是判斷這個sockfd是否可寫
if (bufev_p->connecting) {
//c等於1,說明已經連線成功
//c等於0,說明還沒連線上
//c等於-1,說明發生錯誤
int c = evutil_socket_finished_connecting(fd);
if (bufev_p->connection_refused) {//在bufferevent_socket_connect中被設定
bufev_p->connection_refused = 0;
c = -1;
}
if (c == 0)//還沒連線上,繼續監聽可寫吧
goto done;
//錯誤,或者已經連線上了
bufev_p->connecting = 0;//修改標誌值
if (c < 0) {//錯誤
event_del(&bufev->ev_write);
event_del(&bufev->ev_read);
_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
goto done;
} else {//連線上了。
connected = 1;
...//win32
//居然會呼叫使用者設定的錯誤處理函式。太神奇了
_bufferevent_run_eventcb(bufev,
BEV_EVENT_CONNECTED);
if (!(bufev->enabled & EV_WRITE) || //預設都是enable EV_WRITE的
bufev_p->write_suspended) {
event_del(&bufev->ev_write);//不再需要監聽可寫。因為已經連線上了
goto done;
}
}
}
...
done:
_bufferevent_decref_and_unlock(bufev);
}
可以看到無論是connect被拒絕、發生錯誤或者連線上了,都在這裡做統一的處理。
如果已經連線上了,那麼會呼叫使用者設定event回撥函式(網上也稱之為錯誤處理函式),通知使用者已經連線上了。並且,還會把監聽可寫事件的event從event_base中刪除,其理由在前面已經說過了。
函式evutil_socket_finished_connecting會檢查這個socket,從而得知這個socket是處於什麼狀態。在bufferevent_socket_connect函式中,出現的一些錯誤,比如被拒絕,也是能通過這個函式檢查出來的。所以可以在這裡做統一的處理。該函式的內部是使用。貼一下這個函式的程式碼吧。
//evutil.c檔案
//Return 1 for connected, 0 for not yet, -1 for error.
int
evutil_socket_finished_connecting(evutil_socket_t fd)
{
int e;
ev_socklen_t elen = sizeof(e);
//用來檢測這個fd是否已經連線上了,這個fd是非阻塞的
//如果e的值被設為0,那麼就說明連線上了。
//否則e被設定為對應的錯誤值。
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&e, &elen) < 0)
return -1;
if (e) {
if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//還沒連線上
return 0;
EVUTIL_SET_SOCKET_ERROR(e);
return -1;
}
return 1;
}
好長啊!終於寫完了。
謝絕推酷、第七城市的轉載!!!