The redis event loop framework
After the redis server starts, it first performs its initialization and configuration, then enters the main event loop, where it waits for events to arrive and processes them.
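The main loop itself is only a thin wrapper around aeProcessEvents. In ae.c of this era it looks roughly like the sketch below (reproduced from memory, so treat it as an illustration rather than an exact quote):

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop)
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }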
The function aeProcessEvents performs the event polling and event dispatch; it is implemented as follows (ae.c):
162 /* Process every pending time event, then every pending file event
163 * (that may be registered by time event callbacks just processed).
164 * Without special flags the function sleeps until some file event
165 * fires, or when the next time event occurrs (if any).
166 *
167 * If flags is 0, the function does nothing and returns.
168 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
169 * if flags has AE_FILE_EVENTS set, file events are processed.
170 * if flags has AE_TIME_EVENTS set , time events are processed.
171 * if flags has AE_DONT_WAIT set the function returns ASAP until all
172 * the events that's possible to process without to wait are processed.
173 *
174 * The function returns the number of events processed. */
175 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
176 {
177 int maxfd = 0, numfd = 0, processed = 0;
178 fd_set rfds, wfds, efds;
179 aeFileEvent *fe = eventLoop->fileEventHead;
180 aeTimeEvent *te;
181 long long maxId;
182 AE_NOTUSED(flags);
183
184 /* Nothing to do? return ASAP */
185 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
186
187 FD_ZERO(&rfds);
188 FD_ZERO(&wfds);
189 FD_ZERO(&efds);
190
191 /* Check file events */
192 if (flags & AE_FILE_EVENTS) {
193 while (fe != NULL) {
194 if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);
195 if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);
196 if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);
197 if (maxfd < fe->fd) maxfd = fe->fd;
198 numfd++;
199 fe = fe->next;
200 }
201 }
202 /* Note that we want call select() even if there are no
203 * file events to process as long as we want to process time
204 * events, in order to sleep until the next time event is ready
205 * to fire. */
206 if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
207 int retval;
208 aeTimeEvent *shortest = NULL;
209 struct timeval tv, *tvp;
210
211 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
212 shortest = aeSearchNearestTimer(eventLoop);
213 if (shortest) {
214 long now_sec, now_ms;
215
216 /* Calculate the time missing for the nearest
217 * timer to fire. */
218 aeGetTime(&now_sec, &now_ms);
219 tvp = &tv;
220 tvp->tv_sec = shortest->when_sec - now_sec;
221 if (shortest->when_ms < now_ms) {
222 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
223 tvp->tv_sec --;
224 } else {
225 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
226 }
227 } else {
228 /* If we have to check for events but need to return
229 * ASAP because of AE_DONT_WAIT we need to se the timeout
230 * to zero */
231 if (flags & AE_DONT_WAIT) {
232 tv.tv_sec = tv.tv_usec = 0;
233 tvp = &tv;
234 } else {
235 /* Otherwise we can block */
236 tvp = NULL; /* wait forever */
237 }
238 }
239
240 retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
241 if (retval > 0) {
242 fe = eventLoop->fileEventHead;
243 while(fe != NULL) {
244 int fd = (int) fe->fd;
245
246 if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
247 (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
248 (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
249 {
250 int mask = 0;
251
252 if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
253 mask |= AE_READABLE;
254 if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
255 mask |= AE_WRITABLE;
256 if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
257 mask |= AE_EXCEPTION;
258 fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);
259 processed++;
260 /* After an event is processed our file event list
261 * may no longer be the same, so what we do
262 * is to clear the bit for this file descriptor and
263 * restart again from the head. */
264 fe = eventLoop->fileEventHead;
265 FD_CLR(fd, &rfds);
266 FD_CLR(fd, &wfds);
267 FD_CLR(fd, &efds);
268 } else {
269 fe = fe->next;
270 }
271 }
272 }
273 }
274 /* Check time events */
275 if (flags & AE_TIME_EVENTS) {
276 te = eventLoop->timeEventHead;
277 maxId = eventLoop->timeEventNextId-1;
278 while(te) {
279 long now_sec, now_ms;
280 long long id;
281
282 if (te->id > maxId) {
283 te = te->next;
284 continue;
285 }
286 aeGetTime(&now_sec, &now_ms);
287 if (now_sec > te->when_sec ||
288 (now_sec == te->when_sec && now_ms >= te->when_ms))
289 {
290 int retval;
291
292 id = te->id;
293 retval = te->timeProc(eventLoop, id, te->clientData);
294 /* After an event is processed our time event list may
295 * no longer be the same, so we restart from head.
296 * Still we make sure to don't process events registered
297 * by event handlers itself in order to don't loop forever.
298 * To do so we saved the max ID we want to handle. */
299 if (retval != AE_NOMORE) {
300 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
301 } else {
302 aeDeleteTimeEvent(eventLoop, id);
303 }
304 te = eventLoop->timeEventHead;
305 } else {
306 te = te->next;
307 }
308 }
309 }
310 return processed; /* return the number of processed file/time events */
311 }
Line185 first checks which kinds of events have to be handled: if the flags argument asks for neither time events nor file events, the function returns immediately. When the function is called from the main loop the flags argument is AE_ALL_EVENTS, so both time events and file events are processed. The function handles triggered events in a single thread using asynchronous I/O (the select system call). Next, the three file descriptor sets are initialized (Line187:189).

Line192:201 walks the registered file event objects and, depending on each object's mask field, decides which trigger types to monitor. Right after redis starts up there is only the single file event object created during initialization; its mask is AE_READABLE and it is used to listen for new client connections. Note that this object stays valid forever: it is never deleted and its mask never changes from AE_READABLE. For every registered file event the local variable maxfd is updated, as required by select. The condition at Line206 is therefore always satisfied, because at least one file event is waiting for client connections, i.e. numfd is always greater than 0.

Line211:238 looks up the time event that will expire first and computes the difference between its expiry time and the current time (stored through the local variable tvp); this difference is exactly the timeout that should be passed to select. Line240 performs the select system call. If the timeout has not expired and no file event has fired, the call blocks; when it returns, the return value is greater than 0 if file events fired, 0 if it returned because of the timeout, and -1 on error.

Line241:272 is the handling of triggered file events. The file event list is traversed, each object is checked for having fired, and if it has, the trigger types are recorded in the local variable mask. The object's fileProc handler is then invoked. After fileProc returns, the file event object may have been removed from the list, for example because the client issued the redis quit command and closed the connection, or because redis finished sending a reply to the client. Line264 therefore resets the local variable fe to the head of the file event list, so that it cannot point at a deleted object, and the list is traversed again from the start. Line265:267 clears the descriptor of the event just handled from the monitored sets, so that the restarted traversal cannot loop forever on the same descriptor. If the condition at Line246:249 is not satisfied, the file event either did not fire or has already been handled by its fileProc; in that case Line269 moves on to the next object in the list.
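As a concrete illustration of the timeout arithmetic at Line218:226, here is a self-contained sketch with made-up numbers (the nearest timer is due at 100 s + 200 ms and the current time is 99 s + 800 ms):

    #include <stdio.h>
    #include <sys/time.h>

    int main(void) {
        /* Hypothetical values: nearest timer fires at 100 s + 200 ms,
         * the current time is 99 s + 800 ms. */
        long when_sec = 100, when_ms = 200;
        long now_sec  = 99,  now_ms  = 800;
        struct timeval tv;

        tv.tv_sec = when_sec - now_sec;                      /* 1 */
        if (when_ms < now_ms) {
            tv.tv_usec = ((when_ms + 1000) - now_ms) * 1000; /* 400000 us */
            tv.tv_sec--;                                     /* borrow: 0 s */
        } else {
            tv.tv_usec = (when_ms - now_ms) * 1000;
        }
        printf("timeout = %ld s %ld us\n", (long)tv.tv_sec, (long)tv.tv_usec);
        return 0;
    }

select() would therefore wait at most 0.4 seconds, i.e. exactly until the nearest time event is due.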
For a freshly started redis server, only a new client connection can trigger the file event object; its fileProc handler is acceptHandler (see the description of the redis start-up and initialization process). The function is implemented as follows (redis.c):
971 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
972 int cport, cfd;
973 char cip[128];
974 REDIS_NOTUSED(el);
975 REDIS_NOTUSED(mask);
976 REDIS_NOTUSED(privdata);
977
978 cfd = anetAccept(server.neterr, fd, cip, &cport);
979 if (cfd == AE_ERR) {
980 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
981 return;
982 }
983 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
984 if (createClient(cfd) == REDIS_ERR) {
985 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
986 close(cfd); /* May be already closed, just ingore errors */
987 return;
988 }
989 }
Line978 calls anetAccept, which returns the socket descriptor of the newly connected client. The underlying accept system call does not block here, because it was precisely this new client connection that woke up the select system call. Line984 calls createClient to build the corresponding redisClient object for this client.

The function createClient is implemented as follows (redis.c):
933 static int createClient(int fd) {
934 redisClient *c = malloc(sizeof(*c));
935
936 anetNonBlock(NULL,fd);
937 anetTcpNoDelay(NULL,fd);
938 if (!c) return REDIS_ERR;
939 selectDb(c,0);
940 c->fd = fd;
941 c->querybuf = sdsempty();
942 c->argc = 0;
943 c->bulklen = -1;
944 c->sentlen = 0;
945 c->lastinteraction = time(NULL);
946 if ((c->reply = listCreate()) == NULL) oom("listCreate");
947 listSetFreeMethod(c->reply,decrRefCount);
948 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
949 readQueryFromClient, c, NULL) == AE_ERR) {
950 freeClient(c);
951 return REDIS_ERR;
952 }
953 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
954 return REDIS_OK;
955 }
Let us first look at the redisClient structure, which represents a single client. It is defined as follows (redis.c):
69 typedef struct redisClient {
70 int fd;
71 dict *dict;
72 sds querybuf;
73 sds argv[REDIS_MAX_ARGS];
74 int argc;
75 int bulklen; /* bulk read len. -1 if not in bulk read mode */
76 list *reply;
77 int sentlen;
78 time_t lastinteraction; /* time of the last interaction, used for timeout */
79 } redisClient;
The field fd is the client's socket descriptor. The field dict points to one of the dict objects held by the global server object; server has 16 dict objects by default, and a newly connected client is assigned server's dict number 0. The dict type describes the structure that stores the data kept in memory. The field querybuf buffers the protocol command sent by the client together with its arguments, while argc and argv hold the number of arguments and the argument values after the command has been parsed. The type sds is a char * pointer used together with the type sdshdr to implement dynamic string objects. The field bulklen records the byte length of the value in a bulk-style protocol command. The field reply is a list object that collects the replies redis will send back; note that the reply the server sends to a client may consist of several parts transmitted separately over the network, which is why a linked list is used to record the individual parts. The field sentlen counts how many bytes of the reply have actually been written to the socket, and is used to decide whether the reply data has been sent completely. The field lastinteraction is the timestamp of the client's most recent interaction with the redis server; if a client stays idle for too long, the server may close its connection.
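To make these fields concrete, here is a made-up request in the old (pre-multibulk) redis protocol, which is the protocol this version parses; take the exact values as an illustration rather than a quote from the parser:

    SET mykey 5\r\nhello\r\n

The whole request accumulates in querybuf. The inline part "SET mykey 5" is split into arguments (argc/argv), the trailing number 5 tells the parser that a bulk value follows, so bulklen is set accordingly, and the next 5 bytes ("hello") plus the terminating CRLF are then consumed as the value of the SET command.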
Back in createClient, Line941 creates an empty sds object for the querybuf field. An empty sds is created with sdsempty, a non-empty one with sdsnew. Line946 creates a list object for the reply field; initially this list contains no nodes. Line948:952 calls aeCreateFileEvent to create a file event object for this client and insert it into the global file event list. Protocol commands are sent by the client to the server and the server sends the results back, so the file event object is registered with trigger type AE_READABLE and its handler function is readQueryFromClient. Line953 appends the client object to the global client list.
After the file events handled in Line241:272 are done, Line275:309 processes the time events. Similarly to file event handling, time events are processed by looping over the global list of time event objects. Line286:289 checks whether a time event object has expired; if it has, the event is handled, otherwise the next object in the global list is examined. Line293 calls the time event object's handler through the function pointer timeProc, which points to serverCron (see the section on redis initialization). The function is implemented as follows (redis.c):
466 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
467 int j, size, used, loops = server.cronloops++;
468 REDIS_NOTUSED(eventLoop);
469 REDIS_NOTUSED(id);
470 REDIS_NOTUSED(clientData);
471
472 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
473 * we resize the hash table to save memory */
474 for (j = 0; j < server.dbnum; j++) {
475 size = dictGetHashTableSize(server.dict[j]);
476 used = dictGetHashTableUsed(server.dict[j]);
477 if (!(loops % 5) && used > 0) {
478 redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size);
479 // dictPrintStats(server.dict);
480 }
481 if (size && used && size > REDIS_HT_MINSLOTS &&
482 (used*100/size < REDIS_HT_MINFILL)) {
483 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
484 dictResize(server.dict[j]);
485 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
486 }
487 }
488
489 /* Show information about connected clients */
490 if (!(loops % 5)) redisLog(REDIS_DEBUG,"%d clients connected",listLength(server.clients));
491
492 /* Close connections of timedout clients */
493 if (!(loops % 10))
494 closeTimedoutClients();
495
496 /* Check if a background saving in progress terminated */
497 if (server.bgsaveinprogress) {
498 int statloc;
499 if (wait4(-1,&statloc,WNOHANG,NULL)) {
500 int exitcode = WEXITSTATUS(statloc);
501 if (exitcode == 0) {
502 redisLog(REDIS_NOTICE,
503 "Background saving terminated with success");
504 server.dirty = 0;
505 server.lastsave = time(NULL);
506 } else {
507 redisLog(REDIS_WARNING,
508 "Background saving error");
509 }
510 server.bgsaveinprogress = 0;
511 }
512 } else {
513 /* If there is not a background saving in progress check if
514 * we have to save now */
515 time_t now = time(NULL);
516 for (j = 0; j < server.saveparamslen; j++) {
517 struct saveparam *sp = server.saveparams+j;
518
519 if (server.dirty >= sp->changes &&
520 now-server.lastsave > sp->seconds) {
521 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
522 sp->changes, sp->seconds);
523 saveDbBackground("dump.rdb");
524 break;
525 }
526 }
527 }
528 return 1000;
529 }
Line467 increments the global counter server.cronloops, which records how many times this time event handler has run. Line474:487 iterates over the server.dbnum (16 by default) in-memory databases, each of which is a hash table. Line475:476 fetches the size and used values of the database; size is the number of buckets in the hash table. While redis is in use, used may be smaller than, equal to, or larger than size. Line481:486 checks whether the hash table has become too sparse and, if so, calls dictResize to shrink it. That function is implemented as follows (dict.c):
113 /* Resize the table to the minimal size that contains all the elements,
114 * but with the invariant of a USER/BUCKETS ration near to <= 1 */
115 int dictResize(dict *ht)
116 {
117 int minimal = ht->used;
118
119 if (minimal < DICT_HT_INITIAL_SIZE)
120 minimal = DICT_HT_INITIAL_SIZE;
121 return dictExpand(ht, minimal);
122 }
The resizing rule is to make size as close to used as possible. As we will see in dictExpand, size is always a power of two, so the new size may be greater than or equal to used. dictExpand itself will be analyzed in detail when we look at the handling of redis protocol commands.
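The rounding to a power of two can be pictured with the sketch below. This is an assumption about what dictExpand does internally, not a quote; DICT_HT_INITIAL_SIZE is 16 in this code base, matching the 16, 32, 64, ... sequence of table sizes discussed later:

    #include <stdio.h>

    /* Assumed to mirror the rounding dictExpand performs internally:
     * the table size is the smallest power of two >= the requested minimum. */
    static unsigned int nextPower(unsigned int size) {
        unsigned int i = 16;   /* DICT_HT_INITIAL_SIZE */
        while (i < size) i *= 2;
        return i;
    }

    int main(void) {
        /* dictResize() on a table with used == 100 asks dictExpand for 100
         * slots and ends up with a 128-slot table. */
        printf("%u\n", nextPower(100));   /* prints 128 */
        return 0;
    }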
Back in serverCron, Line493:494 tries to close idle client connections. The function closeTimedoutClients is implemented as follows (redis.c):
448 void closeTimedoutClients(void) {
449 redisClient *c;
450 listIter *li;
451 listNode *ln;
452 time_t now = time(NULL);
453
454 li = listGetIterator(server.clients,AL_START_HEAD);
455 if (!li) return;
456 while ((ln = listNextElement(li)) != NULL) {
457 c = listNodeValue(ln);
458 if (now - c->lastinteraction > server.maxidletime) {
459 redisLog(REDIS_DEBUG,"Closing idle client");
460 freeClient(c);
461 }
462 }
463 listReleaseIterator(li);
464 }
Line454:455 first creates a list iterator over the global client list server.clients. Since this list is of type list, a doubly linked list, the iterator can be created to walk the list either from the head or from the tail. Line456:462 visits each client object in turn and checks whether its idle time exceeds the configured server.maxidletime (5 minutes by default); if it does, freeClient is called to terminate that client's connection. Finally, Line463 releases the dynamically created iterator object. The function freeClient is implemented as follows (redis.c):
701 static void freeClient(redisClient *c) {
702 listNode *ln;
703
704 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
705 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
706 sdsfree(c->querybuf);
707 listRelease(c->reply);
708 freeClientArgv(c);
709 close(c->fd);
710 ln = listSearchKey(server.clients,c);
711 assert(ln != NULL);
712 listDelNode(server.clients,ln);
713 free(c);
714 }
Line704:705 removes the file event objects associated with this client; for the two trigger types AE_READABLE and AE_WRITABLE the same client creates separate file event objects. Line706:708 frees the dynamically allocated memory pointed to by the client object's fields, to avoid memory leaks. Line709 closes the socket and releases the TCP connection. Line710:712 removes the client object from the global list. Line713 frees the client object itself, since all client objects are allocated on the heap.
Back in serverCron, Line496:527 deals with saving the in-memory data to disk. Line497 checks whether a background child process is currently saving to disk; if so, wait4 is called with WNOHANG, which polls for the child's termination without blocking the current process. Once the child has exited successfully, the current process updates the global server object: it resets the dirty counter to 0, sets the last-save timestamp to the current time, and clears the saving-in-progress flag. If no child is currently saving, Line516:526 decides whether an asynchronous save should be started now. There are three default criteria: at least one update to the in-memory database within the last hour, at least 100 updates within the last five minutes, or at least 10000 updates within the last minute. If any one of them is met, saveDbBackground is called to save to disk asynchronously. That function is implemented as follows (redis.c):
1169 static int saveDbBackground(char *filename) {
1170 pid_t childpid;
1171
1172 if (server.bgsaveinprogress) return REDIS_ERR;
1173 if ((childpid = fork()) == 0) {
1174 /* Child */
1175 close(server.fd);
1176 if (saveDb(filename) == REDIS_OK) {
1177 exit(0);
1178 } else {
1179 exit(1);
1180 }
1181 } else {
1182 /* Parent */
1183 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1184 server.bgsaveinprogress = 1;
1185 return REDIS_OK;
1186 }
1187 return REDIS_OK; /* unreached */
1188 }
Line1173:1181 forks a child process; the child calls saveDb to write the data to disk and exits as soon as that function returns. Line1182:1185 is executed in the parent, which logs the child's pid, sets the global server.bgsaveinprogress flag and returns.
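The three default save rules mentioned above (1 change in an hour, 100 changes in five minutes, 10000 changes in a minute) can be restated as a small self-contained check. The struct below mirrors the saveparam structure used at Line517, while the function and variable names are made up for the example:

    #include <stdio.h>
    #include <time.h>

    struct saveparam { time_t seconds; int changes; };

    /* The three default rules described above. */
    static struct saveparam defaults[] = {
        {60 * 60, 1},       /* at least 1 change within 1 hour        */
        {300,     100},     /* at least 100 changes within 5 minutes  */
        {60,      10000},   /* at least 10000 changes within 1 minute */
    };

    /* Re-statement of the Line519:520 condition: any single rule that is
     * satisfied is enough to trigger saveDbBackground(). */
    static int shouldSave(long long dirty, time_t lastsave, time_t now) {
        for (int j = 0; j < 3; j++) {
            if (dirty >= defaults[j].changes &&
                now - lastsave > defaults[j].seconds)
                return 1;
        }
        return 0;
    }

    int main(void) {
        time_t now = time(NULL);
        /* 150 changes and 400 seconds since the last save: rule 2 fires. */
        printf("%d\n", shouldSave(150, now - 400, now));   /* prints 1 */
        return 0;
    }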
The function saveDb is implemented as follows (redis.c):
1055 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1056 static int saveDb(char *filename) {
1057 dictIterator *di = NULL;
1058 dictEntry *de;
1059 uint32_t len;
1060 uint8_t type;
1061 FILE *fp;
1062 char tmpfile[256];
1063 int j;
1064
1065 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1066 fp = fopen(tmpfile,"w");
1067 if (!fp) {
1068 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1069 return REDIS_ERR;
1070 }
1071 if (fwrite("REDIS0000",9,1,fp) == 0) goto werr;
1072 for (j = 0; j < server.dbnum; j++) {
1073 dict *d = server.dict[j];
1074 if (dictGetHashTableUsed(d) == 0) continue;
1075 di = dictGetIterator(d);
1076 if (!di) {
1077 fclose(fp);
1078 return REDIS_ERR;
1079 }
1080
1081 /* Write the SELECT DB opcode */
1082 type = REDIS_SELECTDB;
1083 len = htonl(j);
1084 if (fwrite(&type,1,1,fp) == 0) goto werr;
1085 if (fwrite(&len,4,1,fp) == 0) goto werr;
1086
1087 /* Iterate this DB writing every entry */
1088 while((de = dictNext(di)) != NULL) {
1089 sds key = dictGetEntryKey(de);
1090 robj *o = dictGetEntryVal(de);
1091
1092 type = o->type;
1093 len = htonl(sdslen(key));
1094 if (fwrite(&type,1,1,fp) == 0) goto werr;
1095 if (fwrite(&len,4,1,fp) == 0) goto werr;
1096 if (fwrite(key,sdslen(key),1,fp) == 0) goto werr;
1097 if (type == REDIS_STRING) {
1098 /* Save a string value */
1099 sds sval = o->ptr;
1100 len = htonl(sdslen(sval));
1101 if (fwrite(&len,4,1,fp) == 0) goto werr;
1102 if (sdslen(sval) &&
1103 fwrite(sval,sdslen(sval),1,fp) == 0) goto werr;
1104 } else if (type == REDIS_LIST) {
1105 /* Save a list value */
1106 list *list = o->ptr;
1107 listNode *ln = list->head;
1108
1109 len = htonl(listLength(list));
1110 if (fwrite(&len,4,1,fp) == 0) goto werr;
1111 while(ln) {
1112 robj *eleobj = listNodeValue(ln);
1113 len = htonl(sdslen(eleobj->ptr));
1114 if (fwrite(&len,4,1,fp) == 0) goto werr;
1115 if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
1116 goto werr;
1117 ln = ln->next;
1118 }
1119 } else if (type == REDIS_SET) {
1120 /* Save a set value */
1121 dict *set = o->ptr;
1122 dictIterator *di = dictGetIterator(set);
1123 dictEntry *de;
1124
1125 if (!set) oom("dictGetIteraotr");
1126 len = htonl(dictGetHashTableUsed(set));
1127 if (fwrite(&len,4,1,fp) == 0) goto werr;
1128 while((de = dictNext(di)) != NULL) {
1129 robj *eleobj;
1130
1131 eleobj = dictGetEntryKey(de);
1132 len = htonl(sdslen(eleobj->ptr));
1133 if (fwrite(&len,4,1,fp) == 0) goto werr;
1134 if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
1135 goto werr;
1136 }
1137 dictReleaseIterator(di);
1138 } else {
1139 assert(0 != 0);
1140 }
1141 }
1142 dictReleaseIterator(di);
1143 }
1144 /* EOF opcode */
1145 type = REDIS_EOF;
1146 if (fwrite(&type,1,1,fp) == 0) goto werr;
1147 fclose(fp);
1148
1149 /* Use RENAME to make sure the DB file is changed atomically only
1150 * if the generate DB file is ok. */
1151 if (rename(tmpfile,filename) == -1) {
1152 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1153 unlink(tmpfile);
1154 return REDIS_ERR;
1155 }
1156 redisLog(REDIS_NOTICE,"DB saved on disk");
1157 server.dirty = 0;
1158 server.lastsave = time(NULL);
1159 return REDIS_OK;
1160
1161 werr:
1162 fclose(fp);
1163 unlink(tmpfile);
1164 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1165 if (di) dictReleaseIterator(di);
1166 return REDIS_ERR;
1167 }
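For reference, the layout of the dump file produced by saveDb can be read straight off the code above:

    REDIS0000                                   9-byte header (Line1071)
    for every non-empty database:
        REDIS_SELECTDB <db index, 4 bytes, htonl>        (Line1082:1085)
        for every key in that database:
            <type, 1 byte> <key length, 4 bytes> <key bytes>
            REDIS_STRING: <value length, 4 bytes> <value bytes>
            REDIS_LIST:   <element count, 4 bytes>, then per element
                          <length, 4 bytes> <bytes>
            REDIS_SET:    <element count, 4 bytes>, then per element
                          <length, 4 bytes> <bytes>
    REDIS_EOF                                   1 byte (Line1145:1146)

The file is first written under a temporary name and only renamed to the final name once everything has succeeded, so the on-disk database is replaced atomically (Line1151:1155).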
Before analyzing the save operation in detail, let us describe the logical structure of the redis in-memory database. redis is a NoSQL database that stores key/value pairs. Keys are strings; a string in redis is really just a binary blob, except that it may not contain '\n' or spaces, because these two bytes are used as separators when parsing redis protocol commands. Values support three types, string, list and set, of which list and set are composite types. The logical structure of the in-memory database is managed through the dict field of the global server object; the types involved are dict, dictType, sds, sdshdr and robj.

The type dict is defined as follows (dict.h):
31 typedef struct dict {
32 dictEntry **table;
33 dictType *type;
34 unsigned int size;
35 unsigned int sizemask;
36 unsigned int used;
37 void *privdata;
38 } dict;
The field table is organized as a hash table; the number of buckets is the value of the field size, which follows the sequence 16, 32, 64, and so on. The field sizemask is size-1. The field used is the number of nodes in the hash table; note that this counts keys, since a node may hold a composite value of type list or set, with many elements under a single key. The field type holds a set of function pointers used to operate on the nodes of the hash table.
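To see why size is kept at a power of two: with size = 16 the mask is sizemask = 15 (binary 1111), so a key whose hash value is h goes into bucket h & sizemask, and the usual modulo reduction turns into a single bit-wise AND.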
The type dictType is defined as follows (dict.h):
22 typedef struct dictType {
23 unsigned int (*hashFunction)(const void *key);
24 void *(*keyDup)(void *privdata, const void *key);
25 void *(*valDup)(void *privdata, const void *obj);
26 int (*keyCompare)(void *privdata, const void *key1, const void *key2);
27 void (*keyDestructor)(void *privdata, void *key);
28 void (*valDestructor)(void *privdata, void *obj);
29 } dictType;
This set of function pointers is used to operate on the nodes of the hash table: for example, the hash function hashFunction maps a key to a bucket at a particular index, and the other operations cover copying keys and values, comparing keys, and destroying keys and values.
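As a purely hypothetical example (none of the dictType instances actually defined in redis.c), a dictType for NUL-terminated C-string keys could be filled in as follows; the convention that keyCompare returns non-zero when two keys are equal is an assumption to verify against dict.c:

    #include <string.h>
    #include "dict.h"

    /* Illustration only: djb2-style hash over a C string. */
    static unsigned int exampleHash(const void *key) {
        const unsigned char *p = key;
        unsigned int h = 5381;
        while (*p) h = (h << 5) + h + *p++;
        return h;
    }

    static int exampleKeyCompare(void *privdata, const void *key1, const void *key2) {
        (void)privdata;
        return strcmp(key1, key2) == 0;   /* non-zero means "keys are equal" */
    }

    static dictType exampleType = {
        exampleHash,        /* hashFunction */
        NULL,               /* keyDup: keys are not copied */
        NULL,               /* valDup: values are not copied */
        exampleKeyCompare,  /* keyCompare */
        NULL,               /* keyDestructor */
        NULL                /* valDestructor */
    };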
The type dictEntry is defined as follows (dict.h):
16 typedef struct dictEntry {
17 void *key;
18 void *val;
19 struct dictEntry *next;
20 } dictEntry;
The type dictEntry represents a node of the hash table. When the keys of different nodes hash to the same value, the colliding nodes are chained together in a linked list. The field key is of type sds and the field val is of type robj; the field next points to the next node in the collision chain.
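Putting these pieces together, a lookup hashes the key, masks the hash with sizemask and then walks a single collision chain. The sketch below is modelled on what dictFind does; dictHashKey and dictCompareHashKeys are assumed to be the helper macros from dict.h:

    #include "dict.h"

    static dictEntry *exampleLookup(dict *ht, const void *key) {
        dictEntry *he;
        unsigned int idx;

        if (ht->size == 0) return NULL;                /* empty table */
        idx = dictHashKey(ht, key) & ht->sizemask;     /* bucket index */
        he = ht->table[idx];
        while (he) {
            if (dictCompareHashKeys(ht, key, he->key)) /* same key? */
                return he;
            he = he->next;                             /* next node in the chain */
        }
        return NULL;                                   /* key not present */
    }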
The type sds is really just a char * pointer, normally used together with the type sdshdr. The type is defined as follows (sds.h):
33 typedef char *sds;
34
35 struct sdshdr {
36 long len;
37 long free;
38 char buf[0];
39 };
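The sds pointer handed out to callers points at buf, while the header sits immediately in front of it in the same allocation; length queries therefore just step back by sizeof(struct sdshdr). The sketch below is modelled on sdslen in sds.c and should be treated as an illustration:

    #include <stddef.h>
    #include "sds.h"

    /* Recover the header from an sds pointer and read its length. */
    static size_t exampleSdsLen(const sds s) {
        struct sdshdr *sh = (struct sdshdr *)(s - sizeof(struct sdshdr));
        return sh->len;
    }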