redis原始碼分析—AE事件處理機制
阿新 • • 發佈:2018-12-19
redis中有兩類事件,一類是IO事件統一封裝成aeFileEvent,底層呼叫系統支援的多路複用層(evport、epoll、kqueue、select),一類是定時器事件,以aeTimeEvent描述,巧妙借用系統函式epoll_wait等阻塞函式設定阻塞時間以達到觸發條件
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ // 根據巨集定義包含系統支援的多路複用模型,從上到下按效能高低排序 #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
AE迴圈的主要函式:
建立AE迴圈
主要是結構體成員申請記憶體和設定初始值
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
主迴圈
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //迴圈監聽事件 // 阻塞之前的處理 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 事件處理,第二個引數決定處理哪類事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
事件處理函式
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
// 如果不處理定時器事件,也不處理檔案事件,就直接返回
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// 如果有監控檔案事件,或者要處理定時器事件並且沒有設定不阻塞標誌
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 如果處理定時器事件且沒有設定不阻塞標誌,取最快到達的定時器時間
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
// 計算剩餘時間
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
// 如果設定了不阻塞標誌,阻塞時間為0,表示不阻塞
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else { //否則永久阻塞
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
// 等待事件觸發或者超時
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
// 阻塞後處理函式
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 迴圈處理觸發的檔案事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
// 如果設定了AE_BARRIER標誌,我們優先處理寫事件
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
// 沒有設定AE_BARRIER標誌,優先處理讀事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
// 如果需要反轉讀寫處理順序,處理完寫事件後,可以處理讀事件
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
// 處理定時器事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
處理定時器事件函式
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
// 如果上次處理定時器事件的時間比當前時間還要大,說明時間有偏差,強制將所有的定時器儘快處理掉(將所有定時器的間隔秒數設定為0)
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最近處理定時器事件的時間
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1; // 獲取最大的定時器ID
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
// 刪除已經標記失效的定時器
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
// 保證不處理定時器處理過程中建立的定時器事件,暫時沒用,因為新增定時器事件總是在頭部新增
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 回撥定時器事件處理函式
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) { // 如果定時器還需要重複使用
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); // 設定新的觸發時間
} else { // 否則,標記定時器失效
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
建立File事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 檔案描述符不能大於setsize,否則eventLoop->events會溢位
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 這裡以描述作為下標記錄事件,可以避免迴圈查詢
aeFileEvent *fe = &eventLoop->events[fd];
// 呼叫底層封裝的多路複用介面新增事件監控
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 更新描述符監控的事件資訊和處理函式以及客戶端資訊
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
// 記錄當前最大的檔案描述符
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
還有一些其他的介面函式比較簡單,就沒必要一一分析了
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) // 刪除FileEvent
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) // 新增定時器事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) //刪除定時器事件
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) //獲取最接近的定時器事件