1. 程式人生 > >redis事件迴圈處理框架

redis事件迴圈處理框架

redis伺服器啟動以後,先進行初始化配置, 然後進入主事件迴圈中監聽事件到來,並進行處理.
函式aeProcessEvents進行事件監聽和事件處理,其實現為(ae.c):

162 /* Process every pending time event, then every pending file event
163  * (that may be registered by time event callbacks just processed).
164  * Without special flags the function sleeps until some file event
165
* fires, or when the next time event occurrs (if any). 166 * 167 * If flags is 0, the function does nothing and returns. 168 * if flags has AE_ALL_EVENTS set, all the kind of events are processed. 169 * if flags has AE_FILE_EVENTS set, file events are processed. 170 * if flags has AE_TIME_EVENTS set
, time events are processed. 171 * if flags has AE_DONT_WAIT set the function returns ASAP until all 172 * the events that's possible to process without to wait are processed. 173 * 174 * The function returns the number of events processed. */ 175 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 176
{ 177 int maxfd = 0, numfd = 0, processed = 0; 178 fd_set rfds, wfds, efds; 179 aeFileEvent *fe = eventLoop->fileEventHead; 180 aeTimeEvent *te; 181 long long maxId; 182 AE_NOTUSED(flags); 183 184 /* Nothing to do? return ASAP */ 185 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 186 187 FD_ZERO(&rfds); 188 FD_ZERO(&wfds); 189 FD_ZERO(&efds); 190 191 /* Check file events */ 192 if (flags & AE_FILE_EVENTS) { 193 while (fe != NULL) { 194 if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds); 195 if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds); 196 if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds); 197 if (maxfd < fe->fd) maxfd = fe->fd; 198 numfd++; 199 fe = fe->next; 200 } 201 } 202 /* Note that we want call select() even if there are no 203 * file events to process as long as we want to process time 204 * events, in order to sleep until the next time event is ready 205 * to fire. */ 206 if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 207 int retval; 208 aeTimeEvent *shortest = NULL; 209 struct timeval tv, *tvp; 210 211 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 212 shortest = aeSearchNearestTimer(eventLoop); 213 if (shortest) { 214 long now_sec, now_ms; 215 216 /* Calculate the time missing for the nearest 217 * timer to fire. 
*/ 218 aeGetTime(&now_sec, &now_ms); 219 tvp = &tv; 220 tvp->tv_sec = shortest->when_sec - now_sec; 221 if (shortest->when_ms < now_ms) { 222 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; 223 tvp->tv_sec --; 224 } else { 225 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; 226 } 227 } else { 228 /* If we have to check for events but need to return 229 * ASAP because of AE_DONT_WAIT we need to se the timeout 230 * to zero */ 231 if (flags & AE_DONT_WAIT) { 232 tv.tv_sec = tv.tv_usec = 0; 233 tvp = &tv; 234 } else { 235 /* Otherwise we can block */ 236 tvp = NULL; /* wait forever */ 237 } 238 } 239 240 retval = select(maxfd+1, &rfds, &wfds, &efds, tvp); 241 if (retval > 0) { 242 fe = eventLoop->fileEventHead; 243 while(fe != NULL) { 244 int fd = (int) fe->fd; 245 246 if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) || 247 (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) || 248 (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))) 249 { 250 int mask = 0; 251 252 if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) 253 mask |= AE_READABLE; 254 if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) 255 mask |= AE_WRITABLE; 256 if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)) 257 mask |= AE_EXCEPTION; 258 fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); 259 processed++; 260 /* After an event is processed our file event list 261 * may no longer be the same, so what we do 262 * is to clear the bit for this file descriptor and 263 * restart again from the head. 
*/ 264 fe = eventLoop->fileEventHead; 265 FD_CLR(fd, &rfds); 266 FD_CLR(fd, &wfds); 267 FD_CLR(fd, &efds); 268 } else { 269 fe = fe->next; 270 } 271 } 272 } 273 } 274 /* Check time events */ 275 if (flags & AE_TIME_EVENTS) { 276 te = eventLoop->timeEventHead; 277 maxId = eventLoop->timeEventNextId-1; 278 while(te) { 279 long now_sec, now_ms; 280 long long id; 281 282 if (te->id > maxId) { 283 te = te->next; 284 continue; 285 } 286 aeGetTime(&now_sec, &now_ms); 287 if (now_sec > te->when_sec || 288 (now_sec == te->when_sec && now_ms >= te->when_ms)) 289 { 290 int retval; 291 292 id = te->id; 293 retval = te->timeProc(eventLoop, id, te->clientData); 294 /* After an event is processed our time event list may 295 * no longer be the same, so we restart from head. 296 * Still we make sure to don't process events registered 297 * by event handlers itself in order to don't loop forever. 298 * To do so we saved the max ID we want to handle. */ 299 if (retval != AE_NOMORE) { 300 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); 301 } else { 302 aeDeleteTimeEvent(eventLoop, id); 303 } 304 te = eventLoop->timeEventHead; 305 } else { 306 te = te->next; 307 } 308 } 309 } 310 return processed; /* return the number of processed file/time events */ 311 }

首先Line185檢查要處理的事件型別, 如果flags引數沒有設定定時事件和檔案事件, 該函式直接返回. 該函式被呼叫的時候,傳入flags的實參值為AE_ALL_EVENTS, 所以定時事件和檔案事件都需要處理.該函式是在單執行緒裡面使用I/O多路複用的方式(使用select系統呼叫)處理觸發的事件, 接下來初始化三個檔案描述符集.Line191:201獲得有效的檔案事件物件,根據其mask欄位的值,決定是否監聽指定的觸發型別.對於redis剛啟動狀態下, 只有初始化建立的一個檔案事件物件, 其mask為AE_READABLE, 該物件用來監聽客戶端連線. 注意的是, 這個物件將永遠有效,即不會被刪除, 且其mask欄位不會改變, 一直是AE_READABLE.對於有效的檔案事件, 更新區域性變數maxfd, 以滿足select系統呼叫的規範(第一個引數必須是最大描述符加1).Line206的條件判斷肯定會滿足, 因為至少存在一個檔案事件等待客戶端連線, 即numfd永遠大於0.Line211:238獲得最先超時的定時事件物件,並通過和當前時間進行比較產生差值(儲存在區域性變數tv中, 由tvp指向), 這個差值就是select系統呼叫正確的超時時間.Line240進行select系統呼叫, 如果沒有超時,並且沒有檔案事件被觸發,該呼叫將阻塞,當其返回時, 如果有檔案事件被觸發,返回值大於0, 否則如果是超時返回,其返回值為0, 產生錯誤將返回-1.Line241:273是檔案事件被觸發的處理邏輯,通過遍歷檔案事件物件連結串列,檢查該物件是否被觸發,如果被觸發,記錄其觸發型別,儲存在區域性變數mask中.然後呼叫該檔案事件物件的fileProc指標進行處理, fileProc處理完畢以後, 有可能該檔案事件物件從連結串列中被刪除,比如客戶端呼叫了quit redis命令終止了連線, 或者redis向客戶端返回了響應命令.Line264更新區域性變數fe重新指向檔案事件物件連結串列頭,以免該指標指向一個被刪除的物件, 同時也是重新遍歷該連結串列進行正確賦值.Line265:267將該觸發的檔案事件物件描述符從其監聽集合中清除, 以免這裡造成死迴圈.如果Line246:249的條件不滿足,說明該檔案事件物件沒有被觸發或者被觸發但是已經呼叫其fileProc處理過了.這時Line269檢查連結串列中下一個檔案事件物件.

對於初始啟動的redis伺服器, 只有新的客戶端連線會觸發檔案事件物件,該物件的fileProc對應的處理函式為acceptHandler(見redis啟動初始化過程說明),該函式的實現為(redis.c):

971 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
972     int cport, cfd;
973     char cip[128];
974     REDIS_NOTUSED(el);
975     REDIS_NOTUSED(mask);
976     REDIS_NOTUSED(privdata);
977 
978     cfd = anetAccept(server.neterr, fd, cip, &cport);
979     if (cfd == AE_ERR) {
980         redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
981         return;
982     }
983     redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
984     if (createClient(cfd) == REDIS_ERR) {
985         redisLog(REDIS_WARNING,"Error allocating resoures for the client");
986         close(cfd); /* May be already closed, just ingore errors */
987         return;
988     }
989 }

Line978呼叫anetAccept返回新連線的客戶端的socket檔案描述符,這時候的accept系統呼叫不會阻塞,因為正是新客戶端的連線使監聽描述符變為可讀,才讓select系統呼叫返回並呼叫到這裡.Line984呼叫createClient為該客戶端建立對應的redisClient型別的物件.

函式createClient的實現為(redis.c):

933 static int createClient(int fd) {
934     redisClient *c = malloc(sizeof(*c));
935 
936     anetNonBlock(NULL,fd);
937     anetTcpNoDelay(NULL,fd);
938     if (!c) return REDIS_ERR;
939     selectDb(c,0);
940     c->fd = fd;
941     c->querybuf = sdsempty();
942     c->argc = 0;
943     c->bulklen = -1;
944     c->sentlen = 0;
945     c->lastinteraction = time(NULL);
946     if ((c->reply = listCreate()) == NULL) oom("listCreate");
947     listSetFreeMethod(c->reply,decrRefCount);
948     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
949         readQueryFromClient, c, NULL) == AE_ERR) {
950         freeClient(c);
951         return REDIS_ERR;
952     }
953     if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
954     return REDIS_OK;
955 }

首先看一下redisClient結構,其標識一個客戶端,其定義為(redis.c):

69 typedef struct redisClient {
70     int fd;
71     dict *dict;
72     sds querybuf;
73     sds argv[REDIS_MAX_ARGS];
74     int argc;
75     int bulklen;    /* bulk read len. -1 if not in bulk read mode */
76     list *reply;
77     int sentlen;
78     time_t lastinteraction; /* time of the last interaction, used for timeout */
79 } redisClient;

欄位fd是客戶端的socket描述符,欄位dict指向全域性物件server中的dict物件欄位,server預設有16個dict物件.新的客戶端連線到來時,會選擇server的第0個dict物件.dict型別是描述記憶體中的儲存資料的結構.欄位querybuf用來儲存客戶端協議命令及其引數,而欄位argc和argv代表解析後的協議命令的引數個數和具體的引數值.型別sds是一個char *型別的指標,用來和型別sdshdr配合使用,以實現動態string型別物件.欄位bulklen標識bulk型別協議命令中value的位元組長度(不處於bulk讀取模式時為-1),欄位reply是list型別的物件,記錄redis響應命令,需要注意的是redis伺服器傳送給客戶端的響應的各個組成部分可能需要分多次網路傳輸,所以這裡用一個連結串列記錄響應的各個組成部分.欄位sentlen是在傳送響應時,已經實際向socket寫入的位元組數,用於判斷響應命令資料是否傳送完成.欄位lastinteraction標識該客戶端和redis伺服器最近的互動時間戳, 如果客戶端長時間沒有和redis伺服器進行互動, redis伺服器有可能會中斷同該客戶端的連線.

回到函式createClient中,Line941為欄位querybuf建立一個空的sds物件. 空的sds由函式sdsempty建立, 非空的sds由函式sdsnew建立.Line946為欄位reply建立一個list型別的物件,該物件初始狀態下, 連結串列節點為空.Line948:952呼叫函式aeCreateFileEvent為該客戶端建立一個檔案事件物件,插入到全域性的檔案事件管理連結串列中.redis協議命令由客戶端傳送到伺服器,伺服器再向客戶端傳送響應結果, 因此伺服器首先要等待讀取客戶端的命令, 設定該檔案事件物件的觸發型別為AE_READABLE, 並且其觸發後的處理函式為readQueryFromClient.Line953將該客戶端物件插入到全域性連結串列中.

Line240:273中的檔案事件處理完畢後, Line274:309是定時事件的處理邏輯.和檔案事件處理類似,定時事件也是迴圈檢查全域性的定時事件物件連結串列,Line286:289判斷該定時事件物件是否超時,如果已經超時,則處理該定時事件物件,如果未超時,檢查全域性連結串列中的下一個定時事件物件.Line293為呼叫該定時事件物件的處理函式,函式指標timeProc指向的是serverCron(見redis初始化處理部分).該函式的實現為(redis.c):

466 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
467     int j, size, used, loops = server.cronloops++;
468     REDIS_NOTUSED(eventLoop);
469     REDIS_NOTUSED(id);
470     REDIS_NOTUSED(clientData);
471 
472     /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
473      * we resize the hash table to save memory */
474     for (j = 0; j < server.dbnum; j++) {
475         size = dictGetHashTableSize(server.dict[j]);
476         used = dictGetHashTableUsed(server.dict[j]);
477         if (!(loops % 5) && used > 0) {
478             redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size);
479             // dictPrintStats(server.dict);
480         }
481         if (size && used && size > REDIS_HT_MINSLOTS &&
482             (used*100/size < REDIS_HT_MINFILL)) {
483             redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
484             dictResize(server.dict[j]);
485             redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
486         }
487     }
488 
489     /* Show information about connected clients */
490     if (!(loops % 5)) redisLog(REDIS_DEBUG,"%d clients connected",listLength(server.clients));
491 
492     /* Close connections of timedout clients */
493     if (!(loops % 10))
494         closeTimedoutClients();
495 
496     /* Check if a background saving in progress terminated */
497     if (server.bgsaveinprogress) {
498         int statloc;
499         if (wait4(-1,&statloc,WNOHANG,NULL)) {
500             int exitcode = WEXITSTATUS(statloc);
501             if (exitcode == 0) {
502                 redisLog(REDIS_NOTICE,
503                     "Background saving terminated with success");
504                 server.dirty = 0;
505                 server.lastsave = time(NULL);
506             } else {
507                 redisLog(REDIS_WARNING,
508                     "Background saving error");
509             }
510             server.bgsaveinprogress = 0;
511         }
512     } else {
513         /* If there is not a background saving in progress check if
514          * we have to save now */
515          time_t now = time(NULL);
516          for (j = 0; j < server.saveparamslen; j++) {
517             struct saveparam *sp = server.saveparams+j;
518 
519             if (server.dirty >= sp->changes &&
520                 now-server.lastsave > sp->seconds) {
521                 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
522                     sp->changes, sp->seconds);
523                 saveDbBackground("dump.rdb");
524                 break;
525             }
526          }
527     }
528     return 1000;
529 }

Line467更新全域性的計數值,記錄定時事件處理的次數.Line472:487依次檢查server.dbnum個(預設是16)記憶體資料庫(是一個雜湊表資料結構),Line475:476獲得該記憶體資料庫的size和used的值,這裡的size也就是雜湊表結構中的BUCKETS.在redis使用過程中,used可能小於,等於或者大於size的值.Line481:486檢查如果雜湊表比較稀疏,將呼叫dictResize調整其容量,該函式的實現為(dict.c):

113 /* Resize the table to the minimal size that contains all the elements,
114  * but with the invariant of a USER/BUCKETS ration near to <= 1 */
115 int dictResize(dict *ht)
116 {
117     int minimal = ht->used;
118 
119     if (minimal < DICT_HT_INITIAL_SIZE)
120         minimal = DICT_HT_INITIAL_SIZE;
121     return dictExpand(ht, minimal);
122 }

調整的規則是,使size大小盡可能接近於used. 在函式dictExpand會看到, size的值是2的冪,所以size可能大於或者等於used.函式dictExpand在處理redis協議命令時再具體分析.

回到函式serverCron, Line493:494嘗試終止空閒的客戶端連線,函式closeTimedoutClients的實現為(redis.c):

448 void closeTimedoutClients(void) {
449     redisClient *c;
450     listIter *li;
451     listNode *ln;
452     time_t now = time(NULL);
453 
454     li = listGetIterator(server.clients,AL_START_HEAD);
455     if (!li) return;
456     while ((ln = listNextElement(li)) != NULL) {
457         c = listNodeValue(ln);
458         if (now - c->lastinteraction > server.maxidletime) {
459             redisLog(REDIS_DEBUG,"Closing idle client");
460             freeClient(c);
461         }
462     }
463     listReleaseIterator(li);
464 }

首先Line454:455建立了一個連結串列遍歷器來訪問全域性的客戶端物件連結串列server.clients,因為該連結串列是個list型別,是個雙向連結串列,因此遍歷器在建立的時候可以指定從頭訪問該連結串列或者從尾部訪問該連結串列.Line456:462依次訪問連結串列中的客戶端物件,並檢查其空閒時間是否超過了系統配置時間server.maxidletime(預設是5分鐘),如果是,則呼叫函式freeClient終止該客戶端連線.最後Line463釋放動態建立的遍歷器物件.函式freeClient的實現為(redis.c):

701 static void freeClient(redisClient *c) {
702     listNode *ln;
703 
704     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
705     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
706     sdsfree(c->querybuf);
707     listRelease(c->reply);
708     freeClientArgv(c);
709     close(c->fd);
710     ln = listSearchKey(server.clients,c);
711     assert(ln != NULL);
712     listDelNode(server.clients,ln);
713     free(c);
714 }

Line704:705釋放該客戶端物件相關的檔案事件物件,對於不同的觸發型別AE_READABLE和AE_WRITABLE, 同一客戶端物件會建立不同的檔案事件物件.Line706:708是釋放該客戶端物件欄位指向的動態記憶體,以免記憶體洩漏.Line709關閉socket套接字,釋放TCP連線.Line710:712從全域性連結串列中刪除該客戶端物件.Line713釋放該客戶端物件的記憶體,因為客戶端物件都是由堆上動態分配的記憶體.

回到函式serverCron, Line496:527是對於儲存記憶體資料到磁碟的相關處理.Line497檢查是否有子程序正在進行寫磁碟操作,如果是,Line499以WNOHANG方式呼叫wait4,非阻塞地檢查子程序是否已經結束(子程序尚未結束時wait4返回0,當前程序不會被阻塞,直接繼續執行).子程序結束後,如果其退出碼為0(即成功),當前程序會更新全域性物件server,設定記憶體資料髒標識為0,設定寫磁碟時間戳為當前時間,否則記錄一條警告日誌;無論成功與否,都會清除正在寫磁碟標識.如果沒有子程序正在寫磁碟操作,Line516:526判斷當前是否需要進行非同步的寫磁碟操作,判斷條件是: 自上次寫磁碟以來的更新次數server.dirty不少於sp->changes, 並且距離上次寫磁碟的時間超過sp->seconds秒.有三個預設的判斷標準,如果距離上次寫磁碟超過一小時且記憶體資料庫發生過至少一次的更新操作,或者超過五分鐘且記憶體資料庫發生過至少100次的更新操作,或者超過一分鐘且記憶體資料庫至少發生過10000次的更新操作,滿足任何一個標準即呼叫函式saveDbBackground進行非同步寫磁碟操作.該函式的實現為(redis.c):

1169 static int saveDbBackground(char *filename) {
1170     pid_t childpid;
1171 
1172     if (server.bgsaveinprogress) return REDIS_ERR;
1173     if ((childpid = fork()) == 0) {
1174         /* Child */
1175         close(server.fd);
1176         if (saveDb(filename) == REDIS_OK) {
1177             exit(0);
1178         } else {
1179             exit(1);
1180         }
1181     } else {
1182         /* Parent */
1183         redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1184         server.bgsaveinprogress = 1;
1185         return REDIS_OK;
1186     }
1187     return REDIS_OK; /* unreached */
1188 }

Line1173:1181呼叫fork建立子程序,子程序呼叫函式saveDb進行寫磁碟操作,該函式返回後,子程序根據其返回值以退出碼0或1結束.Line1182:1185是父程序的邏輯: 父程序不等待子程序,仍然處於可執行狀態,設定全域性的server.bgsaveinprogress標識並返回.

函式saveDb的實現為(redis.c):

1055 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1056 static int saveDb(char *filename) {
1057     dictIterator *di = NULL;
1058     dictEntry *de;
1059     uint32_t len;
1060     uint8_t type;
1061     FILE *fp;
1062     char tmpfile[256];
1063     int j;
1064 
1065     snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1066     fp = fopen(tmpfile,"w");
1067     if (!fp) {
1068         redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1069         return REDIS_ERR;
1070     }
1071     if (fwrite("REDIS0000",9,1,fp) == 0) goto werr;
1072     for (j = 0; j < server.dbnum; j++) {
1073         dict *d = server.dict[j];
1074         if (dictGetHashTableUsed(d) == 0) continue;
1075         di = dictGetIterator(d);
1076         if (!di) {
1077             fclose(fp);
1078             return REDIS_ERR;
1079         }
1080 
1081         /* Write the SELECT DB opcode */
1082         type = REDIS_SELECTDB;
1083         len = htonl(j);
1084         if (fwrite(&type,1,1,fp) == 0) goto werr;
1085         if (fwrite(&len,4,1,fp) == 0) goto werr;
1086 
1087         /* Iterate this DB writing every entry */
1088         while((de = dictNext(di)) != NULL) {
1089             sds key = dictGetEntryKey(de);
1090             robj *o = dictGetEntryVal(de);
1091 
1092             type = o->type;
1093             len = htonl(sdslen(key));
1094             if (fwrite(&type,1,1,fp) == 0) goto werr;
1095             if (fwrite(&len,4,1,fp) == 0) goto werr;
1096             if (fwrite(key,sdslen(key),1,fp) == 0) goto werr;
1097             if (type == REDIS_STRING) {
1098                 /* Save a string value */
1099                 sds sval = o->ptr;
1100                 len = htonl(sdslen(sval));
1101                 if (fwrite(&len,4,1,fp) == 0) goto werr;
1102                 if (sdslen(sval) &&
1103                     fwrite(sval,sdslen(sval),1,fp) == 0) goto werr;
1104             } else if (type == REDIS_LIST) {
1105                 /* Save a list value */
1106                 list *list = o->ptr;
1107                 listNode *ln = list->head;
1108 
1109                 len = htonl(listLength(list));
1110                 if (fwrite(&len,4,1,fp) == 0) goto werr;
1111                 while(ln) {
1112                     robj *eleobj = listNodeValue(ln);
1113                     len = htonl(sdslen(eleobj->ptr));
1114                     if (fwrite(&len,4,1,fp) == 0) goto werr;
1115                     if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
1116                         goto werr;
1117                     ln = ln->next;
1118                 }
1119             } else if (type == REDIS_SET) {
1120                 /* Save a set value */
1121                 dict *set = o->ptr;
1122                 dictIterator *di = dictGetIterator(set);
1123                 dictEntry *de;
1124 
1125                 if (!set) oom("dictGetIteraotr");
1126                 len = htonl(dictGetHashTableUsed(set));
1127                 if (fwrite(&len,4,1,fp) == 0) goto werr;
1128                 while((de = dictNext(di)) != NULL) {
1129                     robj *eleobj;
1130 
1131                     eleobj = dictGetEntryKey(de);
1132                     len = htonl(sdslen(eleobj->ptr));
1133                     if (fwrite(&len,4,1,fp) == 0) goto werr;
1134                     if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
1135                         goto werr;
1136                 }
1137                 dictReleaseIterator(di);
1138             } else {
1139                 assert(0 != 0);
1140             }
1141         }
1142         dictReleaseIterator(di);
1143     }
1144     /* EOF opcode */
1145     type = REDIS_EOF;
1146     if (fwrite(&type,1,1,fp) == 0) goto werr;
1147     fclose(fp);
1148    
1149     /* Use RENAME to make sure the DB file is changed atomically only
1150      * if the generate DB file is ok. */
1151     if (rename(tmpfile,filename) == -1) {
1152         redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1153         unlink(tmpfile);
1154         return REDIS_ERR;
1155     }
1156     redisLog(REDIS_NOTICE,"DB saved on disk");
1157     server.dirty = 0;
1158     server.lastsave = time(NULL);
1159     return REDIS_OK;
1160 
1161 werr:
1162     fclose(fp);
1163     unlink(tmpfile);
1164     redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1165     if (di) dictReleaseIterator(di);
1166     return REDIS_ERR;
1167 }

在分析寫磁碟操作之前,說明一下redis記憶體資料庫的邏輯結構,redis是一個儲存key/value的NoSQL型別的資料庫,key的型別是string,redis中的string其實就是二進位制資料,只不過不包含'\n'和空格,因為這兩個位元組被用作解析redis協議命令的分隔符.value支援三種類型,包括string,list和set,其中list和set是複合型別.redis的記憶體資料庫的邏輯結構由全域性物件server的dict欄位管理,涉及到的相關型別有dict,dictType,sds,sdshdr,robj.

型別dict的定義為(dict.h):

31 typedef struct dict {
32     dictEntry **table;
33     dictType *type;
34     unsigned int size;
35     unsigned int sizemask;
36     unsigned int used;
37     void *privdata;
38 } dict;

欄位table組織成一個雜湊表結構,其BUCKETS的數量即欄位size的值,size的取值為16,32,64...這樣的2的冪數列.欄位sizemask的值為size-1. 欄位used為雜湊表中節點的個數,需要說明的是節點的個數指的是key的個數,因為節點有可能是複合型別list或set,同一個key下面有多個子節點.欄位type包含一組函式指標,用於操作雜湊表中的節點.

型別dictType的定義(dict.h):

22 typedef struct dictType {
23     unsigned int (*hashFunction)(const void *key);
24     void *(*keyDup)(void *privdata, const void *key);
25     void *(*valDup)(void *privdata, const void *obj);
26     int (*keyCompare)(void *privdata, const void *key1, const void *key2);
27     void (*keyDestructor)(void *privdata, void *key);
28     void (*valDestructor)(void *privdata, void *obj);
29 } dictType;

這一組函式指標用來操作雜湊表中的節點,比如雜湊函式hashFunction會根據key的值對映到指定索引的BUCKET上,其他節點操作還包括key和value的複製,key的比較,key和value的銷燬操作等.

型別dictEntry的定義為(dict.h):

16 typedef struct dictEntry {
17     void *key;
18     void *val;
19     struct dictEntry *next;
20 } dictEntry;

型別dictEntry標識雜湊表中的節點.當不同節點的key具有相同的雜湊值,這些衝突節點就以連結串列的形式組織起來.欄位key的型別為sds,欄位val的型別為robj.欄位next指向衝突連結串列中的下一個節點.

型別sds其實就是一個char *指標,通常和型別sdshdr配合使用. 該型別的定義為(sds.h):

33 typedef char *sds;
34 
35 struct sdshdr {
36     long len;
37     long