Redis 4.0 from the top down (5): persistence
阿新 • Published: 2020-12-18
# Persistence in Redis 4.0
[toc]
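## Introduction
Although `redis` is an in-memory database, it also provides persistence. `rdb` persistence takes periodic snapshots that can be used for rollback, while `aof` persistence stays closer to the latest state of the database, so the server can be restored to that state after a restart. The two differ in granularity: `rdb` backs up the whole database at once, whereas `aof` records individual write commands, which is finer-grained but produces a larger file. If several processes wrote to disk at the same time, disk pressure would rise and the writers would end up blocking, so `redis` allows only one persistence job to write to disk at any given time. The default configuration disables `aof` persistence and enables background `rdb` persistence. Because `aof` data is more recent, `redis` loads the `aof` file at startup whenever `aof` persistence is enabled.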
```bash
# AOF is disabled by default
appendonly no
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
save 900 1
save 300 10
save 60 10000
```
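## Main text
### RDB persistence
`redis` accepts the `save` and `bgsave` commands, and can also be configured to save `rdb` data periodically.
#### The save command
The `save` command is handled by `saveCommand`, which calls `rdbSave` directly and saves the data in the main thread, so it is not recommended in production. Before going further, let's look at the relevant members.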
```c
struct redisServer {
/* RDB persistence */
pid_t rdb_child_pid; /* PID of RDB saving child */
char *rdb_filename; /* Name of RDB file */
long long dirty; /* Changes to DB from the last rdb save */
time_t lastsave; /* Unix time of last successful save */
int lastbgsave_status; /* C_OK or C_ERR */
};
```
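If an `rdb` child process is already running, the command returns an error immediately. Otherwise the data is saved to the file named by `server.rdb_filename`, `dump.rdb` by default. `rdbSave` opens a temporary file, writes the data to it, flushes it to disk, and then renames the temporary file to `dump.rdb`. Finally it resets `server.dirty` to `0` and updates `lastsave`.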
```c
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
}
}
```
The actual writing happens in `rdbSaveRio`: it writes the `rdb` version first, then some auxiliary fields, then the data of each `db`, and finally a checksum.
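To make the temp-file-then-rename step of `rdbSave` concrete, here is a minimal sketch in plain C. It is not the `rdbSave` implementation: `atomic_save` and the `.tmp-<pid>` suffix are names invented for this illustration, and it writes a raw byte buffer rather than the `rdb` format. It only shows the order of operations: write, `fflush`, `fsync`, then `rename`.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: write `len` bytes to `path` so that readers see
 * either the old file or the complete new one, never a partial file.
 * Returns 0 on success, -1 on failure. */
int atomic_save(const char *path, const void *data, size_t len) {
    char tmp[256];
    assert(path != NULL);
    int n = snprintf(tmp, sizeof(tmp), "%s.tmp-%d", path, (int)getpid());
    if (n < 0 || (size_t)n >= sizeof(tmp)) return -1;

    FILE *fp = fopen(tmp, "w");
    if (fp == NULL) return -1;

    if (fwrite(data, 1, len, fp) != len) goto err; /* write the payload */
    if (fflush(fp) == EOF) goto err;               /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto err;         /* kernel buffer -> disk */
    if (fclose(fp) == EOF) { fp = NULL; goto err; }
    fp = NULL;

    if (rename(tmp, path) == -1) goto err;         /* replace in one step */
    return 0;

err:
    if (fp != NULL) fclose(fp);
    unlink(tmp);                                   /* drop the partial file */
    return -1;
}
```

Because `rename` replaces the target in a single step on POSIX file systems, a process that opens the file gets either the previous snapshot or the complete new one.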
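#### The bgsave command
The `bgsave` command calls `fork` to start a child process, and the child calls `rdbSave`.
As with `save`, if a child process is already saving data, an error is returned. However, if `bgsave schedule` is used and the running child is an `aof` rewrite, the `bgsave` is deferred until later instead of being rejected.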
```c
struct redisServer {
...
/* RDB persistence */
pid_t rdb_child_pid; /* PID of RDB saving child */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
int process_type; /* AOF or RDB child? */
size_t cow_size; /* Copy on write size. */
unsigned long long magic; /* Magic value to make sure data is valid. */
} child_info_data;
...
};
```
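Starting `rdb` in the background means calling `fork` to create a child process and calling `rdbSave` in the child. Before calling `fork`, `redis` creates a pipe for one-way communication from the child to the parent; the forked child shares the parent's file descriptors, so the pipe's descriptors can carry that communication. While the child is saving the `db` data, writes to memory pages that parent and child still share trigger `copy-on-write`, which takes up extra memory. Once the save completes, the child sends the size of that extra memory to the parent.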
> ```
> fork(2)
> * The child inherits copies of the parent's set of open file
> descriptors. Each file descriptor in the child refers to the same
> open file description (see open(2)) as the corresponding file
> descriptor in the parent. This means that the two file
> descriptors share open file status flags, file offset, and signal-
> driven I/O attributes (see the description of F_SETOWN and
> F_SETSIG in fcntl(2)).
> ```
```c
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
openChildInfoPipe(); // create the child -> parent pipe
start = ustime();
if ((childpid = fork()) == 0) {
// child process
int retval;
closeListeningSockets(0); // file descriptors are inherited, so close the listening sockets here
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// parent process
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}
```
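The fork-plus-pipe pattern used by `rdbSaveBackground` can be tried out in isolation. The sketch below is a simplified stand-in, not Redis code: `run_child_and_collect` is a hypothetical name, the child sends back a single `size_t` instead of the `child_info_data` struct, and the parent blocks in `waitpid` for brevity, whereas Redis polls with `wait3` and `WNOHANG` from `serverCron`.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: fork a child that reports one size_t back through a
 * pipe, similar in spirit to the RDB child reporting its copy-on-write size.
 * Returns 0 and stores the value in *out on success, -1 on failure. */
int run_child_and_collect(size_t value_to_report, size_t *out) {
    int fds[2]; /* fds[0]: read end, fds[1]: write end */
    assert(out != NULL);
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) { close(fds[0]); close(fds[1]); return -1; }

    if (pid == 0) {
        /* Child: inherits both ends but only needs the write end. */
        close(fds[0]);
        ssize_t n = write(fds[1], &value_to_report, sizeof(value_to_report));
        _exit(n == (ssize_t)sizeof(value_to_report) ? 0 : 1);
    }

    /* Parent: only needs the read end. */
    close(fds[1]);
    int status;
    if (waitpid(pid, &status, 0) == -1) { close(fds[0]); return -1; }

    /* Read the report only if the child exited cleanly. */
    int ok = WIFEXITED(status) && WEXITSTATUS(status) == 0 &&
             read(fds[0], out, sizeof(*out)) == (ssize_t)sizeof(*out);
    close(fds[0]);
    return ok ? 0 : -1;
}
```

Reading only after a clean exit mirrors the `if (!bysignal && exitcode == 0) receiveChildInfo();` check in `serverCron`.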
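The parent now records the child's id in `rdb_child_pid`, along with its type. The time event `serverCron`, registered earlier, then checks whether the child has finished. `wait3` reports a change in the child's state; in general this can be an exit or a stop or resume caused by a signal, but stops and resumes are only reported when `WUNTRACED` or `WCONTINUED` is passed, which is not the case here. If the child exited normally, the parent reads the information the child sent through the pipe, that is, the `Copy-On-Write` size, and then closes the pipe.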
```c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// a child process is doing a full save or rewrite
if (server.rdb_child_pid != -1|| server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
}
updateDictResizePolicy();
closeChildInfoPipe();
}
}
}
```
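Because this is an `RDB` save to disk (`rdb_child_type` was set to `RDB_CHILD_TYPE_DISK` in `rdbSaveBackground`), `backgroundSaveDoneHandler` calls `backgroundSaveDoneHandlerDisk`. An `AOF` rewrite child is handled by a separate handler, although with the `RDB` preamble enabled the two write almost the same data. The function resets `rdb_child_pid` and related fields and, if the save succeeded, updates `server.dirty` and `lastsave`.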
```c
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE,
"Background saving terminated with success");
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background saving error");
server.lastbgsave_status = C_ERR;
} else {
mstime_t latency;
serverLog(LL_WARNING,
"Background saving terminated by signal %d", bysignal);
latencyStartMonitor(latency);
rdbRemoveTempFile(server.rdb_child_pid);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = C_ERR;
}
server.rdb_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
}
```
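#### Periodic rdb saves
`redis` installs three default save points at startup. If a `redis.conf` is used, the defaults are cleared and the `save` lines from `redis.conf` apply instead; if `redis.conf` contains no `save` line, periodic `rdb` saving is disabled.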
```c
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
```
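Also in `serverCron`: when no `aof` or `rdb` child is running, the save points are checked. If (the time since the last save and the number of changes both satisfy a save point) **and** (the last background save succeeded, or more than 5 seconds, the value of `CONFIG_BGSAVE_RETRY_DELAY`, have passed since the last attempt), a background `rdb` save is started.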
```c
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
...
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY || // defined as 5
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
...
}
```
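The save-point check is a pure decision over a handful of numbers, so it can be isolated and tested on its own. The sketch below restates the condition from `serverCron` as a standalone function. `struct save_rule`, `first_matching_rule` and `BGSAVE_RETRY_DELAY` are illustrative names rather than Redis identifiers, and the server state is passed in as plain arguments.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

#define BGSAVE_RETRY_DELAY 5 /* seconds; mirrors CONFIG_BGSAVE_RETRY_DELAY */

/* Hypothetical equivalent of one `save <seconds> <changes>` line. */
struct save_rule { time_t seconds; int changes; };

/* Returns the index of the first rule that fires, or -1 if none does.
 * last_ok is nonzero when the previous background save succeeded. */
int first_matching_rule(const struct save_rule *rules, size_t n,
                        long long dirty, time_t now, time_t lastsave,
                        time_t last_try, int last_ok) {
    assert(rules != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        int enough_changes = dirty >= rules[i].changes;
        int enough_time = now - lastsave > rules[i].seconds;
        /* After a failed save, wait a little before trying again. */
        int may_retry = last_ok || now - last_try > BGSAVE_RETRY_DELAY;
        if (enough_changes && enough_time && may_retry) return (int)i;
    }
    return -1;
}
```

For example, with the rules `900 1`, `300 10` and `60 10000`, 15 changes and 400 seconds since the last save match the second rule, but not if the previous attempt failed only 2 seconds ago.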
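#### Saving data at shutdown
When `redis` shuts down normally (on a client `shutdown` command or on receiving `SIGTERM`), it calls `prepareForShutdown`. This function kills any child process that is currently saving. If periodic `rdb` saving is configured, or `save` was passed to the shutdown command, it calls `rdbSave` in the main thread to save the data, and then the process exits.
Note that before saving with `rdb`, if `AOF` is enabled, `redis` calls `flushAppendOnlyFile` to force the buffered data out to the file and then `aof_fsync` to make sure it reaches the disk.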
```c
int prepareForShutdown(int flags) {
int save = flags & SHUTDOWN_SAVE;
int nosave = flags & SHUTDOWN_NOSAVE;
serverLog(LL_WARNING,"User requested shutdown...");
/* Kill all the Lua debugger forked sessions. */
ldbKillForkedSessions();
/* Kill the saving child if there is a background saving in progress.
We want to avoid race conditions, for instance our saving child may
overwrite the synchronous saving did by SHUTDOWN. */
if (server.rdb_child_pid != -1) {
serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (server.aof_state != AOF_OFF) {
/* Kill the AOF saving child as the AOF we already have may be longer
* but contains the full dataset anyway. */
if (server.aof_child_pid != -1) {
/* If we have AOF enabled but haven't written the AOF yet, don't
* shutdown or else the dataset will be lost. */
if (server.aof_state == AOF_WAIT_REWRITE) {
serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
return C_ERR;
}
serverLog(LL_WARNING,
"There is a child rewriting the AOF. Killing it!");
kill(server.aof_child_pid,SIGUSR1);
}
/* Append only file: flush buffers and fsync() the AOF at exit */
serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
flushAppendOnlyFile(1);
aof_fsync(server.aof_fd);
}
/* Create a new RDB file before exiting. */
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
* saving aborted, handling special stuff like slaves pending for
* synchronization... */
serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
return C_ERR;
}
}
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
unlink(server.pidfile);
}
/* Best effort flush of slave output buffers, so that we hopefully
* send them pending writes. */
flushSlavesOutputBuffers();
/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
server.sentinel_mode ? "Sentinel" : "Redis");
return C_OK;
}
```
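### AOF persistence
#### The data buffer
As mentioned in an earlier article, after parsing a client request into `client->argc` and `client->argv`, `redis` calls `processCommand` to check whether the command may run; if so, it calls `call(client, CMD_CALL_FULL)`.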
```c
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
#define CMD_CALL_SLOWLOG (1<<0)
#define CMD_CALL_STATS (1<<1)
#define CMD_CALL_PROPAGATE_AOF (1<<2)
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
```
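Take a look at `CMD_CALL_FULL`: for now all we need to know is that it includes `CMD_CALL_PROPAGATE`. After executing a command, `redis` decides whether to append it to `server.aof_buf`: the command is appended when **it changed data**, propagation has not been suppressed for it, and **`aof` is enabled in `redis`**.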
```c
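void call(client *c, int flags) {
long long dirty = server.dirty; // snapshot the change counter before running the command
c->cmd->proc(c); // the command has now been executed
dirty = server.dirty - dirty; // number of changes the command made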
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE && // flags is CMD_CALL_FULL here
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
//the command changed some data
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
//some commands force propagation, e.g. when publishing a message
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
//some commands suppress propagation here, e.g. spop, and handle it in their own code
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
```
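Before appending the command, `redis` does some preprocessing. If the command targets a different `db` from the last appended command, a `select` command is inserted first. `expire`-family commands are all rewritten as `pexpireat`. `setex` is split into `set` plus `pexpireat`, as is `set` with an `ex` or `px` option. The result is then appended to `aof_buf` when `aof` is on and, if a rewrite child is currently running, also to the rewrite buffer, from which `redis` tries to send it to the child.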
```c
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
```
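The strings built by `feedAppendOnlyFile` use the same wire format as client requests: an array header `*<argc>` followed by one `$<len>` bulk string per argument, as the `SELECT` line in the excerpt shows. The sketch below encodes a command that way into a fixed buffer. `resp_encode` is a hypothetical helper, not the `catAppendOnlyGenericCommand` from the source, and it treats arguments as NUL-terminated C strings, so unlike the `sds`-based original it is not binary safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv[0..argc-1] as an array of bulk strings.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if the buffer is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;
    assert(out != NULL && argv != NULL);

    /* Array header: number of arguments. */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;

    /* One length-prefixed bulk string per argument. */
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding `SELECT 0` this way yields `*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n`, which matches the format string used for the `select` command in `feedAppendOnlyFile`.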
#### Flushing data to disk
```bash
# AOF is disabled
appendonly no
# the options below only take effect once AOF is enabled
# AOF fsync policy
# appendfsync always
appendfsync everysec
# appendfsync no
```
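`redis` disables `aof` by default, and while it is disabled `server.aof_buf` holds no data. Only with `aof` enabled, that is `appendonly yes`, are commands written to the `aof` file.
`appendfsync` only takes effect once `appendonly yes` is set. The default is `everysec`: about once a second a background thread is asked to **fsync** the data to disk, but **the write itself is still done by the main thread**, which writes whenever the buffer holds data and no background fsync is in the way.
The write to disk is also driven from `beforeSleep`. Readers of earlier articles in this series may remember that `redis` does not send a reply to the client right away: it first stores the reply in `client->buf`, and then, in `beforeSleep` ahead of the next iteration of the event loop (`aeMain`), calls `handleClientsWithPendingWrites` to send it. This ordering exists to honor `appendfsync always`: inside `beforeSleep`, `flushAppendOnlyFile` runs before `handleClientsWithPendingWrites`, so the data is flushed before the client sees the reply.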
```c
void beforeSleep(struct aeEventLoop *eventLoop) {
...
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
}
```
There are three flush policies; the sections below go through them in the order `no`, `always`, `everysec`, alongside the source.
##### appendfsync no
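Under the policy that does not guarantee an fsync, `redis` still calls `flushAppendOnlyFile`, which here amounts to calling `aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));` to write the data into the operating system's buffers; whether and when the file reaches the disk is left to the OS. Because `aofWrite` can run into problems such as a full disk, `redis` compares the number of bytes written with the number requested. If only part was written, then to keep the next load of the `aof` file clean, **`redis` truncates the partially written data and waits to write it again next time.** If the truncation fails, it instead shortens `aof_buf`, dropping the part already written, so that the next write resumes where this one stopped. Whenever the write to the OS buffer fails, the function returns after handling the error and does not go on to call `aof_fsync` or any other fsync function.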
```c
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) return;
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
//some data was written
if (nwritten != -1) {
//truncate away the data that was just written
//TODO: what happens if ftruncate() runs while part of the file is still in memory and not yet flushed to disk?
if (ftruncate(server.aof_fd, server.aof_current_size) != -1) {
//truncation succeeded
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
server.aof_last_write_status = C_ERR;
//truncation failed, so keep the partial write
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
server.aof_current_size += nwritten;
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
//the fsync logic follows
}
```
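##### appendfsync always
`always` guarantees that by the time the client receives the reply, `redis` has already flushed the change to disk. In this mode `redis` effectively calls `aof_fsync`, that is, one of the `fsync` family of functions, in the main thread right after `aofWrite`. This forces the main thread to wait on the disk and causes a sharp drop in performance. `always` is also less forgiving of errors: **if `aofWrite` fails to write all of `aof_buf`, `redis` exits immediately.**
![Disk and memory access times](https://img-blog.csdnimg.cn/20190516233655801.png)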
##### appendfsync everysec
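Flushing to disk once a second is the default policy: `redis` tries to fsync the file once per second. `flushAppendOnlyFile` itself runs much more often, in `beforeSleep` on every iteration of the event loop and again from `serverCron` (10 times per second by default) while a flush is postponed, so the data is written many times for each fsync. If `aof_no_fsync_on_rewrite` (the `no-appendfsync-on-rewrite` option) is enabled, the incremental `aof` data is not fsynced while a child is doing a full save, whether an `rdb` save or an `aof` rewrite.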
```c
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) return;
// check whether a background thread is still running an fsync
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
if (sync_in_progress) {
//a background fsync is in progress: postpone the write, for at most 2 seconds
if (server.aof_flush_postponed_start == 0) {
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
return;
}
//the fsync is still not done after 2 seconds: write anyway, which may block
}
}
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
//as described above, return if the write was incomplete
...
return; /* We'll try again on the next call... */
}
//the data is now in the OS buffer, so reset `aof_buf`
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
```
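When writing to disk, `redis` calls `write` in the main thread and `fsync` in a separate thread. This lets the other thread block on `IO` without stalling the main thread. Note, however, that if the main thread calls `write` before the other thread's `fsync` has returned, the main thread blocks in `write` as well. [4]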
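《Redis開發與運維》[2] notes:
> Looking at how AOF blocks reveals two problems:
>
> 1) With everysec, up to 2 seconds of data may be lost, not 1.
>
> 2) If the system's fsync is slow, the Redis main thread blocks, which hurts throughput.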
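In fact, in `redis` 4.0 the `everysec` policy can lose up to 2 seconds of data plus one iteration of the event loop. Although 《Redis開發與運維》 lists two problems, they are really the same one: **the disk cannot keep up with the volume of data being written**. When this happens under `everysec`, `redis` puts the main thread first: if less than 2 seconds have passed since `aof_flush_postponed_start`, the timestamp of the first postponed write, it skips this write to avoid blocking so that the main thread can keep serving requests. If after 2 seconds the data still has not been flushed from the buffer to disk, it calls `aofWrite` anyway, and the main thread blocks.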
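The postponement rule is small enough to restate as a standalone function, which makes the 2-second window easy to experiment with. The sketch below mirrors the branch at the top of `flushAppendOnlyFile` for the `everysec` policy only. `everysec_decide` and `enum flush_action` are hypothetical names, and the state that Redis keeps in `server.aof_flush_postponed_start` is passed in through a pointer.

```c
#include <assert.h>
#include <time.h>

enum flush_action { FLUSH_POSTPONE, FLUSH_WRITE };

/* Hypothetical helper: decide whether the main thread should call write()
 * now. *postponed_start is 0 when no flush is currently postponed. */
enum flush_action everysec_decide(int fsync_in_progress, int force,
                                  time_t now, time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (fsync_in_progress && !force) {
        if (*postponed_start == 0) {
            *postponed_start = now;     /* first postponement: remember when */
            return FLUSH_POSTPONE;
        }
        if (now - *postponed_start < 2) /* still inside the 2-second window */
            return FLUSH_POSTPONE;
        /* Waited long enough: write anyway, accepting that write() may block. */
    }
    *postponed_start = 0;
    return FLUSH_WRITE;
}
```

Calling it at seconds 100, 101 and 102 with a background fsync still running gives postpone, postpone, write, which is where the "up to 2 seconds" figure comes from.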
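### AOF rewrite
#### Configuring the AOF rewrite
An `aof` rewrite can be triggered manually with the `bgrewriteaof` command, or automatically when configured conditions are met.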
```bash
auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100
```
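These two settings alone do not tell us when `redis` rewrites; we also need `aof_current_size` and `aof_base_size` (`aof_rewrite_base_size` in the source). `aof_current_size` is the current size of the `aof` file; it is updated when `redis` loads the `aof` file at startup and every time data is appended. The value is not persisted to disk. The same goes for `aof_base_size`: if an `aof` file was loaded at startup, `aof_base_size` is the size of that file.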
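When `aof_current_size` > `auto-aof-rewrite-min-size` and `auto-aof-rewrite-percentage` is set, a rewrite is triggered automatically once (`aof_current_size` - `aof_base_size`) / `aof_base_size` * `100` >= `percentage`. With the configuration above, if the `aof` file loaded at startup is `100mb`, then `aof_base_size` is `100mb` and the rewrite is triggered when the file grows to `200mb`.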
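There is a catch, though. Suppose the file has grown to `199mb` and `redis` happens to restart. On the next start `aof_base_size` equals `aof_current_size`, so the file has to grow to about `400mb` before an automatic rewrite is triggered. **If the data grows slowly, or the percentage is set high**, `redis` may be shut down or restarted before each rewrite is triggered; every restart resets `aof_base_size` to the current file size, **so the automatic rewrite may never fire.**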
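The trigger arithmetic, including the restart scenario above, can be checked with a few lines of C. The sketch below copies the growth formula from `serverCron`, where a base size of `0` is treated as `1`. The function names are hypothetical, and the sizes are passed as arguments instead of being read from `server`.

```c
#include <assert.h>

/* Growth of the AOF relative to its base size, in percent.
 * A base of 0 is treated as 1, as in serverCron. */
long long aof_growth_percent(long long current_size, long long base_size) {
    long long base = base_size ? base_size : 1;
    assert(current_size >= 0 && base > 0);
    return (current_size * 100 / base) - 100;
}

/* Returns 1 if an automatic rewrite would be triggered, 0 otherwise.
 * rewrite_perc == 0 disables automatic rewrites. */
int aof_should_rewrite(long long current_size, long long base_size,
                       long long min_size, int rewrite_perc) {
    if (rewrite_perc == 0) return 0;
    if (current_size <= min_size) return 0;
    return aof_growth_percent(current_size, base_size) >= rewrite_perc;
}
```

With a `100mb` base, `199mb` gives 99% growth and does not trigger, while `200mb` does. After a restart at `199mb` the base becomes `199mb`, and the threshold moves to `398mb`.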
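#### Priority of the AOF rewrite
The `aof` rewrite has lower priority than `rdb`: if both trigger conditions are met at the same time, `redis` handles the `rdb` save first. In the source the `rdb` check comes before the `aof` check, so if an `rdb` save is started there, `server.rdb_child_pid` is no longer `-1` and the `aof` rewrite branch is not entered even though its condition is met.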
```c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) {
//... check whether the child has exited and handle it
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
...
//... handle the automatic rdb save
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
}
```
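#### The AOF rewrite from start to finish
`rewriteAppendOnlyFileBackground` creates several pipes for communication between parent and child.
- `childInfoPipe` lets the child tell the parent how much `Copy-On-Write` memory it used.
- `aof_pipe_write_data_to_child` lets the parent send the most recent data changes to the `aof` rewrite child.
- `aof_pipe_write_ack_to_parent` and `aof_pipe_write_ack_to_child` carry the acknowledgements each side waits for from the other.
It also registers a file event on `aof_pipe_read_ack_from_child`: **when the child asks the parent to stop sending data, `aofChildPipeReadable` is called.**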
```c
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
//note:
//a file event is registered here
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}
```
---
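**Parent process** After creating the child, the parent records the child's `id` in `aof_child_pid`. Only one field changes, but it means a task that may well affect `redis` performance is now running.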
---
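**Child process** The child first writes the current contents of the database to a temporary file. If `aof_use_rdb_preamble` is enabled (off by default, but recommended), it writes `rdb` data, that is, a full snapshot of the `db`; otherwise it writes the full contents of the `db` as `aof` commands. **It then flushes the data to disk, which blocks.**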
```c
//in function rewriteAppendOnlyFile(char* filename)
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
```
---
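**Parent process** While the `aof` child is waiting for its data to be flushed, the parent keeps serving requests and appends the resulting changes to `server.aof_rewrite_buf_blocks`. If no file event is registered yet for `aof_pipe_write_data_to_child` (a pipe, that is, a file descriptor), the parent binds the pipe to `aofChildWriteDiffData`; whenever the pipe is writable, the data in `server.aof_rewrite_buf_blocks` is written to it and so sent to the child. **This ensures the parent never blocks on writing to the pipe.**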
```c
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
...
// keep appending the data to aof_rewrite_buf_blocks
}
//register the file event
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
```
---
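**Child process** After flushing the earlier data, the child keeps reading data from the parent for up to 1 second, or until 20 consecutive 1 ms waits return nothing, and appends it to `aof_child_diff`. It then sends the parent a **stop sending** request.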
```c
//in function rewriteAppendOnlyFile(char* filename)
int nodata = 0; /* consecutive waits that returned no data */
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
```
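The child's read loop combines two stop conditions: a 1-second overall budget and a limit of 20 consecutive empty waits. The sketch below reproduces that shape on an ordinary pipe so it can be run outside Redis. It uses `poll` with a 1 ms timeout in place of Redis's `aeWait`, reads into a caller-supplied buffer instead of appending to `aof_child_diff`, and `drain_diff` and `now_ms` are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* Monotonic clock in milliseconds. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Hypothetical helper: read from fd until 1 second has passed or 20
 * consecutive 1 ms waits saw no data. Returns the bytes stored in out. */
size_t drain_diff(int fd, char *out, size_t cap) {
    size_t total = 0;
    int nodata = 0;
    long long start = now_ms();
    assert(out != NULL);

    while (now_ms() - start < 1000 && nodata < 20) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN };
        if (poll(&pfd, 1, 1) <= 0) { /* timeout or error: count it */
            nodata++;
            continue;
        }
        nodata = 0; /* only *contiguous* timeouts count */
        ssize_t n = read(fd, out + total, cap - total);
        if (n <= 0) break; /* EOF, error, or buffer full */
        total += (size_t)n;
    }
    return total;
}
```

On an idle pipe the loop gives up after roughly 20 ms, so the child only waits the full second while the parent keeps sending data.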
---
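**Parent process** The event loop (`aeMain`) detects that the `aof_pipe_read_ack_from_child` pipe is readable (the event was registered when the pipes were created, see above) and calls `aofChildPipeReadable`, which sets `aof_stop_sending_diff` to 1, so the parent no longer writes the contents of `aof_rewrite_buf_blocks` to the child. It also sends the child a message acknowledging the stop request.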
---
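**Child process** After receiving the parent's acknowledgement, the child **reads the data one last time**, because the parent may have sent more before it received the stop request. At this point all the incremental `aof` data sent before the stop request is in `aof_child_diff`. The child writes it to the file, flushes it, and exits.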
```c
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
aofReadDiffFromParent();
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
```
---
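**Parent process** In `serverCron`, `wait3` detects that the `aof` rewrite child has exited, and `backgroundRewriteDoneHandler` is called to handle it.
It first opens the temporary file saved earlier and writes to it the data accumulated in `aof_rewrite_buf_blocks` since the stop request (**note: although the child asked the parent to stop sending data, `aof_child_pid` still holds the child's id until now, so new commands keep being appended to `aof_rewrite_buf_blocks`**). At this point all the data has been written to the temporary `aof` file. What remains is to rename the temporary file to the configured `aof` file name.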
### RDB versus AOF
The official site's article [《persistence》](https://redis.io/topics/persistence) already compares the two, so it is not repeated here.
## References
\[1\][《Redis source code》](https://github.com/dewxin/redis)
\[2\][《Redis開發與運維》](https://book.douban.com/subject/26971561/)
\[3\][《Redis設計與實現》](https://book.douban.com/subject/25900156/)
\[4\][《fsync() on a different thread: apparently a useless trick》](http://oldblog.antirez.com/post/fsync-different-thread-useless.html)
\[5\][《private dirty memory》](https://stackoverflow.com/questions/17594183/what-does-private-dirty-memory-mean-in-smaps)
\[6\][《pipe(2) - Linux man page》](https://linux.die.net/man/2/pipe)
\[7\][《wait3(2) - Linux man page》](https://linux.die.net/man/2/wait3)
\[8\][《ftruncate(3) - Linux man page》](https://linux.die.net/man/3/ftruncate)
\[9\][《Redis persistence》](https://redis.io/topics/pers