1. 程式人生 > >Redis2.2.2原始碼學習——aeEvent事件輪詢

Redis2.2.2原始碼學習——aeEvent事件輪詢

背景       

        Redis的事件主要分為檔案事件和定時器事件,作者對這兩種事件處理的高階之處在於預先計算最近一個要超時的定時器距離當前的事件間隔,在這個時間間隔內呼叫poll函式處理檔案事件,之後再處理定時器事件。

Redis在處理請求時完全是用單執行緒非同步模式,通過epoll監聽所有連線的讀寫事件,並通過相應的響應函式處理。Redis使用這樣的設計來滿足需求,很大程度上因為它服務的是高併發的短連線,且請求處理事件非常短暫。這個模型下,一旦有請求阻塞了服務,整個Redis的服務將受影響。Redis採用單執行緒模型可以避免系統上下文切換的開銷。” (引用:Redis原始碼分析.pdf  鄒雨晗)

事件輪詢流程


原始碼分析

Version Redis2.2.2


/* File event structure */
typedef struct aeFileEvent {
    int mask;				///* 檔案事件型別 讀/寫 one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;		
    aeFileProc *wfileProc;	
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id;	/* time event identifier. *///由aeEventLoop.timeEventNextId進行管理
    long when_sec;	/* seconds */
    long when_ms;	/* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;		//已出現的事件的檔案號對應的事件描述在aeEventLoop.events[]中的下標
    int mask;	//檔案事件型別 AE_WRITABLE||AE_READABLE
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;						//監聽的最大檔案號
    long long timeEventNextId;		//定時器事件的ID編號管理(分配ID號所用)
    aeFileEvent events[AE_SETSIZE]; //註冊的檔案事件,這些是需要程序關注的檔案
    aeFiredEvent fired[AE_SETSIZE]; //poll結果,待處理的檔案事件的檔案號和事件型別
    aeTimeEvent *timeEventHead;		//定時器時間連結串列
    int stop;						//時間輪詢是否結束?
    void *apidata; //檔案事件的輪詢資料和結果資料:poll; 三種輪詢方式:epoll(linux),select(windows),kqueue
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

【三種檔案事件輪詢對已的apidata】
//select()
typedef struct aeApiState {
    fd_set rfds, wfds;	//檔案輪詢的fdset
    /* We need to have a copy of the fd sets as it's not safe to reuse
     * FD sets after select(). */
    fd_set _rfds, _wfds;//作為select中的引數(臨時資料)
} aeApiState;
//kqueue()
typedef struct aeApiState {
    int kqfd;
    struct kevent events[AE_SETSIZE];
} aeApiState;
//epoll()
typedef struct aeApiState {
    int epfd;
    struct epoll_event events[AE_SETSIZE];
} aeApiState;

------------------------------------------------------------

【事件處理基本過程】

main
	1>initserver():
		訊號註冊
		伺服器引數初始化:server(全域性變數)
			。。。
			aeCreateEventLoop()
				構建aeEventLoop物件eventLoop
				初始化eventloop引數
					aeApiCreate(eventLoop)
						初始化eventLoop->apidata : 三種檔案事件輪詢對應不同的資料結構
			aeCreateTimeEvent()
				構建aeTimeEvent物件te
				aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
					設定te的超時時間 = currenttime + milliseonds(延時 1ms 是不是有點太小了?!)
				將te插入到eventLoop->timeEventHead連結串列中
			。。。
	。。。
	2>aeSetBeforeSleepProc(server.el,beforeSleep);

    3>aeMain(server.el) //以死迴圈的方式進入輪詢,知道stop被置為非0
		while (!eventLoop->stop) {
            eventLoop->beforesleep(eventLoop);			//前期處理,回撥函式
			aeProcessEvents(eventLoop, AE_ALL_EVENTS);	//事件處理(見下面)
		}
	
    4>aeDeleteEventLoop(server.el);

	return
endOfMain

---------------------------------------------------------------------

【事件處理過程:aeProcessEvents】

Redis的事件主要分為檔案事件和定時器事件,作者對這兩種事件處理的高階之處在於預先計算最近一個要超時的定時器距離當前的事件間隔,
在這個時間間隔內呼叫poll函式處理檔案事件,之後再處理定時器事件。

aeProcessEvents
	1>shortest = aeSearchNearestTimer(eventLoop)
		遍歷定時器連結串列eventLoop->timeEventHead找到最近一個要超時的定時器
		返回這個定時器時間
	2>計算當前時間與shortest時間的差值:tvp,如果tvp小於0,那麼設定為0
	//計算出tvp,就相當於知道下一個定時器的超時時間間隔,在這個時間間隔內,我們可以用來檢測其他的事件,比如檔案事件
	3>numevents = aeApiPoll(eventLoop, tvp);	//檔案事件的輪詢,時間間隔為最近一個定時器的超時時間差tvp
		aeApiPoll有三種:select/epoll/kqueue
		以select為例:
			1.以eventLoop->apidata(記錄了要poll的檔案符)作為引數,呼叫select函式:
				retval = select(eventLoop->maxfd+1,&state->_rfds,&state->_wfds,NULL,tvp);//注意其中的tvp是select的最長阻塞時間。。。
			2.根據select的呼叫結果,甄選檔案事件:
				//原則為:檔案事件註冊了讀/寫,並且相應檔案的輪詢結果fdset的讀/寫被置位,那麼將該檔案號新增到eventLoop->fired陣列中,並設定相應的mask掩碼
				for (j = 0; j <= eventLoop->maxfd; j++) {
					int mask = 0;
					aeFileEvent *fe = &eventLoop->events[j];

					if (fe->mask == AE_NONE) continue;
					if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
						mask |= AE_READABLE;
					if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
						mask |= AE_WRITABLE;
					eventLoop->fired[numevents].fd = j;
					eventLoop->fired[numevents].mask = mask;
					numevents++;
				}
	//通過aeApiPoll(eventLoop, tvp),待處理的檔案事件的檔案號都儲存在了eventLoop->fired[numevents].fd中
	4>依次取出eventLoop->fired[numevents]中的fd和mask進行相應的處理:
		for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
			int rfired = 0;

			if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
				//限制同時對同一個檔案的讀寫:未度過或讀寫的是不同的檔案
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
		}
	5>最後,在執行計時器事件
		processTimeEvents(eventLoop)
			遍歷計時器連結串列,計算計時器事件和當前時間的差值,如果已超時(<=0)那麼:
			1.呼叫該計時器的hander
				retval = te->timeProc(eventLoop, id, te->clientData);
			2.判定retval // 有的定時器是重複使用的,有的是一次性的,關鍵是hander的返回值:
				if (retval != AE_NOMORE) {
					aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);//更新定時器時間(重用)
				} else {
					aeDeleteTimeEvent(eventLoop, id);	//刪除定時器
				}
	return
endOf_aeProcessEvents
		
    

				

後記

今天主要看了和事件輪詢相關的程式碼,感覺作者的想法挺好的,就是對於定時器事件來說,會有較大的事件誤差。p.s. 終於找到基於Linux的C原始碼,啃一啃可以加深對linux系統呼叫的認識,同時學習Redis的K-V儲存,兩全其美,繼續加油!