
Redis 4.0 from the Top Down (5): Persistence

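# Persistence in redis 4.0

## Introduction

Although `redis` is an in-memory database, it also provides persistence. `rdb` persistence takes periodic snapshots that can be used for rollback, while `aof` persistence stays closer to the latest state of the database, so the server can recover to its most recent state after a restart. The two back up data at different granularities: `rdb` backs up the whole database, whereas `aof` works at a finer granularity (individual write commands) but produces larger files. If several persistence tasks wrote to disk at the same time, the extra disk pressure could eventually block them, so `redis` allows only one persistence task to write to disk at any given time. The default configuration disables `aof` persistence and enables background `rdb` persistence. Because `aof` data is more recent, `redis` loads the `aof` file at startup whenever `aof` persistence is enabled.

```bash
# aof is disabled by default
appendonly no

# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
save 900 1
save 300 10
save 60 10000
```

## Main text

### rdb persistence

`redis` supports the `save` and `bgsave` commands, and can also be configured to save `rdb` data periodically.

#### The save command

The `save` command goes through `saveCommand`, which calls `rdbSave` directly and saves the data on the main thread, so it is not recommended in production. Before going further, let's look at the relevant members.

```c
struct redisServer {
    /* RDB persistence */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    char *rdb_filename;             /* Name of RDB file */
    long long dirty;                /* Changes to DB from the last rdb save */
    time_t lastsave;                /* Unix time of last successful save */
    int lastbgsave_status;          /* C_OK or C_ERR */
};
```

If an `rdb` child process is already running, the command returns immediately with an error. Otherwise the data is saved to the file named by `server.rdb_filename`, which is `dump.rdb` by default. `rdbSave` opens a temporary file, writes the data into it, flushes it to disk, and then renames the temporary file to `dump.rdb`. Finally it resets `server.dirty` to `0` and updates `lastsave`.

```c
void saveCommand(client *c) {
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }
    if (rdbSave(server.rdb_filename,NULL) == C_OK) {
        addReply(c,shared.ok);
    }
}
```

The actual writing happens in `rdbSaveRio`: it first writes the `rdb` version, then some auxiliary fields, then the data of every `db`, and finally the checksum.

#### The bgsave command

The `bgsave` command calls `fork` to start a child process, and `rdbSave` is called inside that child.

As with `save`, an error is returned if a child process is already saving data. However, if `bgsave schedule` is used and the current child is an `aof` child, the `bgsave` is deferred instead of rejected.

```c
struct redisServer {
    ...
    /* RDB persistence */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    int child_info_pipe[2];         /* Pipe used to write the child_info_data. */
    struct {
        int process_type;           /* AOF or RDB child? */
        size_t cow_size;            /* Copy on write size. */
        unsigned long long magic;   /* Magic value to make sure data is valid.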
                                     */
    } child_info_data;
    ...
};
```

Starting an `rdb` save in the background means calling `fork` to create a child process and calling `rdbSave` in the child. Before calling `fork`, `redis` creates a pipe for one-way communication from the child to the parent. The forked child shares the parent's file descriptors, so it can talk to the parent through the pipe's descriptors. While the child is saving the `db` data, memory pages that get modified are duplicated by `copy-on-write` and take up extra memory. Once the data has been saved, the child sends the size of that extra memory to the parent.

> ```
> fork(2)
> *  The child inherits copies of the parent's set of open file
>    descriptors.  Each file descriptor in the child refers to the same
>    open file description (see open(2)) as the corresponding file
>    descriptor in the parent.  This means that the two file
>    descriptors share open file status flags, file offset, and signal-
>    driven I/O attributes (see the description of F_SETOWN and
>    F_SETSIG in fcntl(2)).
> ```

```c
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    openChildInfoPipe(); /* create the pipe */

    start = ustime();
    if ((childpid = fork()) == 0) {
        /* Child */
        int retval;

        /* File descriptors are inherited, so close the listening sockets here */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}
```

At this point the parent records the child's pid in `rdb_child_pid`, together with the child type. It then checks whether the child has finished inside `serverCron`, the time event registered earlier. `wait3` waits for a change in the child's state: the child may have terminated, or it may have been stopped or resumed by a signal. If the child has terminated, the parent receives the information the child sent through the pipe, that is, the `Copy-On-Write` size, and then closes the pipe.

```c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    /* A child process is doing a full save of the data */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    }
}
```

Since this is an `RDB` save to disk (`rdb_child_type` is `RDB_CHILD_TYPE_DISK`; the counterpart is the `AOF` rewrite, although the two are nearly equivalent when the rewrite is configured to save in `RDB` format), `backgroundSaveDoneHandler` calls `backgroundSaveDoneHandlerDisk`. That function resets `rdb_child_pid` and related fields and, if the save succeeded, updates `server.dirty` and `lastsave`.

```c
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        serverLog(LL_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = C_OK;
    } else if (!bysignal && exitcode != 0) {
        serverLog(LL_WARNING, "Background saving error");
        server.lastbgsave_status = C_ERR;
    } else {
        mstime_t latency;

        serverLog(LL_WARNING,
            "Background saving terminated by signal %d", bysignal);
        latencyStartMonitor(latency);
        rdbRemoveTempFile(server.rdb_child_pid);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition.
         */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = C_ERR;
    }
    server.rdb_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;
}
```

#### Periodic rdb saves

`redis` adds three periodic save parameters by default. If a `redis.conf` is used, the defaults are cleared and the settings from `redis.conf` apply instead. If `redis.conf` contains no such settings, periodic `rdb` saving is not used.

```c
appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
```

Again inside `serverCron`, if no `aof` or `rdb` child is currently saving data, `redis` checks whether a save condition is met. If (the time since the last save and the number of changes both satisfy a configured rule) **and** (the last save succeeded, or more than 5 seconds, the default value of `CONFIG_BGSAVE_RETRY_DELAY`, have passed since the last attempt), the `rdb` serialization starts.

```c
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
    ldbPendingChildren())
{
    ...
} else {
    /* If there is not a background saving/rewrite in progress check if
     * we have to save/rewrite now. */
    for (j = 0; j < server.saveparamslen; j++) {
        struct saveparam *sp = server.saveparams+j;

        /* Save if we reached the given amount of changes,
         * the given amount of seconds, and if the latest bgsave was
         * successful or if, in case of an error, at least
         * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
        if (server.dirty >= sp->changes &&
            server.unixtime-server.lastsave > sp->seconds &&
            (server.unixtime-server.lastbgsave_try >
             CONFIG_BGSAVE_RETRY_DELAY || // value is 5
             server.lastbgsave_status == C_OK))
        {
            serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                sp->changes, (int)sp->seconds);
            rdbSaveInfo rsi, *rsiptr;
            rsiptr = rdbPopulateSaveInfo(&rsi);
            rdbSaveBackground(server.rdb_filename,rsiptr);
            break;
        }
    }

    /* Trigger an AOF rewrite if needed. */
    ...
}
```

#### Saving data when the process exits

When `redis` shuts down normally (it receives a `shutdown` command from a client or a `SIGTERM` signal), it calls `prepareForShutdown`. The function kills any child process that is currently saving. If periodic `rdb` saving is configured, or the `save` argument was passed at shutdown, it calls `rdbSave` on the main thread to save the data, and then the process exits.

As the code shows, if `AOF` is enabled, then before saving with `rdb`, `redis` calls `flushAppendOnlyFile` to force the data to be written, and calls `aof_fsync` to make sure it is flushed to disk.

```c
int prepareForShutdown(int flags) {
    int save = flags & SHUTDOWN_SAVE;
    int nosave = flags & SHUTDOWN_NOSAVE;

    serverLog(LL_WARNING,"User requested shutdown...");

    /* Kill all the Lua debugger forked sessions. */
    ldbKillForkedSessions();

    /* Kill the saving child if there is a background saving in progress.
       We want to avoid race conditions, for instance our saving child may
       overwrite the synchronous saving did by SHUTDOWN. */
    if (server.rdb_child_pid != -1) {
        serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }

    if (server.aof_state != AOF_OFF) {
        /* Kill the AOF saving child as the AOF we already have may be longer
         * but contains the full dataset anyway. */
        if (server.aof_child_pid != -1) {
            /* If we have AOF enabled but haven't written the AOF yet, don't
             * shutdown or else the dataset will be lost. */
            if (server.aof_state == AOF_WAIT_REWRITE) {
                serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
                return C_ERR;
            }
            serverLog(LL_WARNING,
                "There is a child rewriting the AOF. Killing it!");
            kill(server.aof_child_pid,SIGUSR1);
        }
        /* Append only file: flush buffers and fsync() the AOF at exit */
        serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
        flushAppendOnlyFile(1);
        aof_fsync(server.aof_fd);
    }

    /* Create a new RDB file before exiting. */
    if ((server.saveparamslen > 0 && !nosave) || save) {
        serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
        /* Snapshotting. Perform a SYNC SAVE and exit */
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
            /* Ooops.. error saving! The best we can do is to continue
             * operating.
             * Note that if there was a background saving process,
             * in the next cron() Redis will be notified that the background
             * saving aborted, handling special stuff like slaves pending for
             * synchronization... */
            serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
            return C_ERR;
        }
    }

    /* Remove the pid file if possible and needed. */
    if (server.daemonize || server.pidfile) {
        serverLog(LL_NOTICE,"Removing the pid file.");
        unlink(server.pidfile);
    }

    /* Best effort flush of slave output buffers, so that we hopefully
     * send them pending writes. */
    flushSlavesOutputBuffers();

    /* Close the listening sockets. Apparently this allows faster restarts. */
    closeListeningSockets(1);
    serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
        server.sentinel_mode ? "Sentinel" : "Redis");
    return C_OK;
}
```

### aof persistence

#### The data buffer

As mentioned in an earlier article, after parsing a client request into `client->argc` and `client->argv`, `redis` calls `processCommand` to check whether the preconditions of the requested command hold. If they do, it calls `call(client, CMD_CALL_FULL)`.

```c
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
#define CMD_CALL_SLOWLOG (1<<0)
#define CMD_CALL_STATS (1<<1)
#define CMD_CALL_PROPAGATE_AOF (1<<2)
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
```

Let's look at `CMD_CALL_FULL`. For now it is enough to know that its value includes `CMD_CALL_PROPAGATE`. After the command has run, `redis` decides whether to append it to `server.aof_buf`: the command is appended to the buffer if **the data was modified**, the command does not forbid propagation, and **`aof` is enabled in `redis`**.

```c
void call(client *c, int flags) {
    long long dirty = server.dirty;

    c->cmd->proc(c); /* the command has now been executed */
    dirty = server.dirty - dirty; /* number of changes made by the command */

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE && /* flags is CMD_CALL_FULL here */
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* The command changed some data */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* Some commands force propagation, e.g. publishMessage */
        if (c->flags & CLIENT_FORCE_REPL)
            propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            propagate_flags |= PROPAGATE_AOF;

        /* Some commands forbid propagation here, e.g. spop, which
         * propagates from another function */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
```

Before appending a command, `redis` does some preprocessing. If the command targets a different `db` than the previously appended command, a `select` command is inserted first. Commands of the `expire` family are all converted to `pexpireat`. A `setex` command is split into `set` and `pexpireat`. If `aof` is on, the result is appended to `aof_buf`. If a child process is rewriting the `aof`, the result is additionally appended to the rewrite buffer, from which `redis` tries to send it to the child.

```c
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed.
     */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file.
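     */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
```

#### Flushing data to disk

```bash
# aof disabled
appendonly no

# the settings below take effect once aof is enabled
# aof disk flush policy
# appendfsync always
appendfsync everysec
# appendfsync no
```

`redis` disables `aof` by default. With `aof` disabled, `server.aof_buf` never contains any data. Only when `aof` is enabled, that is `appendonly yes`, is data written to the `aof`.

The `appendfsync` setting only takes effect once `appendonly yes` is configured. The default is `everysec`, meaning that once per second `redis` tries to have a background thread **flush** the data to disk. **The write itself is still done by the main thread**: as long as there is data and no background thread is still flushing, the data is written.

The flush to disk is also handled in `beforeSleep`. Readers of the earlier articles in this series may remember that `redis` does not send a reply to the client directly. It first stores the data in `client->buf`, and then, in `beforeSleep` before the next round of `aeMainLoop`, calls `handleClientsWithPendingWrites` to return the data to the client. This is done to honor the semantics of `appendfsync always`, which is why `flushAppendOnlyFile` comes before `handleClientsWithPendingWrites` in `beforeSleep`.

```c
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}
```

There are three flush policies. The following sections walk through the source in the order `no`, `always`, `everysec`.

##### appendfsync no

Under this policy, which gives no flush guarantee, `redis` still calls `flushAppendOnlyFile`. This amounts to calling `aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));` directly, which writes the data into the operating system's buffer. Whether and when the file is flushed to disk is up to the operating system. Because `aofWrite` may run out of disk space, `redis` compares the length it asked to write with the length actually written. If not everything was written, then to make sure the `aof` file can be loaded next time, **`redis` truncates the partially written data and writes it again on the next call.** If the truncation fails, it shrinks `aof_buf` by removing the part that was already written, and the next write continues from there. In addition, if the write to the system buffer ran into a problem, the function returns after handling it and does not call `aof_fsync` or any other flush function.

```c
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;

        /* Some data was written */
        if (nwritten != -1) {
            /* Truncate away the data that was just written */
            //todo what will happen if system ftruncate the file some part is still in the memory not yet flushed to the disk
            if (ftruncate(server.aof_fd, server.aof_current_size) != -1) {
                /* Truncation succeeded */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        server.aof_last_write_status = C_ERR;

        /* Truncation failed */
        if (nwritten > 0) {
            server.aof_current_size += nwritten;
            sdsrange(server.aof_buf,nwritten,-1);
        }
        return; /* We'll try again on the next call... */
    }

    server.aof_current_size += nwritten;

    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* The flush to disk follows below */
}
```

##### appendfsync always

The `always` mode guarantees that by the time the client receives the reply, `redis` has already flushed the change to disk. In this mode `redis` effectively calls `aof_fsync`, that is, one of the `fsync` family of functions, on the main thread right after `aofWrite`. This forces `redis` to access the disk on the main thread, so performance drops sharply. `always` is also less fault tolerant: **if `aofWrite` does not write all of the data in `aof_buf`, `redis` exits immediately.**

![Disk vs. memory access time](https://img-blog.csdnimg.cn/20190516233655801.png)

##### appendfsync everysec

Flushing to disk once per second is the default configuration of `redis`: it tries to flush the file to disk every second. Since `flushAppendOnlyFile` is called from `serverCron`, and `serverCron` runs 10 times per second, the default ratio between writes and flushes is 10:1. If `aof_no_fsync_on_rewrite` is enabled, the incremental `aof` data is not synced while a child process is doing a full save (either an `rdb` save or an `aof` rewrite).

```c
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    /* Is a background thread still syncing data? */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {
            /* Another thread is still flushing: postpone for one cycle,
             * then for up to 2 seconds */
            if (server.aof_flush_postponed_start == 0) {
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                return;
            }
            /* Still not finished: write anyway, which will actually block */
        }
    }

    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        /* As described above, return if the write was incomplete */
        ...
        return; /* We'll try again on the next call...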
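                 */
    }

    /* The data is now in the system buffer; reset the `aof_buf` buffer */
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
         server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
```

When writing data to disk, `redis` calls `write` on the main thread and then calls `fsync` on another thread. This lets the other thread block on `IO` without affecting the main thread. Note, however, that if the main thread calls `write` while the other thread's `fsync` has not yet returned, the main thread blocks on `write` as well. [4]

《Redis開發與運維》 (*Redis Development and Operations*) [2] says:

> Looking at the AOF blocking flow reveals two problems:
>
> 1) With the everysec setting, up to 2 seconds of data may be lost, not 1 second.
>
> 2) If the system's fsync is slow, the Redis main thread blocks and efficiency suffers.

In fact, in `redis` 4.0 the `everysec` setting may lose up to 2 seconds plus the duration of one `aeMainLoop` iteration. And although the book lists two problems, they are really the same one: **the disk cannot keep up with the amount of data being written**. When this happens under `everysec`, `redis` puts the main thread first. If less than 2 seconds have passed since the postponed-write timestamp `aof_flush_postponed_start`, it skips this write to avoid blocking, so that the main thread can keep serving requests. If after 2 seconds the data still has not been flushed from the buffer to disk, it calls `aofWrite`, and the main thread blocks.

### aof rewrite

#### Configuring the aof rewrite

An `aof` rewrite can be triggered with the `bgrewriteaof` command, or automatically by configured conditions.

```bash
auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100
```

These two settings alone do not tell us when `redis` rewrites. We also need `aof_current_size` and `aof_base_size`. `aof_current_size` is the current size of the `aof` file. It is updated when `redis` loads the `aof` file at startup and every time data is appended to the `aof`, and it is not persisted to disk. The same goes for `aof_base_size`: if an `aof` file was loaded at startup, `aof_base_size` is the size of that file.

When `aof_current_size` > `auto-aof-rewrite-min-size` and `auto-aof-rewrite-percentage` is configured, a rewrite starts automatically if (`aof_current_size` - `aof_base_size`) / `aof_base_size` × `100` >= `percentage`. With the configuration above, for example, if `redis` loaded a `100mb` `aof` file at startup, `aof_base_size` is `100mb`, and the rewrite starts automatically once the file has grown to `200mb`.

There is a catch, though. Suppose the file has grown to `199mb` and `redis` happens to restart. At the next startup `aof_base_size` equals `aof_current_size`, so the file has to grow to about `400mb` before an automatic rewrite is triggered. **If the data grows slowly, or the percentage is set high**, `redis` may be shut down or restarted before the rewrite triggers. `aof_base_size` is then reset to `aof_current_size` at the next startup, **so the automatic rewrite may never be triggered.**

#### aof rewrite priority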
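Before reading the source, the whole trigger condition can be condensed into a few lines. This is a minimal sketch, not code from `redis`: `aof_growth_percent`, `should_auto_rewrite` and `worked_example` are hypothetical helpers, and the `child_running` flag stands in for the check that no `rdb` or `aof` child exists. The integer arithmetic follows the `serverCron` excerpt in this section.

```c
#include <assert.h>

#define MB (1024LL * 1024LL)

/* Hypothetical helper, not part of redis: growth of the aof file in percent,
 * using the same integer arithmetic as the serverCron excerpt. */
static long long aof_growth_percent(long long current_size, long long base_size) {
    long long base = base_size ? base_size : 1; /* avoid division by zero */
    return (current_size * 100 / base) - 100;
}

/* Hypothetical helper: returns 1 if an automatic rewrite would start. */
static int should_auto_rewrite(long long current_size, long long base_size,
                               long long min_size, int rewrite_perc,
                               int child_running) {
    if (child_running) return 0;            /* an rdb/aof child blocks the rewrite */
    if (!rewrite_perc) return 0;            /* percentage 0 disables the feature */
    if (current_size <= min_size) return 0; /* file is still too small */
    return aof_growth_percent(current_size, base_size) >= rewrite_perc;
}

/* Worked example with min-size 64mb and percentage 100. */
void worked_example(void) {
    /* A 100mb file was loaded at startup: the rewrite starts at 200mb. */
    assert(!should_auto_rewrite(199 * MB, 100 * MB, 64 * MB, 100, 0));
    assert(should_auto_rewrite(200 * MB, 100 * MB, 64 * MB, 100, 0));
    /* Restarted at 199mb: the base is now 199mb, so 398mb is needed. */
    assert(!should_auto_rewrite(397 * MB, 199 * MB, 64 * MB, 100, 0));
    assert(should_auto_rewrite(398 * MB, 199 * MB, 64 * MB, 100, 0));
    /* A running child, for example an rdb save, always wins. */
    assert(!should_auto_rewrite(398 * MB, 199 * MB, 64 * MB, 100, 1));
}
```

Calling `worked_example()` from a test program exercises both the restart pitfall described above and the rule that a running child suppresses the rewrite.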
The `aof` rewrite has lower priority than `rdb`: if both trigger conditions are met at the same time, `redis` handles the `rdb` save first. The source shows that the `rdb` check comes before the `aof` check. If the `rdb` save is triggered here, then even if the `aof` rewrite condition holds, `server.rdb_child_pid` is no longer `-1`, so the `aof` rewrite branch is not entered. (In the source, `aof_base_size` is the field `server.aof_rewrite_base_size`.)

```c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        //... check whether the child has finished and handle it
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            ... // handle the automatic rdb save
        }

        /* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                             server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
}
```

#### The full story of an aof rewrite

`rewriteAppendOnlyFileBackground` creates several pipes for communication between parent and child.

- `child_info_pipe` lets the child tell the parent how much `Copy-On-Write` memory was used.
- `aof_pipe_write_data_to_child` lets the parent send the most recent data changes to the `aof` rewrite child.
- `aof_pipe_write_ack_to_parent` and `aof_pipe_write_ack_to_child` are used by each side to wait for the other's acknowledgement.

It also registers a file event on `aof_pipe_read_ack_from_child`: **when the child sends the parent a request to stop, the `aofChildPipeReadable` function is called.**

```c
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error;   /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking.
     */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    /* Note: a file event is registered here */
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}
```

---

**Parent process** After creating the child, the parent stores the child's `id` in `aof_child_pid`. Only one field is updated, but it means that a task that may well affect the performance of `redis` has now started.

---

**Child process** It first writes the current contents of the database to a temporary file. If `aof_use_rdb_preamble` is enabled (off by default, but recommended), it writes `rdb` data, that is, a full save of the `db` data. Otherwise it saves the full contents of the `db` in `aof` append format. **It then flushes the data to disk, which blocks.**

```c
//in function rewriteAppendOnlyFile(char* filename)
if (server.aof_use_rdb_preamble) {
    int error;
    if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
        errno = error;
        goto werr;
    }
} else {
    if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}

/* Do an initial slow fsync here while the parent is still sending
 * data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
```

---

**Parent process** While the `aof` child waits for the flush, the parent keeps serving requests and appends the data to `server.aof_rewrite_buf_blocks`. If no file event is registered for `aof_pipe_write_data_to_child` (a pipe, that is, a file descriptor), the parent binds that pipe to `aofChildWriteDiffData`. When the pipe is writable, the data in `server.aof_rewrite_buf_blocks` is written to the pipe and sent to the child. **This ensures that the parent never blocks on writing to the pipe.**

```c
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        ...
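        // keep appending the data to aof_rewrite_buf_blocks
    }

    /* Register the file event */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
```

---

**Child process** After flushing the earlier data, the child keeps reading data from the parent for up to 1 second (or until 20 consecutive reads time out with no data), storing it in `aof_child_diff`. It then sends the parent a request to **stop sending data**.

```c
//in function rewriteAppendOnlyFile(char* filename)
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
    if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
    {
        nodata++;
        continue;
    }
    nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                   timeouts. */
    aofReadDiffFromParent();
}

if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
```

---

**Parent process** In `aeMainLoop` the parent detects the readable event on the `aof_pipe_read_ack_from_child` pipe (registered when the pipes were created, see above) and calls `aofChildPipeReadable`, which sets `aof_stop_sending_diff` to 1. From then on the parent no longer writes the contents of the `aof_rewrite_buf_blocks` buffer to the child. It also sends the child a message confirming that the stop request was received.

---

**Child process** After receiving the parent's confirmation, the child **reads the data one last time**, because the parent may have sent more data before it received the stop request. At this point all the additional `aof` incremental data sent before the stop request is in `aof_child_diff`. The child writes it to the file, flushes it, and exits.

```c
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
    byte != '!') goto werr;

aofReadDiffFromParent();

if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
    goto werr;

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
```

---

**Parent process** In `serverCron`, `wait3` detects that the `aof` rewrite child has exited, and `backgroundRewriteDoneHandler` is called to handle it.

It first opens the temporary file saved earlier and writes into it the data in `aof_rewrite_buf_blocks` that was appended after the stop request (**note: although the child asked the parent to stop sending data, `aof_child_pid` still holds the child's id until now, so data keeps being appended to `aof_rewrite_buf_blocks`**). At this point all the data has been written to the temporary `aof` file. What remains is to rename the temporary file to the configured `aof` file name.

### rdb vs. aof

The official site already has an article comparing the two, [Persistence](https://redis.io/topics/persistence), so the comparison is not repeated here.

## References

\[1\] [Redis source code](https://github.com/dewxin/redis)

\[2\] [《Redis開發與運維》 (*Redis Development and Operations*)](https://book.douban.com/subject/26971561/)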

\[3\] [《Redis設計與實現》 (*Redis Design and Implementation*)](https://book.douban.com/subject/25900156/)

\[4\] [fsync() on a different thread: apparently a useless trick](http://oldblog.antirez.com/post/fsync-different-thread-useless.html)

\[5\] [private dirty memory](https://stackoverflow.com/questions/17594183/what-does-private-dirty-memory-mean-in-smaps)

\[6\] [pipe(2) - Linux man page](https://linux.die.net/man/2/pipe)

\[7\] [wait3(2) - Linux man page](https://linux.die.net/man/2/wait3)

\[8\] [ftruncate(3) - Linux man page](https://linux.die.net/man/3/ftruncate)

\[9\] [Redis persistence](https://redis.io/topics/pers