Redis原始碼解析:13Redis中的事件驅動機制
Redis中,處理網路IO時,採用的是事件驅動機制。但它沒有使用libevent或者libev這樣的庫,而是自己實現了一個非常簡單明瞭的事件驅動庫ae_event,主要程式碼僅僅400行左右。
沒有選擇libevent或libev的原因大概在於,這些庫為了迎合通用性造成程式碼龐大,而且其中的很多功能,比如監控子程序,複雜的定時器等,這些都不是Redis所需要的。
Redis中的事件驅動庫只關注網路IO,以及定時器。該事件庫處理下面兩類事件:
a:檔案事件(file event):用於處理Redis伺服器和客戶端之間的網路IO。
b:時間事件(time eveat):Redis伺服器中的一些操作(比如serverCron函式)需要在給定的時間點執行,而時間事件就是處理這類定時操作的。
事件驅動庫的程式碼主要是在src/ae.c中實現的。
一:檔案事件
Redis基於Reactor模式開發了自己的網路事件處理器,也就是檔案事件處理器。檔案事件處理器使用IO多路複用技術,同時監聽多個套接字,併為套接字關聯不同的事件處理函式。當套接字的可讀或者可寫事件觸發時,就會呼叫相應的事件處理函式。
Redis使用的IO多路複用技術主要有:select、epoll、evport和kqueue等。每個IO多路複用函式庫在Redis原始碼中都對應一個單獨的檔案,比如ae_select.c,ae_epoll.c, ae_kqueue.c等。
這些多路複用技術,根據不同的作業系統,Redis按照一定的優先順序,選擇其中的一種使用。在ae.c中,是這樣實現的:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
注意這裡是include的.c檔案,因此,使用哪種多路複用技術,是在編譯階段就決定了的。
檔案事件由結構體aeFileEvent表示,它的定義如下:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
其中mask表示描述符註冊的事件,可以是AE_READABLE,AE_WRITABLE或者是AE_READABLE|AE_WRITABLE。
rfileProc和wfileProc分別表示可讀和可寫事件的回撥函式。
clientData是使用者提供的資料,在呼叫回撥函式時被當做引數。注意,該資料是可讀和可寫事件共用的。
二:時間事件
Redis的時間事件主要有一次性事件和週期性事件兩種。一次性時間事件僅觸發一次,而週期性事件每隔一段時間就觸發一次。
時間事件由aeTimeEvent結構體表示,它的定義如下:
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
id用於標識時間事件,id號按照從小到大的順序遞增,新時間事件的id號比舊時間事件的id號要大;
when_sec和when_ms表示時間事件的下次觸發時間,實際上就是一個Unix時間戳,when_sec記錄它的秒數,when_ms記錄它的毫秒數。因此觸發時間是一個絕對值,而非相對值;
timeProc是時間事件處理器,也就是時間事件觸發時的回撥函式;
finalizerProc是刪除該時間事件時要呼叫的函式;
clientData是使用者提供的資料,在呼叫timeProc和finalizerProc時,作為引數;
所有的時間事件aeTimeEvent結構被組織成一個連結串列,next指標就執行連結串列中,當前aeTimeEvent結構的後繼結點。
aeTimeEvent結構連結串列是一個無序連結串列,也就是說它並不按照事件的觸發時間而排序。每當建立一個新的時間事件aeTimeEvent結構時,該結構就插入連結串列的頭部。因此,當監控時間事件時,需要遍歷整個連結串列,查詢所有已到達的時間事件,並呼叫相應的事件處理器。
在目前版本中,正常模式下的Redis伺服器只使用serverCron一個時間事件,而在benchmark模式下,伺服器也只使用兩個時間事件。因此,時間事件連結串列的這種設計雖然簡單粗暴,但是也能滿足效能需求。
三:事件迴圈結構
在事件驅動的實現中,需要有一個事件迴圈結構來監控排程所有的事件,比如Libevent庫中的event_base,libev中的ev_loop等。
在Redis中的事件驅動庫中,事件迴圈結構是由aeEventLoop結構體實現的,aeEventLoop結構是Redis中事件驅動機制的主要資料結構。它的定義如下:
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
events是aeFileEvent結構的陣列,每個aeFileEvent結構表示一個註冊的檔案事件。events陣列以描述符的值為下標。
fired是aeFiredEvent結構的陣列,aeFiredEvent結構表示一個觸發的檔案事件。結構中包含了描述符,以及其上已經觸發的事件。該陣列不是以描述符的值為下標,而是依次儲存所有觸發的檔案事件。當處理事件時,輪訓fired陣列中的每個元素,然後依次處理。
setsize表示eventLoop->events和eventLoop->fired陣列的大小。因此,setsize- 1就表示所能處理的最大的描述符的值。
lastTime:為了處理時間事件而記錄的Unix時間戳,主要為了在系統時間被調整時能夠儘快的處理時間事件;
timeEventHead:時間事件aeTimeEvent結構組成的連結串列的頭指標;
timeEventNextId:下個時間事件的ID,該ID依次遞增,因此當前時間事件的最大ID為timeEventNextId-1;
stop:是否停止事件監控;
maxfd:當前處理的最大的描述符的值,主要是在select中使用;
beforesleep:每次監控事件觸發之前,需要呼叫的函式;
apidata表示具體的底層多路複用所使用的資料結構,比如對於select來說,該結構中儲存了讀寫描述符陣列;對於epoll來說,該結構中儲存了epoll描述符,以及epoll_event結構陣列;
四:監控排程時間事件
監控排程時間事件是由函式processTimeEvents實現的,它的程式碼如下:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
首先判斷系統時間是否被調整了。將當前時間now,與上次記錄的時間戳eventLoop->lastTime相比較,如果now小於eventLoop->lastTime,說明系統時間被調整到過去了,比如由201603312030調整到了201603312000了,這種情況下,直接將所有事件的觸發時間的秒數清0,這意味著所有的時間事件都會立即觸發。之所以這麼做,是因為提前處理比延後處理的危險性要小;
然後更新eventLoop->lastTime為now;
接下來,先記錄當前的maxId。之所以這麼做,是因為有時間事件觸發後,要重新回到連結串列頭結點開始處理。而在時間事件的觸發回撥函式中,有可能註冊了新的時間事件,成為新的連結串列頭結點,這就可能導致會無限處理下去。為了防止這種情況發生,記錄當前的maxId,只處理當前的時間事件;
輪訓連結串列eventLoop->timeEventHead,針對其中的每一個事件節點te,如果te的id大於maxId,說明該事件,是在之前已經觸發的時間事件的回撥函式中註冊的,不處理這樣的事件,直接處理下一個;
然後得到當前時間,判斷當前時間是否已經超過了te的觸發時間,若是,說明該事件需要觸發,呼叫觸發回撥函式te->timeProc,該函式的返回值為retval;
如果retval是AE_NOMORE,說明觸發的時間事件是一次性事件,直接從連結串列中刪除;否則,說明該事件是週期性事件,將其觸發時間更改為當前時間加上retval;
事件觸發後,連結串列已經被修改了,要重新回到連結串列頭結點開始處理。因為Redis中只有一個時間事件,因此採用了這種簡單粗暴的演算法,更好的處理方式是處理完當前事件後,標記該節點需要刪除(比如在另一個連結串列中儲存該節點的指標),然後接著處理下一個節點,所有節點處理完之後,將標記為刪除的節點統一刪除即可。
最後返回觸發的事件總數。
五:監控排程所有事件
監控排程所有事件是由函式aeProcessEvents實現的,它的程式碼如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
根據flags處理不同的事件:
如果flags為0,則該函式直接返回;
如果flags中設定了AE_ALL_EVENTS,則處理所有的檔案事件和時間事件;
如果flags中設定了AE_FILE_EVENTS,則處理所有的檔案事件;
如果flags中設定了AE_TIME_EVENTS,則處理所有的時間事件;
如果flags中設定了AE_DONT_WAIT,則呼叫多路複用函式時,不會阻塞等
待事件的觸發,將所有已觸發的事件處理完後立即返回。
目前在Redis中,呼叫aeProcessEvents時設定的flags只有AE_ALL_EVENTS和
AE_FILE_EVENTS|AE_DONT_WAIT兩種。
函式中,首先如果flags中既沒有設定AE_TIME_EVENTS,也沒有設定AE_FILE_EVENTS,則該函式直接返回0.
接下來,如果已經註冊過檔案事件,或者需要處理時間事件且不是AE_DONT_WAIT,則需要呼叫底層多路複用函式aeApiPoll。因此需要計算呼叫aeApiPoll函式時,最長阻塞時間tvp,該值是由最早要觸發的時間事件(如果有的話)決定的。
如果需要處理時間事件且不是AE_DONT_WAIT,這種情況下,不管有沒有檔案事件,都要阻塞一段時間,阻塞的時間根據shortest得到,shortest是通過呼叫aeSearchNearestTimer得到的最早要觸發的時間事件。得到shortest後,計算得出其觸發時間距離當前時間的差值,該差值就是阻塞時間tvp;
否則,如果註冊過檔案事件,並且flags中設定了AE_DONT_WAIT,則將tvp中的值設定為0,表示完全不阻塞;
如果註冊過檔案事件,但是flags中沒有設定AE_DONT_WAIT,則將tvp置為NULL,表示一直阻塞,直到有檔案事件觸發;
得到最長阻塞時間tvp之後,以tvp為引數呼叫aeApiPoll等待檔案事件的觸發。該函式由不同的底層多路複用函式實現,最終都返回觸發的檔案事件總數numevents,並將觸發的事件和描述符,依次記錄到eventLoop->fired中;
接下來,依次輪訓eventLoop->fired中的前numevents個元素,呼叫相應的事件回撥函式。注意,如果一個套接字又可讀又可寫的話,那麼伺服器將先處理可讀事件,然後在處理可寫事件。
觸發的檔案事件是依次處理的,如果某個檔案事件的處理時間過長,就會影響到下一個事件的處理。在事件驅動的實現中,要由使用者保證事件回撥函式能夠快速返回,而不阻塞。
注意,有這樣一種情況,比如描述符3和4都有事件觸發了,在3的事件回撥函式中,呼叫aeDeleteFileEvent將4的註冊事件刪除了。這樣在處理描述符4時,就不應該再次呼叫4的回撥函數了。所以,每次呼叫事件回撥函式之前,都判斷該描述符上的註冊事件是否還有效。而且如果可讀和可寫事件的回撥函式相同的話,只能呼叫一次該函式。
處理完檔案事件之後(或者沒有檔案事件,而僅僅阻塞了tvp的時間),如果flags中設定了AE_TIME_EVENTS,則呼叫processTimeEvents處理時間事件,因已經阻塞了tvp的時間,因此此時肯定有觸發的時間事件。最後,返回所有觸發的事件總數。
因為時間事件在檔案事件之後處理,並且事件之間不會出現搶佔,所以時間事件的實際處理時間,通常會比時間事件設定的到達時間稍晚一些。
再次強調一點:對檔案事件和時間事件的處理都是同步、有序、原子地執行的,伺服器不會中途中斷事件處理,也不會對事件進行搶佔。因此,不管是檔案事件的回撥函式,還是時間事件的回撥函式,都需要儘可地減少程式的阻塞時間,從而降低造成事件飢餓的可能性。比如,在命令回覆回撥函式中,將一個命令回覆寫入到客戶端套接字時,如果寫人位元組數超過了一個預設常量的話,命令回覆函式就會主動用break跳出寫人迴圈,將餘下的資料留到下次再寫。另外,時間事件也會將非常耗時的持久化操作放到子執行緒或者子程序執行。
六:事件迴圈監控
事件迴圈監控是由函式aeMain實現的,它的程式碼如下:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
只要eventLoop->stop不為1,則持續呼叫aeProcessEvents監控排程所有事件的觸發。正常情況下,在Redis伺服器中,eventLoop->stop永遠不可能為1。
在Redis伺服器的主函式中,所有初始化工作完成之後,就會呼叫該函式,監控所有事件的觸發。
七:例子:ECHO伺服器
下面是使用Redis的事件驅動庫,實現的一個簡單echo伺服器:
#define SERVER_PORT 9998
typedef struct
{
char clientaddr[INET_ADDRSTRLEN];
int port;
char buf[1024];
}Userbuf;
void setunblock(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL)) == -1)
{
perror("fcntl(F_GETFL) error");
return;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl(F_SETFL) error");
return;
}
return;
}
void acceptfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
int acceptfd = -1;
struct sockaddr_in cliaddr;
socklen_t addrlen = sizeof(cliaddr);
acceptfd = accept(fd, (struct sockaddr *)&cliaddr, &addrlen);
if (acceptfd < 0)
{
perror("accept error\n");
return;
}
Userbuf *usrbuf = calloc(1, sizeof(Userbuf));
printf("calloc %p\n", usrbuf);
inet_ntop(AF_INET, &cliaddr.sin_addr, usrbuf->clientaddr, INET_ADDRSTRLEN),
usrbuf->port = ntohs(cliaddr.sin_port);
printf("\naccept from <%s:%d>\n", usrbuf->clientaddr, usrbuf->port);
setunblock(acceptfd);
if (aeCreateFileEvent(eventLoop, acceptfd, AE_READABLE, readfun, usrbuf) != AE_OK)
{
perror("aeCreateFileEvent error");
close(acceptfd);
printf("free %p\n", usrbuf);
free(usrbuf);
return;
}
return;
}
void readfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
char readbuf[1024] = {};
int len = -1;
Userbuf *usrbuf = (Userbuf *)clientData;
if ((len = read(fd, readbuf, 1024)) > 0)
{
printf("read from <%s:%d>: %s\n", usrbuf->clientaddr, usrbuf->port, readbuf);
memcpy(usrbuf->buf, readbuf, 1024);
if (aeCreateFileEvent(eventLoop, fd, AE_WRITABLE, writefun, clientData) != AE_OK)
{
printf("aeCreateFileEvent error\n");
goto END;
}
else
return;
}
else if (len == 0)
{
printf("close link from %s\n", usrbuf->buf);
goto END;
}
else
{
printf("read error from %s\n", usrbuf->buf);
goto END;
}
END:
close(fd);
aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);
printf("free %p\n", clientData);
free(clientData);
return;
}
void writefun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
int len = 0;
char *buf = ((Userbuf *)clientData)->buf;
len = strlen(buf);
printf("write to client: %s\n", buf);
if(write(fd, buf, len) != len)
{
perror("write error");
close(fd);
aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);
printf("free %p\n", clientData);
free(clientData);
}
aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);
}
int main()
{
int listenfd;
aeEventLoop *eventloop = NULL;
struct sockaddr_in seraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
perror("socket error");
return -1;
}
seraddr.sin_family = AF_INET;
seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
seraddr.sin_port = htons(SERVER_PORT);
if (bind(listenfd, (struct sockaddr *)&seraddr, sizeof(seraddr)) < 0)
{
perror("bind error");
close(listenfd);
return -1;
}
if (listen(listenfd, 5) < 0)
{
perror("listen error");
close(listenfd);
return -1;
}
eventloop = aeCreateEventLoop(1024);
if (eventloop == NULL)
{
printf("aeCreateEventLoop error\n");
close(listenfd);
return -1;
}
if (aeCreateFileEvent(eventloop, listenfd, AE_READABLE, acceptfun, NULL) != AE_OK)
{
perror("aeCreateFileEvent error");
close(listenfd);
aeDeleteEventLoop(eventloop);
return -1;
}
aeMain(eventloop);
return 0;
}
這裡要注意的是,對於同一個acceptfd,呼叫aeCreateFileEvent函式,分別註冊可讀事件和可寫事件時,其clientData是共享的。如果在註冊可寫事件時,修改了clientData,則可讀事件的clientData也相應改變,這是因為一個描述符只有一個aeFileEvent結構。
客戶端的程式碼根據Webbench改寫,具體程式碼見:
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/tests/hhunittest/test_ae_client.c
其他有關事件驅動的程式碼實現,可以參考:
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae.c
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_epoll.c
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_select.c