1. 程式人生 > >[libevent]事件主迴圈

[libevent]事件主迴圈

libevent事件處理的中心部分——事件主迴圈,根據系統提供的事件多路分發機制執行事件迴圈,對已註冊的就緒事件,呼叫註冊事件的回撥函式來處理事件。

事件處理主迴圈

libevent的事件主迴圈主要是通過event_base_loop ()函式完成的,其主要操作如下面的流程圖所示,event_base_loop所作的就是持續執行下面的迴圈。

 

 上圖的簡單描述就是:

  1. 校正系統當前時間。
  2. 將當前時間與存放時間的最小堆中的時間依次進行比較,將所有時間小於當前時間的定時器事件從堆中取出來加入到活動事件佇列中。
  3. 呼叫I/O封裝(比如:Epoll)的事件分發函式dispatch函式,以當前時間與時間堆中的最小值之間的差值(最小堆取最小值複雜度為
    O(1))作為Epoll/epoll_wait(Epoll.c/dispatch/407)timeout值,在其中將觸發的I/O和訊號事件加入到活動事件佇列中。
  4. 呼叫函式event_process_active(Event.c/1406)遍歷活動事件佇列,依次呼叫註冊的回撥函式處理相應事件。
int event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	void *evbase = base->evbase;
	struct timeval tv;
	struct timeval *tv_p;
	int res, done;
	// 清空時間快取
	base->tv_cache.tv_sec = 0;
	// evsignal_base是全域性變數,在處理signal時,用於指名signal所屬的event_base例項
	if (base->sig.ev_signal_added)
		evsignal_base = base;
	done = 0;
	while (!done) { // 事件主迴圈
		// 檢視是否需要跳出迴圈,程式可以呼叫event_loopexit_cb()設定event_gotterm標記
		// 呼叫event_base_loopbreak()設定event_break標記
		if (base->event_gotterm) {
			base->event_gotterm = 0;
			break;
		}
		if (base->event_break) {
			base->event_break = 0;
			break;
		}
		// 校正系統時間,如果系統使用的是非MONOTONIC時間,使用者可能會向後調整了系統時間
		// 在timeout_correct函式裡,比較last wait time和當前時間,如果當前時間< last wait time
		// 表明時間有問題,這是需要更新timer_heap中所有定時事件的超時時間。
		timeout_correct(base, &tv);
		// 根據timer heap中事件的最小超時時間,計算系統I/O demultiplexer的最大等待時間
		tv_p = &tv;
		if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
			timeout_next(base, &tv_p);
		} else {
			// 依然有未處理的就緒時間,就讓I/O demultiplexer立即返回,不必等待
				// 下面會提到,在libevent中,低優先順序的就緒事件可能不能立即被處理
				evutil_timerclear(&tv);
		}
		// 如果當前沒有註冊事件,就退出
		if (!event_haveevents(base)) {
			event_debug(("%s: no events registered.", __func__));
			return (1);
		}
		// 更新last wait time,並清空time cache
		gettime(base, &base->event_tv);
		base->tv_cache.tv_sec = 0;
		// 呼叫系統I/O demultiplexer等待就緒I/O events,可能是epoll_wait,或者select等;
		// 在evsel->dispatch()中,會把就緒signal event、I/O event插入到啟用連結串列中
		res = evsel->dispatch(base, evbase, tv_p);
		if (res == -1)
			return (-1);
		// 將time cache賦值為當前系統時間
		gettime(base, &base->tv_cache);
		// 檢查heap中的timer events,將就緒的timer event從heap上刪除,並插入到啟用連結串列中
		timeout_process(base);
		// 呼叫event_process_active()處理啟用連結串列中的就緒event,呼叫其回撥函式執行事件處理
		// 該函式會尋找最高優先順序(priority值越小優先順序越高)的啟用事件連結串列,
		// 然後處理連結串列中的所有就緒事件;
		// 因此低優先順序的就緒事件可能得不到及時處理;
		if (base->event_count_active) {
			event_process_active(base);
			if (!base->event_count_active && (flags & EVLOOP_ONCE))
				done = 1;
		} else if (flags & EVLOOP_NONBLOCK)
			done = 1;
	}
	// 迴圈結束,清空時間快取
	base->tv_cache.tv_sec = 0;
	event_debug(("%s: asked to terminate loop.", __func__));
	return (0);
}

I/OTimer事件的統一

libeventTimerSignal事件都統一到了系統的I/O demultiplex機制中了,相信讀者從上面的流程和程式碼中也能窺出一斑了,下面就再囉嗦一次了。首先將Timer事件融合到系統I/O多路複用機制中,還是相當清晰的,因為系統的I/O機制像select()epoll_wait()都允許程式制定一個最大等待時間(也稱為最大超時時間)timeout,即使沒有I/O事件發生,它們也保證能在timeout時間內返回那麼根據所有Timer事件的最小超時時間來設定系統I/Otimeout時間;當系統I/O返回時,再啟用所有就緒的Timer事件就可以了,這樣就能將

Timer事件完美的融合到系統的I/O機制中了。

這是ReactorProactor模式(主動器模式,比如Windows上的IOCP)中處理Timer事件的經典方法了ACE採用的也是這種方法.堆是一種經典的資料結構,向堆中插入、刪除元素時間複雜度都是O(lgN)N為堆中元素的個數,而獲取最小key值(小根堆)的複雜度為O(1);因此變成了管理Timer事件的絕佳人選(當然是非唯一的),libevent就是採用的堆結構。

I/OSignal事件的統一

Signal是非同步事件的經典事例,將Signal事件統一到系統的I/O多路複用中就不像Timer事件那麼自然了,Signal事件的出現對於程序來講是完全隨機的,程序不能只是測試一個變數來判別是否發生了一個訊號,而是必須告訴核心“在此訊號發生時,請執行如下的操作”。如果當Signal發生時,並不立即呼叫eventcallback函式處理訊號,而是設法通知系統的I/O機制,讓其返回,然後再統一和I/O事件以及Timer一起處理,不就可以了嘛。是的,這也是libevent中使用的方法。 

//libevent中Signal事件的管理是通過結構體evsignal_info完成的,結構體位於evsignal.h檔案中,定義如下:
struct evsignal_info {  
	struct event ev_signal;  //為socket pair的讀socket向event_base註冊讀事件時使用的event結構體
	int ev_signal_pair[2];  //socket pair對
	int ev_signal_added;  //記錄ev_signal事件是否已經註冊了
	volatile sig_atomic_t evsignal_caught;  //是否有訊號發生的標記;是volatile型別,因為它會在另外的執行緒中被修改;
	struct event_list evsigevents[NSIG];  //陣列,evsigevents[signo]表示註冊到訊號signo的事件連結串列;
	sig_atomic_t evsigcaught[NSIG];  //具體記錄每個訊號觸發的次數,evsigcaught[signo]是記錄訊號signo被觸發的次數;
#ifdef HAVE_SIGACTION  
	struct sigaction **sh_old;  
#else  
	ev_sighandler_t **sh_old;  //記錄了原來的signal處理函式指標,當訊號signo註冊的event被清空時,需要重新設定其處理函式;
#endif  
	int sh_old_max;  
};  
//的初始化包括,建立socket pair,設定ev_signal事件(但並沒有註冊,而是等到有訊號註冊時才檢查並註冊),並將所有標記置零,初始化訊號的註冊事件連結串列指標等。

執行迴圈

一旦有了一個已經註冊了某些事件的event_base,就需要讓libevent等待事件並且通知事件的發生。

#define EVLOOP_ONCE             0x01
#define EVLOOP_NONBLOCK         0x02
#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
int event_base_loop(struct event_base *base, int flags);  

預設情況下,event_base_loop()函式執行直到event_base中沒有已經註冊的事件為止。執行迴圈的時候,函式重複地檢查是否有任何已經註冊的事件被觸發(比如說,讀事件的檔案描述符已經就緒,可以讀取了;或者超時事件的超時時間即將到達)。如果有事件被觸發,函式標記被觸發的事件為啟用的,並且執行這些事件。 

flags引數中設定一個或者多個標誌就可以改變event_base_loop()的行為。

  • 如果設定了EVLOOP_ONCE迴圈將等待某些事件成為啟用的,執行啟用的事件直到沒有更多的事件可以執行,然會返回。
  • 如果設定了EVLOOP_NONBLOCK迴圈不會等待事件被觸發:迴圈將僅僅檢測是否有事件已經就緒,可以立即觸發,如果有,則執行事件的回撥。
  • 完成工作後,如果正常退出,event_base_loop()返回0;如果因為後端中的某些未處理錯誤而退出,則返回-1

為方便起見,也可以呼叫

int event_base_dispatch(struct event_base *base);  

event_base_dispatch()等同於沒有設定標誌的event_base_loop()。所以,event_base_dispatch()將一直執行,直到沒有已經註冊的事件了,或者呼叫了event_base_loopbreak()或者event_base_loopexit()為止。

停止迴圈

如果想在移除所有已註冊的事件之前停止活動的事件迴圈,可以呼叫兩個稍有不同的函式。

int event_base_loopexit(struct event_base *base,const struct timeval *tv);
int event_base_loopbreak(struct event_base *base);  

event_base_loopexit()讓event_base在給定時間之後停止迴圈。如果tv引數為NULLevent_base會立即停止迴圈,沒有延時。如果event_base當前正在執行任何啟用事件的回撥,則回撥會繼續執行,直到執行完所有啟用事件的回撥之才退出event_base_loopbreak()讓event_base立即退出迴圈。它與event_base_loopexitbase,NULL)的不同在於,如果event_base當前正在執行啟用事件的回撥,它將在執行完當前正在處理的事件後立即退出。

有時候需要知道對event_base_dispatch()或者event_base_loop()的呼叫是正常退出的,還是因為呼叫event_base_loopexit()或者event_base_break()而退出的。可以呼叫下述函式來確定是否呼叫了loopexit或者break函式。

int event_base_got_exit(struct event_base *base);
int event_base_got_break(struct event_base *base);  

這兩個函式分別會在迴圈是因為呼叫event_base_loopexit()或者event_base_break()而退出的時候返回true,否則返回false。下次啟動事件迴圈的時候,這些值會被重設。