Libevent原始碼分析-----超時event的處理
如何成為超時event:
Libevent允許建立一個超時event,使用evtimer_new巨集。
//event.h檔案
#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))
從巨集的實現來看,它一樣是用到了一般的event_new,並且不使用任何的檔案描述符。從超時event巨集的實現來看,無論是evtimer建立的event還是一般event_new建立的event,都能使得Libevent進行超時監聽。其實,使得Libevent對一個event進行超時監聽的原因是:在呼叫event_add的時候,第二引數不能為NULL,要設定一個超時值。如果為NULL,那麼Libevent將不會為這個event監聽超時。下文統一稱設定了超時值的event為超時event。
超時event的原理:
Libevent對超時進行監聽的原理不同於之前講到的對訊號的監聽,Libevent對超時的監聽的原理是,多路IO複用函式都是有一個超時值。如果使用者需要Libevent同時監聽多個超時event,那麼Libevent就把超時值最小的那個作為多路IO複用函式的超時值。自然,當時間一到,就會從多路IO複用函式返回。此時對超時event進行處理即可。
Libevent執行使用者同時監聽多個超時event,那麼就必須要對這個超時值進行管理。Libevent提供了小根堆和通用超時(common timeout)這兩種管理方式。下文為了敘述方便,就假定使用的是小根堆。
工作流程:
下面來看一下超時event的工作流程。
設定超時值:
首先呼叫event_add時要設定一個超時值,這樣才能成為一個超時event。
//event.c檔案 //在event_add中,會把第三個引數設為0.使得使用的是相對時間 static inline int event_add_internal(struct event *ev, const struct timeval *tv, int tv_is_absolute) { struct event_base *base = ev->ev_base; int res = 0; int notify = 0; //tv不為NULL,就說明是一個超時event,在小根堆中為其留一個位置 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) { if (min_heap_reserve(&base->timeheap, 1 + min_heap_size(&base->timeheap)) == -1) return (-1); /* ENOMEM == errno */ } ...//將IO或者訊號event插入到對應的佇列中。 if (res != -1 && tv != NULL) { struct timeval now; //使用者把這個event設定成EV_PERSIST。即永久event. //如果沒有這樣設定的話,那麼只會超時一次。設定了,那麼就 //可以超時多次。那麼就要記錄使用者設定的超時值。 if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) ev->ev_io_timeout = *tv; //該event之前被加入到超時佇列。使用者可以對同一個event呼叫多次event_add //並且可以每次都用不同的超時值。 if (ev->ev_flags & EVLIST_TIMEOUT) { /* XXX I believe this is needless. */ //之前為該event設定的超時值是所有超時中最小的。 //從下面的刪除可知,會刪除這個最小的超時值。此時多路IO複用函式 //的超時值引數就已經改變了。 if (min_heap_elt_is_top(ev)) notify = 1; //要通知主執行緒。可能是次執行緒為這個event呼叫本函式 //從超時佇列中刪除這個event。因為下次會再次加入。 //多次對同一個超時event呼叫event_add,那麼只能保留最後的那個。 event_queue_remove(base, ev, EVLIST_TIMEOUT); } //因為可以在次執行緒呼叫event_add。而主執行緒剛好在執行event_base_dispatch if ((ev->ev_flags & EVLIST_ACTIVE) && (ev->ev_res & EV_TIMEOUT)) {//該event被啟用的原因是超時 ... event_queue_remove(base, ev, EVLIST_ACTIVE); } //獲取現在的時間 gettime(base, &now); //雖然使用者在event_add時只需用一個相對時間,但實際上在Libevent內部 //還是要把這個時間轉換成絕對時間。從儲存的角度來說,存絕對時間只需 //一個變數。而相對時間則需兩個,一個存相對值,另一個存參照物。 if (tv_is_absolute) { //該引數指明時間是否為一個絕對時間 ev->ev_timeout = *tv; } else { //參照時間 + 相對時間 ev_timeout存的是絕對時間 evutil_timeradd(&now, tv, &ev->ev_timeout); } //將該超時event插入到超時佇列中 event_queue_insert(base, ev, EVLIST_TIMEOUT); //本次插入的超時值,是所有超時中最小的。那麼此時就需要通知主執行緒。 if (min_heap_elt_is_top(ev)) notify = 1; } //如果程式碼邏輯中是需要通知的。並且本執行緒不是主執行緒。那麼就通知主執行緒 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); return (res); }
對於同一個event,如果是IO event或者訊號event,那麼將無法多次新增。但如果是一個超時event,那麼是可以多次新增的。並且對應超時值會使用最後新增時指明的那個,之前的統統不要,即替換掉之前的超時值。
程式碼中出現了多次使用了notify變數。這主要是用在:次執行緒在執行這個函式,而主執行緒在執行event_base_dispatch。前面說到Libevent能對超時event進行監聽的原理是:多路IO複用函式有一個超時引數。在次執行緒新增的event的超時值更小,又或者替換了之前最小的超時值。在這種情況下,都是要通知主執行緒,告訴主執行緒,最小超時值已經變了。關於通知主執行緒evthread_notify_base,可以參考博文《evthread_notify_base通知主執行緒》。
程式碼中的第三個判斷體中用到了ev->ev_io_timeout。但event結構體中並沒有該變數。其實,ev_io_timeout是一個巨集定義。
//event-internal.h檔案
#define ev_io_timeout _ev.ev_io.ev_timeout
要注意的一點是,在呼叫event_add時設定的超時值是一個時間段(可以認為隔多長時間就觸發一次),相對於現在,即呼叫event_add的時間,而不是呼叫event_base_dispatch的時間。
呼叫多路IO複用函式等待超時:
現在來看一下event_base_loop函式,看其是怎麼處理超時event的。
//event.c檔案
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
base->running_loop = 1;
done = 0;
while (!done) {
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
// 根據Timer事件計算evsel->dispatch的最大等待時間(超時值最小)
timeout_next(base, &tv_p);
} else { //不進行等待
//把等待時間置為0,即可不進行等待,馬上觸發事件
evutil_timerclear(&tv);
}
res = evsel->dispatch(base, tv_p);
//處理超時事件,將超時事件插入到啟用連結串列中
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
}
}
done:
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
//選出超時值最小的那個
static int
timeout_next(struct event_base *base, struct timeval **tv_p)
{
/* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
// 堆的首元素具有最小的超時值,這個是小根堆的性質。
ev = min_heap_top(&base->timeheap);
//堆中沒有元素
if (ev == NULL) {
*tv_p = NULL;
goto out;
}
//獲取當然時間
if (gettime(base, &now) == -1) {
res = -1;
goto out;
}
// 如果超時時間<=當前時間,不能等待,需要立即返回
// 因為ev_timeout這個時間是由event_add呼叫時的絕對時間 + 相對時間。所以ev_timeout是
// 絕對時間。可能在呼叫event_add之後,過了一段時間才呼叫event_base_diapatch,所以
// 現在可能都過了使用者設定的超時時間。
if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
evutil_timerclear(tv); //清零,這樣可以讓dispatcht不會等待,馬上返回
goto out;
}
// 計算等待的時間=當前時間-最小的超時時間
evutil_timersub(&ev->ev_timeout, &now, tv);
out:
return (res);
}
上面程式碼的流程是:計算出本次呼叫多路IO複用函式的等待時間,然後呼叫多路IO複用函式中等待超時。
啟用超了時的event:
上面程式碼中的timeout_process函式就是處理超了時的event。
//event.c檔案
//把超時了的event,放到啟用佇列中。並且,其啟用原因設定為EV_TIMEOUT
static void
timeout_process(struct event_base *base)
{
/* Caller must hold lock. */
struct timeval now;
struct event *ev;
if (min_heap_empty(&base->timeheap)) {
return;
}
gettime(base, &now);
//遍歷小根堆的元素。之所以不是隻取堆頂那一個元素,是因為當主執行緒呼叫多路IO複用函式
//進入等待時,次執行緒可能添加了多個超時值更小的event
while ((ev = min_heap_top(&base->timeheap))) {
//ev->ev_timeout存的是絕對時間
//超時時間比此刻時間大,說明該event還沒超時。那麼餘下的小根堆元素更不用檢查了。
if (evutil_timercmp(&ev->ev_timeout, &now, >))
break;
//下面說到的del是等同於呼叫event_del.把event從這個event_base中(所有的佇列都)
//刪除。event_base不再監聽之。
//這裡是timeout_process函式。所以對於有超時的event,才會被del掉。
//對於有EV_PERSIST選項的event,在處理啟用event的時候,會再次新增進event_base的。
//這樣做的一個好處就是,再次新增的時候,又可以重新計算該event的超時時間(絕對時間)。
event_del_internal(ev);
//把這個event加入到event_base的啟用佇列中。
//event_base的啟用佇列又有該event了。所以如果該event是EV_PERSIST的,是可以
//再次新增進該event_base的
event_active_nolock(ev, EV_TIMEOUT, 1);
}
}
當從多路IO複用函式返回時,就檢查時間小根堆,看有多少個event已經超時了。如果超時了,那就把這個event加入到event_base的啟用佇列中。並且把這個超時del(刪除)掉,這主要是用於非PERSIST 超時event的。刪除一個event的具體操作可以檢視這裡。
把一個event新增進啟用佇列後的工作流程可以參考《Libevent工作流程探究》一文。
處理永久超時event:
現在來看一下如果該超時event有EV_PERSIST選項,在後面是怎麼再次新增進event_base,因為前面的程式碼註釋中已經說了,在選出超時event時,會把超時的event從event_base中delete掉。
//event.c檔案
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd,
short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
...
if (events & EV_PERSIST) {
ev->ev_closure = EV_CLOSURE_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
}
return 0;
}
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq)
{
struct event *ev;
//遍歷同一優先順序的所有event
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
//下面這個if else 是用於IO event的。這裡貼出,是為了瞭解一些非超時event是
//怎麼處理永久事件(EV_PERSIST)的。
//如果是永久事件,那麼只需從active佇列中刪除。
if (ev->ev_events & EV_PERSIST)
event_queue_remove(base, ev, EVLIST_ACTIVE);
else //不是的話,那麼就要把這個event刪除掉。
event_del_internal(ev);
switch (ev->ev_closure) {
//這個case只對超時event的EV_PERSIST才有用。IO的沒有用
case EV_CLOSURE_PERSIST:
event_persist_closure(base, ev);
break;
default: //預設是EV_CLOSURE_NONE
case EV_CLOSURE_NONE:
//沒有設定EV_PERSIST的超時event,就只有一次的監聽機會
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
}
}
}
static inline void
event_persist_closure(struct event_base *base, struct event *ev)
{
//在event_add_internal函式中,如果是超時event並且有EV_PERSIST,那麼就會把ev_io_timeout設定成
//使用者設定的超時時間(相對時間)。否則為0。即不進入判斷體中。
//說明這個if只用於處理具有EV_PERSIST屬性的超時event
if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {
struct timeval run_at, relative_to, delay, now;
ev_uint32_t usec_mask = 0;
gettime(base, &now);
//delay是使用者設定的超時時間。event_add的第二個引數
delay = ev->ev_io_timeout;
//是因為超時才執行到這裡,event可以同時監聽多種事件。如果是由於可讀而執行
//到這裡,那麼就說明還沒超時。
if (ev->ev_res & EV_TIMEOUT) { //如果是因為超時而啟用,那麼下次超時就是本次超時的
relative_to = ev->ev_timeout; // 加上 delay 時間。
} else {
relative_to = now; //重新計算超時值
}
evutil_timeradd(&relative_to, &delay, &run_at);
//無論relative是哪個時間,run_at都不應該小於now。
//如果小於,則說明是使用者手動修改了系統時間,使得gettime()函式獲取了一個
//之前的時間。比如現在是9點,使用者手動調回到7點。
if (evutil_timercmp(&run_at, &now, <)) {
//那麼就以新的系統時間為準
evutil_timeradd(&now, &delay, &run_at);
}
//把這個event再次新增到event_base中。注意,此時第三個引數為1,說明是一個絕對時間
event_add_internal(ev, &run_at, 1);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);//執行回撥函式
}
這段程式碼的處理流程是:如果使用者指定了EV_PERSIST,那麼在event_assign中就記錄下來。在event_process_active_single_queue函式中會針對永久event進行呼叫event_persist_closure函式對之進行處理。在event_persist_closure函式中,如果是一般的永久event,那麼就直接呼叫該event的回撥函式。如果是超時永久event,那麼就需要再次計算新的超時時間,並將這個event再次插入到event_base中。
這段程式碼也指明瞭,如果一個event因可讀而被啟用,那麼其超時時間就要重新計算。而不是之前的那個了。也就是說,如果一個event設定了3秒的超時,但1秒後就可讀了,那麼下一個超時值,就要重新計算設定,而不是2秒後。
從前面的原始碼分析也可以得到:如果一個event監聽可讀的同時也設定了超時值,並且一直沒有資料可讀,最後超時了,那麼這個event將會被刪除掉,不會再等。