1. 程式人生 > >Redis網路模型的原始碼分析

Redis網路模型的原始碼分析

Redis的網路模型是基於I/O多路複用程式來實現的。原始碼中包含四種多路複用函式庫epoll、select、evport、kqueue。在程式編譯時會根據系統自動選擇這四種庫其中之一。下面以epoll為例,來分析Redis的I/O模組的原始碼。 ## epoll系統呼叫方法 Redis網路事件處理模組的程式碼都是圍繞epoll那三個系統方法來寫的。先把這三個方法弄清楚,後面就不難了。 ### epfd = epoll_create(1024); **建立epoll例項** 引數:表示該 epoll 例項最多可監聽的 socket fd(檔案描述符)數量。 返回: epoll 專用的檔案描述符。 ### int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 管理epoll中的事件,對事件進行註冊、修改和刪除。 ``` 引數: epfd:epoll例項的檔案描述符; op:取值三種:EPOLL_CTL_ADD 註冊、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 刪除; fd:socket的檔案描述符; epoll_event *event:事件 ``` event代表一個事件,類似於Java NIO中的channel“通道”。epoll_event 的結構如下: ``` typedef union epoll_data { void *ptr; int fd; /* socket檔案描述符 */ __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events 就是各種待監聽操作的操作碼求與的結果,例如EPOLLIN(fd可讀)、EPOLLOUT(fd可寫) */ epoll_data_t data; /* User data variable */ }; ``` ### int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout); 等待事件是否就緒,類似於Java NIO中 select 方法。如果事件就緒,將就緒的event存入events陣列中。 ``` 引數 epfd:epoll例項的檔案描述符; events:已就緒的事件陣列; intmaxevents:每次能處理的事件數; timeout:阻塞時間,等待產生就緒事件的超時值。 ``` ## 原始碼分析 #### 事件 Redis事件系統中將事件分為兩種型別: - 檔案事件;網路套接字對應的事件; - 時間事件:Redis中一些定時操作事件,例如 serverCron 函式。 下面從事件的註冊、觸發兩個流程對原始碼進行分析 ### 繫結事件 #### 建立 eventLoop 在 initServer方法(由 redis.c 的 main 函式呼叫) 中,在建立 RedisDb 物件的同時,會初始化一個“eventLoop”物件,我稱之為事件處理器物件。結構體的關鍵成員變數如下所示: ``` struct aeEventLoop{ aeFileEvent *events;//已註冊的檔案事件陣列 aeFiredEvent *fired;//已就緒的檔案事件陣列 aeTimeEvent *timeEventHead;//時間事件陣列 ... } ``` 初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中執行。該方法中除了初始化 eventLoop 還呼叫如下方法初始化了一個 epoll 例項。 ``` /* * ae_epoll.c * 建立一個新的 epoll 例項,並將它賦值給 eventLoop */ static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; // 初始化事件槽空間 state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } // 建立 epoll 例項 state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } // 賦值給 eventLoop eventLoop->apidata = state; return 0; } ``` 也正是在此處呼叫了系統方法“epoll_create”。這裡的state是一個aeApiState結構,如下所示: ``` /* * 事件狀態 */ typedef struct aeApiState { // epoll 例項描述符 int epfd; // 事件槽 struct epoll_event *events; } aeApiState; ``` 這個 state 由 eventLoop->apidata 來記錄。 #### 繫結ip埠與控制代碼 通過 listenToPort 方法開啟TCP埠,每個IP埠會對應一個檔案描述符 ipfd(因為伺服器可能會有多個ip地址) ``` // 開啟 TCP 監聽埠,用於等待客戶端的命令請求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); ``` **注意:***eventLoop 和 ipfd 分別被 server.el 和 server.ipfd[] 引用。server 是結構體 RedisServer 的例項,是Redis的全域性變數。 #### 註冊事件 如下所示程式碼,為每一個檔案描述符繫結一個事件函式 ``` // initServer方法: for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // ae.c 中的 aeCreateFileEvent 方法 /* * 根據 mask 引數的值,監聽 fd 檔案的狀態, * 當 fd 可用時,執行 proc 函式 */ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } if (fd >= eventLoop->setsize) return AE_ERR; // 取出檔案事件結構 aeFileEvent *fe = &eventLoop->events[fd]; // 監聽指定 fd 的指定事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 設定檔案事件型別,以及事件的處理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有資料 fe->clientData = clientData; // 如果有需要,更新事件處理器的最大 fd if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; } ``` aeCreateFileEvent 函式中有一個方法呼叫:aeApiAddEvent,程式碼如下 ``` /* * ae_epoll.c * 關聯給定事件到 fd */ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. * * 如果 fd 沒有關聯任何事件,那麼這是一個 ADD 操作。 * * 如果已經關聯了某個/某些事件,那麼這是一個 MOD 操作。 */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // 註冊事件到 epoll ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; } ``` 這裡實際上就是呼叫系統方法“epoll_ctl”,將事件(檔案描述符)註冊進 epoll 中。首先要封裝一個 epoll_event 結構,即 ee ,通過“epoll_ctl”將其註冊進 epoll 中。 除此之外,aeCreateFileEvent 還完成了下面兩個重要操作: - 將事件函式“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc 來引用(也可能是wfileProc,分別代表讀事件和寫事件); - 將當操作碼新增進 eventLoop->events[fd]->mask 中(mask 類似於JavaNIO中的ops操作碼,代表事件型別)。 ### 事件監聽與執行 redis.c 的main函式會呼叫 ae.c 中的 main 方法,如下所示: ``` /* * 事件處理器的主迴圈 */ void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件處理前執行的函式,那麼執行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 開始處理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } ``` 上述程式碼會呼叫 aeProcessEvents 方法用於處理事件,方法如下所示 ``` /* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * * 處理所有已到達的時間事件,以及所有已就緒的檔案事件。 * 函式的返回值為已處理事件的數量 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 獲取最近的時間事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果時間事件存在的話 // 那麼根據最近可執行時間事件和現在時間的時間差來決定檔案事件的阻塞時間 long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ // 計算距今最近的時間事件還要多久才能達到 // 並將該時間距儲存在 tv 結構中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 時間差小於 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 執行到這一步,說明沒有時間事件 // 那麼根據 AE_DONT_WAIT 是否設定來決定是否阻塞,以及阻塞的時間長度 /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 設定檔案事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 檔案事件可以阻塞直到有事件到達為止 tvp = NULL; /* wait forever */ } } // 處理檔案事件,阻塞時間由 tvp 決定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 從已就緒陣列中獲取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 讀事件 if (fe->mask & mask & AE_READABLE) { // rfired 確保讀/寫事件只能執行其中一個 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ // 執行時間事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; } ``` 該函式中程式碼大致分為三個主要步驟 - 根據時間事件與當前時間的關係,決定阻塞時間 tvp; - 呼叫aeApiPoll方法,將就緒事件都寫入eventLoop->fired[]中,返回就緒事件數目; - 遍歷eventLoop->fired[],遍歷每一個就緒事件,執行之前繫結好的方法rfileProc 或者wfileProc。 ae_epoll.c 中的 aeApiPoll 方法如下所示: ``` /* * 獲取可執行事件 */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 等待時間 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 有至少一個事件就緒? if (retval > 0) { int j; // 為已就緒事件設定相應的模式 // 並加入到 eventLoop 的 fired 陣列中 numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回已就緒事件個數 return numevents; } ``` 執行epoll_wait後,就緒的事件會被寫入 eventLoop->apidata->events 事件槽。後面的迴圈就是將事件槽中的事件寫入到 eventLoop->fired[] 中。具體描述:每一個事件都是一個 epoll_event 結構,用e來指代,則e.data.fd代表檔案描述符,e->events表示其操作碼,將操作碼轉化為mask,最後將fd 和 mask 都寫入eventLoop->fired[j]中。 之後,在外層的 aeProcessEvents 方法中會執行函式指標 rfileProc 或者 wfileProc 指向的方法,例如前文提到已註冊的“acceptTcpHandler”。 ## 總結 Redis的網路模組其實是一個簡易的Reactor模式。本文順著“服務端註冊事件——>接受客戶端連線——>監聽事件是否就緒——>執行事件”這樣的路線,來分析Redis原始碼,描述了Redis接受客戶端connect的過程。實際上NIO的思想都基本