Redis網路庫原始碼分析(3)之ae.c
一、aeCreateEventLoop & aeCreateFileEvent
上一篇文章中,我們已經將伺服器啟動,只是其中有些細節我們跳過了,比如aeCreateEventLoop
函式到底做了什麼? 接下來我們要分析ae.c
檔案,它是整個Redis
網路事件框架,其中定義了各個管理事件的函式,比如aeCreateFileEvent
,aeDeleteFileEvent
分別是註冊新的事件和刪除事件。
其實aeCreateEventLoop
的作用主要是給server->loop
申請空間。
//ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) { //給eventLoop申請空間
goto err;
}
eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize); //給events連結串列申請空間
eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize); //給fired連結串列申請空間
if (eventLoop->events == NULL || eventLoop->fired == NULL ) {
goto err;
}
eventLoop->setsize = setsize; //設定大小
eventLoop->lastTime = time(NULL); //設定lastTime=now
eventLoop->timeEventHead = NULL; //定時事件連結串列置空
eventLoop->timeEventNextId = 0; //定時事件的id為0
eventLoop->stop = 0; //stop為0
eventLoop->maxfd = -1; //最大檔案描述符為0
eventLoop->beforesleep = NULL; //beforesleep設定為NULL
if (aeApiCreate(eventLoop) == -1) { //給EPOLL申請空間
goto err;
}
/* Events with mask == AE_NONE are not set. So let's initialize the vector with it. */
for (int i = 0; i < setsize; i++) {
eventLoop->events[i].mask = AE_NONE; //將每一個fd的事件初始化為0
}
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
至此申請空間完成,我們整個EventLoop
結構如下圖所示:
我們完成了所有空間的申請和初始化工作:
loop->events : 是一個aeFileEvent 陣列,大小為 setsize 。
loop->fired : 是一個aeFiredEvent 陣列,大小也為 setsize 。
loop->timeEventHead :目前為NULL
loop->apidata:指向aeApiState,包含epfd和epoll_event陣列。
接著我們呼叫anetTcpServer
返回了listen_fd
,anetTcpServer
我們在anet.c
分析的時候再說,接下來重點是我們呼叫aeCreateFileEvent
將listen_fd
註冊到epfd
上的過程。
//ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd]; //利用fe指向eventLoop->events[listen_fd]
if (aeApiAddEvent(eventLoop, fd, mask) == -1) { //本質是呼叫epoll_ctl(epfd,EPOLL_CTL_ADD,fd,...);
return AE_ERR;
}
fe->mask |= mask; //如果fe->mask之前不是空,現在就相當於同時監控兩個事件
if (mask & AE_READABLE) {
fe->rfileProc = proc; //說明proc是讀操作的處理函式
}
if (mask & AE_WRITABLE) {
fe->wfileProc = proc; //說明proc是寫操作的處理函式
}
fe->clientData = clientData; //讓它們指向同一個client或者server例項
if (fd > eventLoop->maxfd) {
eventLoop->maxfd = fd; //如果新的fd大於maxfd,則更新maxfd
}
return AE_OK;
}
此時我們的整個EventLoop
變成了下面這個樣子:
可以看到:
1 : loop->events[4].mask = 1 , 表示讀,rfileProc 為 acceptTcpHandler。 因為它是listen_fd,負責接受連線。為什麼是 4 呢?因為 3 已經作為 epfd 的檔案描述符了。
2 : 我們將 fd = 4 & EPOLLIN事件註冊給了epfd。
現在就等著來新的連線了,因為這樣的話一旦檢測到listen_fd
上有資料可讀,那就會呼叫acceptTcpHandler
接受連線,這也是回掉機制的一種體現:我們現在已經給listen_fd
註冊了相應的回掉函數了,等著事件發生,然後去呼叫註冊好的函式。我們繼續往下走繼續看這個過程:
二、aeProcessEvents & acceptTcpHandler
繼續向下會進入aeMain
,之後一直輪詢呼叫aeProcessEvents
,接下來我們分析下aeProcessEvents
到底是怎麼處理各類事件的:
//ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) {
//如果flags什麼事件都沒有監聽,return 0
return 0;
}
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
/* 注意,我們即使沒有檔案事件,但是仍然想呼叫select/epoll,讓其阻塞直到我們想處理的
* 定時事件發生為止*/
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
//如果有定時事件處理
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) {
// for 迴圈查詢到最近需要發生的定時事件
shortest = aeSearchNearestTimer(eventLoop);
}
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next time event to fire? */
/* 計算我們需要等待的ms數,直到最近的定時事件發生*/
long long ms = (shortest->when_sec - now_sec) * 1000 + shortest->when_ms - now_ms;
if (ms > 0) {
//如果定時事件沒有過期,計算出需要等待的時間,作為epoll_wait的第四個引數
tvp->tv_sec = ms / 1000;
tvp->tv_usec = (ms % 1000) * 1000;
} else {
//否則置為0,epoll_wait就不會阻塞
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout to zero */
/*如果沒有找到定時事件 */
if (flags & AE_DONT_WAIT) { //設定了AE_DONT_WAIT操作,就不等
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else { //否則就阻塞等待直到事件發生
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp); //呼叫epoll_wait函式,返回需要處理的事件列表
for (int i = 0; i < numevents; i++) { //遍歷依次處理loop->fired
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[i].fd];
int mask = eventLoop->fired[i].mask;
int fd = eventLoop->fired[i].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1; //確保讀或者寫只執行一個
fe->rfileProc(eventLoop, fd, fe->clientData, mask); //執行讀處理
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
}
}
processed++;
}
}
/* Check time events */
/* 處理所有的時間事件 */
if (flags & AE_TIME_EVENTS) {
processed += processTimeEvents(eventLoop);
}
return processed; /* return the number of processed file/time events */
}
假設 listen_fd
此時發生了事件,那一定是有新的連線過來,fe->rfileProc(eventLoop, fd, fe->clientData, mask)
就會使用 acceptTcpHandler
接受連線:
static void acceptTcpHandler(aeEventLoop *loop, int fd, void *data, int mask)
{
char cip[64];
int cport;
server_t *server = (server_t *)data;
int cfd = anetTcpAccept(NULL, fd, cip, sizeof(cip), &cport); //呼叫accept接受連線
if (cfd != -1) {
printf("accepted ip %s:%d\n", cip, cport);
anetNonBlock(NULL, cfd); //設定socket非阻塞
anetEnableTcpNoDelay(NULL, cfd); //開啟TcpNoDelay選項
client_t *client = alloc_client(); //申請一個新的客戶端
if (!client) {
printf("alloc client error...close socket\n");
close(fd);
return;
}
client->loop = loop;
client->fd = cfd;
if (aeCreateFileEvent(loop, cfd, AE_READABLE, readEventHandler, client) == AE_ERR) {
//繼續呼叫aeCreateFileEvent給新連線的fd註冊可讀事件,並且註冊讀函式readEventHandler
if (errno == ERANGE) {
// or use aeResizeSetSize(server->loop, cfd) modify this limit
printf("so many client, close new.");
} else {
printf("create socket readable event error, close it.");
}
free_client(client);
}
}
}
處理到這裡,算是接受了一個連線,至於以後的讀寫操作,等到下次再分析~