Redis Source Code Study: AOF
阿新 • Published: 2019-02-06
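Preface
There are many articles online about Redis's AOF mechanism, but they describe the AOF flow from a high level and do not analyze the concrete data structures and control mechanisms involved. Last night I went through the 2.8 source code and found that many of its details deserve close study, especially the list *aof_rewrite_buf_blocks structure. Reading the source carefully shows that what I had gathered from online articles was one-sided; the best way to learn is still to dig in yourself...
Original post: http://blog.csdn.net/ordeder/article/details/39271543
The simplified AOF rewrite flow, as described by the Redis author in the source comments, is: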
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
* 2b) the parent accumulates differences in server.aof_rewrite_buf.
* 3) When the child finishes '2a', it exits.
* 4) The parent will trap the exit code, if it's OK, will append the
* data accumulated into server.aof_rewrite_buf into the temp file, and
* finally will rename(2) the temp file in the actual file name.
* Then the new file is reopened as the new append only file. Profit!
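Steps 2a to 4 can be sketched with plain POSIX calls. This is a minimal sketch under toy assumptions. The child's "rewrite" just writes a given snapshot string. The parent's accumulated differences are passed in as another string instead of being collected while the child runs. write_file and bg_rewrite_sketch are hypothetical names. Only two things follow the comment and the source quoted in this article: the temp-rewriteaof-bg-<pid>.aof naming, and the fork, temp file, append, rename(2) sequence.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: write (mode "w") or append (mode "a") a string. */
static int write_file(const char *path, const char *mode, const char *data) {
    FILE *fp = fopen(path, mode);
    if (!fp) return -1;
    size_t len = strlen(data);
    int ok = fwrite(data, 1, len, fp) == len;
    if (fclose(fp) != 0) ok = 0;
    return ok ? 0 : -1;
}

/* snapshot: what the child writes (stands in for walking server.db)
 * diff:     what the parent accumulated meanwhile (aof_rewrite_buf)
 * aof_path: the append only file that gets replaced atomically */
static int bg_rewrite_sketch(const char *snapshot, const char *diff,
                             const char *aof_path) {
    char tmpfile[256];
    int status;
    assert(snapshot && diff && aof_path);
    pid_t child = fork();                              /* step 2 */
    if (child == -1) return -1;
    if (child == 0) {
        /* 2a) child: rewrite into a temp file named after its own pid */
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof",
                 (int)getpid());
        _exit(write_file(tmpfile, "w", snapshot) == 0 ? 0 : 1);
    }
    /* 2b/3) Redis keeps serving clients here and reaps the child later;
     * this sketch simply blocks until the child exits. */
    if (waitpid(child, &status, 0) == -1) return -1;
    /* 4) the parent rebuilds the same temp name from the child's pid */
    snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof", (int)child);
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        unlink(tmpfile);
        return -1;
    }
    /* append the differences, then atomically replace the real file */
    if (write_file(tmpfile, "a", diff) != 0 || rename(tmpfile, aof_path) == -1) {
        unlink(tmpfile);
        return -1;
    }
    return 0;
}
```

Because rename(2) replaces the target atomically, anyone opening aof_path sees either the old file or the complete new one. They never see a half-written file.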
AOF Flow
Based on the source code, AOF consists of the following operations:
Main functions:
//Function 1: append a command to aof_buf
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
//Function 2: fork a child process that walks over all the data in Redis
int rewriteAppendOnlyFileBackground(void);
//Function 3: walk over server.db[16], writing each object in turn to the temporary file tmpfile on disk
int rewriteAppendOnlyFile(char *filename);
//Function 4: persist the contents of aof_buf
void flushAppendOnlyFile(int force);
//Function 5: write the contents of server.aof_rewrite_buf_blocks into tmpfile and replace the AOF file
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
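1 Routine AOF command append:
1.1. Redis handles a file event: it executes the user command and buffers it in server.aof_buf {Function 1}
1.2. Redis runs the serverCron time event: if server.aof_flush_postponed_start shows that a flush was postponed, it retries {Function 4}. The regular call to Function 4 is made from beforeSleep(), just before the event loop waits for new events.
1.2.1. Write server.aof_buf to the file server.aof_fd.
1.2.2. One of three policies decides when that file is fsync'ed to disk:
AOF_FSYNC_EVERYSEC: call fsync once per second
AOF_FSYNC_ALWAYS: call fsync immediately after each write
Otherwise (AOF_FSYNC_NO): leave it to the operating system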
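The three policies boil down to a small decision made after each write(2). The sketch below uses the policy names from the Redis source, but the enum values and the fsync_after_write helper are illustrative assumptions. It mirrors the branch at the end of flushAppendOnlyFile(), where everysec hands the fsync to a background thread at most once per second.

```c
#include <assert.h>
#include <time.h>

/* Policy names as in the Redis source; the numeric values are assumed. */
enum aof_fsync_policy { AOF_FSYNC_NO, AOF_FSYNC_ALWAYS, AOF_FSYNC_EVERYSEC };
enum fsync_action { FSYNC_SKIP, FSYNC_NOW, FSYNC_BACKGROUND };

/* Called after aof_buf was written with write(2).
 * last_fsync plays server.aof_last_fsync; sync_in_progress says whether a
 * background fsync job is still pending. */
static enum fsync_action fsync_after_write(enum aof_fsync_policy policy,
                                           time_t now, time_t *last_fsync,
                                           int sync_in_progress) {
    assert(last_fsync != NULL);
    switch (policy) {
    case AOF_FSYNC_ALWAYS:
        *last_fsync = now;
        return FSYNC_NOW;              /* fsync in the main thread, every write */
    case AOF_FSYNC_EVERYSEC:
        if (now > *last_fsync) {       /* at most once per wall-clock second */
            *last_fsync = now;         /* updated even if a job is pending */
            return sync_in_progress ? FSYNC_SKIP : FSYNC_BACKGROUND;
        }
        return FSYNC_SKIP;
    case AOF_FSYNC_NO:
    default:
        return FSYNC_SKIP;             /* the kernel decides when to flush */
    }
}
```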
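2 AOF log rewrite (compaction):
2.1. Redis runs the serverCron time event: {Functions 2-3}
2.1.1. Start a background AOF child process that, from the in-memory data (server.db[16]), generates a set of commands able to rebuild the database and writes them to the temporary file tmpfile
2.2. Redis handles a file event:
When executing a user command, {Function 1}
2.2.1. buffers the command in server.aof_buf;
2.2.2. also buffers the command in server.aof_rewrite_buf_blocks
2.3. Redis runs the serverCron time event:
2.3.1 {Function 4} While the AOF child has not yet finished, step 1.2 runs as usual, writing aof_buf to aof_fd (business as usual)
2.3.2 When wait3 detects that the AOF child has exited: {Function 5}
2.3.2.1 Write the contents of server.aof_rewrite_buf_blocks into tmpfile
2.3.2.2 Replace the original AOF file with tmpfile and reset server.aof_fd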
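Steps 2.1 to 2.3 can be modeled entirely in memory. This toy assumes every file and buffer is a bounded C string. It also assumes the caller passes in the child's output (the snapshot). The sim_* names are hypothetical. The model shows why both buffers are needed. The old AOF keeps growing through aof_buf. Meanwhile the rewrite buffer holds exactly the difference that must be appended to the child's temp file.

```c
#include <assert.h>
#include <string.h>

#define CAP 256   /* toy capacity; nothing is reallocated */

struct sim_server {
    char aof_file[CAP];     /* the AOF currently open as server.aof_fd     */
    char aof_buf[CAP];      /* server.aof_buf                              */
    char rewrite_buf[CAP];  /* stands in for server.aof_rewrite_buf_blocks */
    char tmpfile[CAP];      /* the child's temp-rewriteaof-bg-<pid>.aof    */
    int child_running;      /* server.aof_child_pid != -1                  */
};

static void append(char *dst, const char *src) {
    assert(strlen(dst) + strlen(src) < CAP);
    strcat(dst, src);
}

/* 1.1 / 2.2 (Function 1): buffer the command, twice while a child runs. */
static void sim_feed(struct sim_server *s, const char *cmd) {
    append(s->aof_buf, cmd);
    if (s->child_running) append(s->rewrite_buf, cmd);
}

/* 1.2 / 2.3.1 (Function 4): write aof_buf to the current AOF, then clear it. */
static void sim_flush(struct sim_server *s) {
    append(s->aof_file, s->aof_buf);
    s->aof_buf[0] = '\0';
}

/* 2.1 (Functions 2-3): the child dumps a compact snapshot of the dataset. */
static void sim_start_rewrite(struct sim_server *s, const char *snapshot) {
    s->tmpfile[0] = '\0';
    append(s->tmpfile, snapshot);
    s->rewrite_buf[0] = '\0';
    s->child_running = 1;
}

/* 2.3.2 (Function 5): append the diff to the temp file and swap files. */
static void sim_rewrite_done(struct sim_server *s) {
    append(s->tmpfile, s->rewrite_buf);
    memcpy(s->aof_file, s->tmpfile, CAP);  /* rename(2) in the real code */
    s->tmpfile[0] = '\0';
    s->rewrite_buf[0] = '\0';
    s->child_running = 0;
}
```

Because the old AOF keeps receiving writes during the rewrite, a failed child still leaves the server with a complete (if longer) AOF.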
The relationship between the functions and the data is shown in the figure below:
Source Code
struct redisServer{
...
/* AOF persistence */
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy (every operation|every second|left to the OS)*/
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. Whether a background AOF child needs to be started*/
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. Commands Redis executes during the background AOF rewrite are stored in aof_rewrite_buf_blocks; aof_buf is still used as usual, and the two do not conflict*/
sds aof_buf; /* AOF buffer, written before entering the event loop */
int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync; /* UNIX time of last fsync() */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
...
};
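The preface singles out list *aof_rewrite_buf_blocks, so here is a sketch of the idea behind it. The rewrite buffer is a linked list of fixed-size blocks, and an append fills the tail block before allocating a new one. A large backlog therefore never needs one huge reallocation. The struct layout (used/free counters plus a byte buffer) follows my reading of Redis 2.8's aofrwblock, whose blocks are 10 MB. The 8-byte block size, the hand-rolled list, and the rw_* functions are assumptions for the demo.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Redis 2.8 uses 10 MB blocks; a tiny size keeps the demo readable. */
#define RW_BLOCK_SIZE 8

struct rw_block {
    unsigned long used, free;
    char buf[RW_BLOCK_SIZE];
    struct rw_block *next;
};

struct rw_list {            /* stand-in for list *aof_rewrite_buf_blocks */
    struct rw_block *head, *tail;
    unsigned long nblocks;
};

/* Append len bytes, filling the tail block before allocating new ones,
 * in the spirit of aofRewriteBufferAppend(). Returns 0, or -1 on OOM. */
static int rw_append(struct rw_list *l, const char *s, unsigned long len) {
    while (len) {
        struct rw_block *b = l->tail;
        if (b == NULL || b->free == 0) {
            b = malloc(sizeof(*b));
            if (!b) return -1;
            b->used = 0;
            b->free = RW_BLOCK_SIZE;
            b->next = NULL;
            if (l->tail) l->tail->next = b; else l->head = b;
            l->tail = b;
            l->nblocks++;
        }
        unsigned long n = len < b->free ? len : b->free;
        memcpy(b->buf + b->used, s, n);
        b->used += n;
        b->free -= n;
        s += n;
        len -= n;
    }
    return 0;
}

/* Total buffered bytes: the sum of every block's used counter. */
static unsigned long rw_size(const struct rw_list *l) {
    unsigned long total = 0;
    for (const struct rw_block *b = l->head; b; b = b->next) total += b->used;
    return total;
}

/* Copy block by block (as the parent does when writing into tmpfile);
 * out must hold rw_size() bytes. */
static void rw_copy(const struct rw_list *l, char *out) {
    for (const struct rw_block *b = l->head; b; b = b->next) {
        memcpy(out, b->buf, b->used);
        out += b->used;
    }
}

static void rw_free(struct rw_list *l) {
    struct rw_block *b = l->head;
    while (b) { struct rw_block *next = b->next; free(b); b = next; }
    l->head = l->tail = NULL;
    l->nblocks = 0;
}
```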
/////////////////////////////////////////////////////////////////////////////////
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
int client_old_flags = c->flags;
...
/* Execute the user command */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate);
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
duration = ustime()-start;
...
/* Propagate the user command to the AOF (and to the slaves) */
if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE;
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
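call() and propagate() together decide where a command goes. Below is a minimal sketch of that decision. The flag values are illustrative assumptions; the real REDIS_PROPAGATE_* and REDIS_FORCE_* constants live in redis.h. Any command that changed the dataset (dirty != 0) goes to both the AOF and the slaves. The AOF only receives it when AOF is not switched off.

```c
#include <assert.h>

/* Illustrative values only; the real constants are defined in redis.h. */
#define PROPAGATE_NONE 0
#define PROPAGATE_AOF  1
#define PROPAGATE_REPL 2
#define FORCE_AOF      (1 << 0)
#define FORCE_REPL     (1 << 1)

/* call(): where should the command be propagated after it ran? */
static int propagation_targets(long long dirty, int client_flags) {
    int flags = PROPAGATE_NONE;
    if (client_flags & FORCE_REPL) flags |= PROPAGATE_REPL;
    if (client_flags & FORCE_AOF)  flags |= PROPAGATE_AOF;
    if (dirty) flags |= (PROPAGATE_REPL | PROPAGATE_AOF);  /* dataset changed */
    return flags;
}

/* propagate(): the AOF is fed only when AOF is not switched off. */
static int feeds_aof(int flags, int aof_on) {
    return aof_on && (flags & PROPAGATE_AOF);
}

/* propagate(): slaves are fed whenever the REPL bit is set. */
static int feeds_slaves(int flags) {
    return (flags & PROPAGATE_REPL) != 0;
}
```

A read-only command such as GET leaves dirty at 0 and is therefore never written to the AOF.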
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* If the DB this command operates on differs from the DB of the previous command,
Redis must add a SELECT command to the AOF to select the current DB */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
//Serialize the command according to its type and write the result into the temporary buf
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
//If AOF is enabled, append this command's buf to the tail of the server.aof_buf buffer
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
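//If a child process is currently rebuilding the AOF log (i.e. scanning the Redis databases and building the log from the data),
//append this command's buf to the server.aof_rewrite_buf_blocks memory (this memory is
//dedicated to recording the operations Redis handles while the AOF is being rebuilt)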
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
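feedAppendOnlyFile() serializes commands in the Redis protocol (RESP) multibulk format. The sketch below is a toy version that assumes string arguments and a fixed-size buffer. struct toy_aof, cat_command and toy_feed_aof are hypothetical names, and the toy matches command names case-sensitively where Redis compares cmd->proc. It reproduces three behaviors from the code above:

* a SELECT is emitted when the DB changes;
* a relative EXPIRE is turned into an absolute PEXPIREAT;
* resetting selected_db to -1 (as rewriteAppendOnlyFileBackground() does) forces the next command to start with SELECT.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define OUT_CAP 512

struct toy_aof {
    char buf[OUT_CAP];  /* plays server.aof_buf                           */
    size_t len;
    int selected_db;    /* plays server.aof_selected_db; -1 forces SELECT */
};

static void cat_raw(struct toy_aof *a, const char *s) {
    size_t n = strlen(s);
    assert(a->len + n < OUT_CAP);
    memcpy(a->buf + a->len, s, n + 1);     /* copy the terminating NUL too */
    a->len += n;
}

/* RESP multibulk: "*<argc>\r\n" then "$<len>\r\n<arg>\r\n" per argument. */
static void cat_command(struct toy_aof *a, int argc, const char **argv) {
    char num[32];
    snprintf(num, sizeof(num), "*%d\r\n", argc);
    cat_raw(a, num);
    for (int i = 0; i < argc; i++) {
        snprintf(num, sizeof(num), "$%zu\r\n", strlen(argv[i]));
        cat_raw(a, num);
        cat_raw(a, argv[i]);
        cat_raw(a, "\r\n");
    }
}

/* Toy feedAppendOnlyFile(); now_ms is the current UNIX time in ms. */
static void toy_feed_aof(struct toy_aof *a, int dbid, int argc,
                         const char **argv, long long now_ms) {
    char num[32];
    if (dbid != a->selected_db) {          /* DB changed: emit SELECT first */
        snprintf(num, sizeof(num), "%d", dbid);
        const char *sel[] = {"SELECT", num};
        cat_command(a, 2, sel);
        a->selected_db = dbid;
    }
    if (argc == 3 && strcmp(argv[0], "EXPIRE") == 0) {
        /* relative seconds -> absolute ms, so a later replay keeps the TTL */
        long long when = now_ms + strtoll(argv[2], NULL, 10) * 1000;
        snprintf(num, sizeof(num), "%lld", when);
        const char *pexpireat[] = {"PEXPIREAT", argv[1], num};
        cat_command(a, 3, pexpireat);
    } else {
        cat_command(a, argc, argv);
    }
}
```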
////////////////////////////////////////////////////////////////////////////////////////
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* We take a cached value of the unix time in the global state because
* with virtual memory and aging there is to store the current time
* in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */
//Cache the system time...
server.unixtime = time(NULL);
server.mstime = mstime();
...
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
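//Start the child process that rebuilds the AOF log (log compaction)
//The background AOF child scans the server.db[16] data, generates commands that can rebuild the current database,
//and writes them to the temporary file tmpfile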
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
//AOF
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
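//Background AOF child finished: append the new commands Redis executed while the child was building the log
//(recorded in server.aof_rewrite_buf_blocks) to the tmpfile built by the child,
//then rename tmpfile to server.aof_filename, replacing the old AOF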
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now */
//No background child is running, so check whether an AOF or RDB child should be started...
...
}
/* If we postponed an AOF buffer flush, let's try to do it every time the
* cron function is called. */
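//Flush server.aof_buf (which buffers the commands Redis executed recently) to the AOF file on disk
//The fsync policies are:
//per operation: call fsync to persist each command
//every 1 second: call fsync to persist aof_buf
//never call fsync: let the system decide when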
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
...
server.cronloops++;
return 1000/server.hz;
}
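The child-reaping part of serverCron() is a non-blocking poll. Each tick asks whether any child has exited. If one has, the tick decodes the exit code and terminating signal, then dispatches on the pid. This sketch departs from the original in three ways:

* it uses POSIX waitpid(-1, ..., WNOHANG) in place of wait3();
* it checks WIFEXITED before WEXITSTATUS;
* it treats only positive return values as a reaped child, whereas the original's != 0 test also lets -1 through.

poll_child, dispatch and run_and_reap are hypothetical helpers.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

struct child_result {
    pid_t pid;       /* 0: nothing exited yet; -1: waitpid() error */
    int exitcode;    /* like WEXITSTATUS(statloc) in serverCron()  */
    int bysignal;    /* like WTERMSIG(statloc), 0 if not signaled  */
};

/* One non-blocking poll, the POSIX counterpart of wait3(&statloc,WNOHANG,NULL). */
static struct child_result poll_child(void) {
    struct child_result r = {0, 0, 0};
    int statloc;
    r.pid = waitpid(-1, &statloc, WNOHANG);
    if (r.pid > 0) {
        if (WIFEXITED(statloc)) r.exitcode = WEXITSTATUS(statloc);
        if (WIFSIGNALED(statloc)) r.bysignal = WTERMSIG(statloc);
    }
    return r;
}

enum done_handler { HANDLER_UNMATCHED, HANDLER_RDB, HANDLER_AOF };

/* Dispatch on the reaped pid, as the if/else chain in serverCron() does. */
static enum done_handler dispatch(pid_t pid, pid_t rdb_child_pid,
                                  pid_t aof_child_pid) {
    if (pid == rdb_child_pid) return HANDLER_RDB;  /* backgroundSaveDoneHandler */
    if (pid == aof_child_pid) return HANDLER_AOF;  /* backgroundRewriteDoneHandler */
    return HANDLER_UNMATCHED;                      /* "unmatched pid" warning */
}

/* Hypothetical test driver: fork a child that exits with `code` (or kills
 * itself with SIGKILL when code < 0), then poll once per simulated tick. */
static struct child_result run_and_reap(int code) {
    pid_t child = fork();
    if (child == 0) {
        if (code < 0) raise(SIGKILL);
        _exit(code);
    }
    assert(child > 0);
    struct timespec tick = {0, 1000000};           /* 1 ms between polls */
    struct child_result r;
    while ((r = poll_child()).pid == 0) nanosleep(&tick, NULL);
    return r;
}
```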
/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
* 2b) the parent accumulates differences in server.aof_rewrite_buf.
* 3) When the child finishes '2a', it exits.
* 4) The parent will trap the exit code, if it's OK, will append the
* data accumulated into server.aof_rewrite_buf into the temp file, and
* finally will rename(2) the temp file in the actual file name.
* Then the new file is reopened as the new append only file. Profit!
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1) return REDIS_ERR;
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
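Two guards govern when a rewrite starts. rewriteAppendOnlyFileBackground() refuses to start when a rewrite child already exists. The serverCron() check shown earlier starts a scheduled rewrite only when neither an RDB nor an AOF child is running. Below is a minimal state-machine sketch of that interplay. It assumes that a BGREWRITEAOF request arriving during BGSAVE is only marked as scheduled, as the serverCron() comment ("requested by the user while a BGSAVE was in progress") implies. fake_fork, start_rewrite, request_rewrite and cron_tick are hypothetical, and no real process is created.

```c
#include <assert.h>
#include <sys/types.h>

struct sched_state {
    pid_t rdb_child_pid;        /* -1 when no BGSAVE child runs    */
    pid_t aof_child_pid;        /* -1 when no rewrite child runs   */
    int aof_rewrite_scheduled;  /* a rewrite is waiting for BGSAVE */
};

/* Hypothetical stand-in for fork(): no real process is created. */
static pid_t fake_fork(void) { return 4242; }

/* Like the guard at the top of rewriteAppendOnlyFileBackground(). */
static int start_rewrite(struct sched_state *s) {
    if (s->aof_child_pid != -1) return -1;
    s->aof_child_pid = fake_fork();
    s->aof_rewrite_scheduled = 0;
    return 0;
}

/* A BGREWRITEAOF request: -1 already running, 1 scheduled, 0 started. */
static int request_rewrite(struct sched_state *s) {
    if (s->aof_child_pid != -1) return -1;
    if (s->rdb_child_pid != -1) {
        s->aof_rewrite_scheduled = 1;
        return 1;
    }
    return start_rewrite(s);
}

/* The check serverCron() runs on every tick. */
static void cron_tick(struct sched_state *s) {
    if (s->rdb_child_pid == -1 && s->aof_child_pid == -1 &&
        s->aof_rewrite_scheduled)
        start_rewrite(s);
}
```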
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {
//Emit a SELECT command that switches to this DB
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);
expiretime = getExpire(db,&key);
/* If this key is already expired skip it */
if (expiretime != -1 && expiretime < now) continue;
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
dictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
aof_fsync(fileno(fp));
fclose(fp);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generated DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
return REDIS_OK;
werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
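The loop body above reduces to a small rule per key. Keys whose absolute expire time is already in the past are skipped. Otherwise the rewrite emits the value-building command, plus a PEXPIREAT with the absolute millisecond timestamp if the key has a TTL. This sketch covers string keys only and assumes NUL-terminated values and a caller-supplied buffer. rewrite_string_key, put_raw and put_bulk are hypothetical. It does not model the variadic batching mentioned in the header comment (RPUSH, SADD, ZADD with at most REDIS_AOF_REWRITE_ITEMS_PER_CMD items per command).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Append a raw string at out[pos]; returns the new length. */
static size_t put_raw(char *out, size_t cap, size_t pos, const char *s) {
    int n = snprintf(out + pos, cap - pos, "%s", s);
    assert(n >= 0 && (size_t)n < cap - pos);   /* toy: fail loudly on overflow */
    return pos + (size_t)n;
}

/* Append one RESP bulk string: "$<len>\r\n<data>\r\n". */
static size_t put_bulk(char *out, size_t cap, size_t pos, const char *s) {
    int n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n", strlen(s), s);
    assert(n >= 0 && (size_t)n < cap - pos);
    return pos + (size_t)n;
}

/* Per-key step of the rewrite loop for a string key. expiretime is an
 * absolute UNIX time in ms (-1 = no TTL); now is when the rewrite started.
 * Returns the number of bytes written, 0 when the key is skipped. */
static size_t rewrite_string_key(char *out, size_t cap, const char *key,
                                 const char *val, long long expiretime,
                                 long long now) {
    char when[32];
    size_t pos = 0;
    assert(cap > 0);
    out[0] = '\0';
    if (expiretime != -1 && expiretime < now) return 0;  /* already expired */
    pos = put_raw(out, cap, pos, "*3\r\n$3\r\nSET\r\n");
    pos = put_bulk(out, cap, pos, key);
    pos = put_bulk(out, cap, pos, val);
    if (expiretime != -1) {            /* keep the TTL as an absolute time */
        snprintf(when, sizeof(when), "%lld", expiretime);
        pos = put_raw(out, cap, pos, "*3\r\n$9\r\nPEXPIREAT\r\n");
        pos = put_bulk(out, cap, pos, key);
        pos = put_bulk(out, cap, pos, when);
    }
    return pos;
}
```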
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is by entering
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
if (sdslen(server.aof_buf) == 0) return;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
//Decide whether to start flushing the commands buffered in server.aof_buf into the write buffer of the server.aof_fd file
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall through, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* If you are following this code path, then we are going to write so
* reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
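//Write the commands Redis executed recently (held in server.aof_buf) to the file (server.aof_fd)
//Note that a successful write does not mean the data is already on disk, because the write is buffered by the OS.
//When the file's buffered data is fsync'ed to disk depends on the user's setting: (see below)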
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
/* Ooops, we are in troubles. The best thing to do for now is
* aborting instead of giving the illusion that everything is
* working as expected. */
...
exit(1);
}
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
//aof_no_fsync_on_rewrite: when this flag is set, no fsync is performed while an AOF or RDB child process exists
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
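//fsync...
//per operation: call fsync to persist each command [1]
//every 1 second: call fsync to persist aof_buf [2]
//never call fsync: let the system decide when (e.g. when the fd's write buffer fills up) [3]
//[1]
//Every operation syncs the file's write buffer to disk, so each Redis operation is persisted
//right after Redis executes it. Safety is very high, but the disk-write overhead is considerable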
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
}
//[2]
//Once per second, sync the file's write buffer to disk
else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
//[3]
//otherwise the system flushes the fd's write buffer whenever it sees fit (left to fate)
}
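Under everysec, the start of flushAppendOnlyFile() may postpone write(2) while a background fsync is still pending. The postponement lasts no more than two seconds. Below is a sketch of just that decision. It assumes the caller supplies sync_in_progress (computed with bioPendingJobsOfType() in the real code) and the current time. struct flush_state and everysec_flush_decision are hypothetical names.

```c
#include <assert.h>
#include <time.h>

enum flush_decision { FLUSH_WRITE, FLUSH_POSTPONE };

struct flush_state {
    time_t postponed_start;   /* server.aof_flush_postponed_start, 0 = none */
    unsigned long delayed;    /* server.aof_delayed_fsync                    */
};

/* Early part of flushAppendOnlyFile() under everysec: while a background
 * fsync is pending, delay write(2) for up to 2 seconds, then write anyway.
 * force = 1 skips the check entirely. */
static enum flush_decision everysec_flush_decision(struct flush_state *st,
                                                   int sync_in_progress,
                                                   int force, time_t now) {
    assert(st != NULL);
    if (!force && sync_in_progress) {
        if (st->postponed_start == 0) {
            st->postponed_start = now;    /* remember; serverCron retries */
            return FLUSH_POSTPONE;
        } else if (now - st->postponed_start < 2) {
            return FLUSH_POSTPONE;        /* still within the 2 s budget */
        }
        st->delayed++;                    /* waited too long: write anyway */
    }
    st->postponed_start = 0;              /* we are going to write now */
    return FLUSH_WRITE;
}
```

Postponing here is what makes the `if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);` line in serverCron() necessary. Otherwise a postponed buffer might wait until the next client command arrives.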