Redis 服務端程式實現原理
阿新 • • 發佈:2020-03-20
上篇我們簡單介紹了 redis 客戶端的一些基本概念,包括其 client 資料結構中對應的相關欄位的含義,本篇我們結合這些,來分析分析 redis 服務端程式是如何執行的。一條命令請求的完成,客戶端服務端都經歷了什麼?服務端程式中定時函式 serverCron 都有哪些邏輯?
### 一、redis 客戶端如何連線服務端
我們平常最簡單的一個 redis 客戶端命令,**redis-cli**,這個命令會導致我們的客戶端向服務端發起一個 connect 連線操作,具體就是以下幾個步驟。
**1、網路連線**
第一步是網路連線,也就是我們的客戶端會與服務端進行 TCP 三次握手,並指明使用 socket 通訊協議。
接著服務端 redis 使用 epoll 事件機制監聽埠的讀事件,一旦事件可讀則判定是有客戶端嘗試建立連線,服務端會檢查最大允許連線數是否到達,如果達到則拒絕建立連線,否則服務端會建立一個 fd 檔案描述符並返回給客戶端,代表連線成功建立。
**2、更新客戶端連線資訊**
之前介紹 redis 客戶端的時候,我們說過 redisServer 中有這麼一個欄位:
```
struct redisServer {
........
list *clients; /* List of active clients */
........
}
```
clients 欄位是一個雙端連結串列結構,儲存了所有成功建立連線的客戶端 client 資訊,那麼我們第二步就是建立一個 client 結構的客戶端抽象例項並新增到 redisServer 結構 clients 連結串列中。
**3、為新客戶端註冊讀事件**
每一個客戶端連線都對應一個 fd 檔案描述符,我們只需要監聽這個檔案描述符的讀事件,即可判斷該套接字上是否有資訊傳送過來。
這裡也一樣,我們通過註冊該 fd 的讀事件,當該客戶端傳送資訊給服務端時,我們無需去輪詢即可發現該客戶端在請求服務端的動作,繼而服務端程式解析命令。
### 二、redis 如何執行一條命令
redis 服務端程式啟動後,會初始化一些欄位變數,為 redisServer 中的一些欄位賦預設值,還會讀取使用者指定的配置檔案內容並載入配置,反應到具體資料結構內,最後會呼叫 asMain 函式進行事件迴圈監聽。
每當客戶端發起連線請求,或者傳送命令過來,這裡的事件分發器就會監聽到套接字的可讀事件,於是找到可讀事件所繫結的事件處理器 readQueryFromClient,並呼叫它。
```
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
........
//讀取客戶端輸入緩衝區大小
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//從 fd 檔案描述符對應的 socket 中讀取命令資料
//儲存進 querybuf 輸入緩衝區
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
//異常返回
return;
} else {
//異常釋放客戶端連線
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
//客戶端已經關閉、釋放客戶端
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
//如果輸入緩衝區長度超過系統設定最大長度,釋放客戶端
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
//這裡會讀取緩衝區寫入的命令
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
```
總的來說,readQueryFromClient 主要完成的就是將 socket 中發來的命令讀取到客戶端輸入緩衝區,然後呼叫 processInputBuffer 處理緩衝區中的命令。
```
void processInputBuffer(client *c) {
server.current_client = c;
while(sdslen(c->querybuf)) {
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
//判斷請求型別
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
//根據不同的請求型別,執行命令解析
//實際上就是把命令的名稱、引數解析存入 argc 陣列中
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
//查詢執行命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
if (server.current_client == NULL) break;
}
}
server.current_client = NULL;
}
```
processCommand 函式會從客戶端例項命令引數欄位中拿到命令的名稱、引數型別、引數值等等資訊。redisServer 在成功啟動後,會呼叫 populateCommandTable 方法初始化 redisCommandTable,存入一個字典集合。
每一個 redisCommand 是這麼一個數據結構:
```
struct redisCommand {
//命令名稱
char *name;
//函式指標,指向一個具體實現
redisCommandProc *proc;
//引數個數
int arity;
//命令的型別,寫命令?讀命令?等
char *sflags;
int flags;
redisGetKeysProc *getkeys_proc;
int firstkey;
int lastkey;
int keystep;
//伺服器啟動後共呼叫該命令次數
//伺服器啟動後執行該命令耗時總
long long microseconds, calls;
};
```
processCommand 最後會找到命令,進而執行命令,並將命令執行的結果寫入客戶端輸出緩衝區,並將響應寫回客戶端。以上就是 redis 對於一條命令請求的執行過程,隨著我們的不斷學習,以上內容會不斷深入,現在你可以理解的大概就好。
### 三、週期系統函式 serverCron
redis 可以說是事件驅動中介軟體,它主要有兩種事件,檔案事件和時間事件,檔案事件我們就不多說,時間事件主要分為兩種,一種是定時事件,另一種週期事件。
定時事件指的是,預定的程式將會在某個具體的時間節點執行。週期事件是指,預定程式每隔某個時間間隔就會被呼叫執行。
而我們的 serverCron 顯然是一個週期時間事件,在正式分析其原始碼實現之前,我們先來看看它的前世今身,在哪裡被註冊,又是如何被呼叫的。
```
void initServer(void) {
。。。。。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
。。。。。
}
```
我們 redis 伺服器啟動初始化時,會呼叫 aeCreateTimeEvent 繫結一個 serverCron 的時間事件。
這是 redis 中事件迴圈結構
```
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
```
其中指標 timeEventHead 是一個雙端連結串列,所有的時間事件都會以連結串列的形式儲存在這裡,具體指向的結構是 aeTimeEvent。
```
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
//下一次什麼時候被執行(單位秒)
long when_sec; /* seconds */
//下一次什麼時候被執行(單位毫秒)
long when_ms; /* milliseconds */
//時間事件處理函式
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
//前後連結串列指標
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
```
serverCron 在這裡會被建立並新增到時間事件連結串列中,並設定它下一次執行時間為當前時間,具體你可以自行深入檢視呼叫棧,那麼下一次時間事件檢查的時候,serverCron 就一定會被執行。
好了,至此 serverCron 已經註冊進 redis 的時間事件結構中,那麼什麼時候檢查並呼叫呢?
```
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
```
還記的我們 redis 成功啟動後,會進入主事件迴圈中嗎?aeProcessEvents 裡面具體不一行行帶大家分析了,我們挑相關的進行分析。
```
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
。。。。。
//遍歷整個時間事件連結串列,找到最快要被執行的任務
//計算與當前時間的差值
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
//記錄差值儲存進變數 tvp
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
//已經錯過執行該時間事件,tvp 賦零
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL; /* wait forever */
}
}
//aeApiPoll 會處理檔案事件,最長 tvp 時間就要返回
numevents = aeApiPoll(eventLoop, tvp);
。。。。。
//檢查處理時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
}
```
你看,實際上儘管我們對週期時間事件指定了嚴格的執行間隔,但實際上大多數情況下,時間事件會晚於我們既定時間節點。
processTimeEvents 函式檢查所有時間事件函式,如果有符合條件應該得到執行的,會立即執行該事件處理器,並根據事件處理器返回的狀態,刪除時間事件或設定下一次執行時間。
```
static int processTimeEvents(aeEventLoop *eventLoop) {
。。。。。。
//獲取當前時間
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
//這是一個週期執行的時間事件,設定下次執行時間
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
//刪除事件
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
```
以上,你應該瞭解到 serverCron 何時註冊的,何時被執行,經過了哪些過程。下面我們具體看 serverCron 的內容。
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
//更新 server.unixtime 和 server.mxtime
updateCachedTime();
//每間隔 100 毫秒,統計一次這段時間內命令的執行情況
run_with_period(100) {
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}
。。。。。。
}
```
其中 run_with_period 為什麼能做到顯式控制 100 毫秒內只執行一次呢?
其實 run_with_period 的巨集定義如下:
```
#define run_with_period(_ms_)
if ((_ms_ <= 1000/server.hz) ||
!(server.cronloops%((_ms_)/(1000/server.hz))))
```
server.hz 是 redisServer 結構中的一個欄位,可以允許我們通過配置檔案進行調節,它是一個整數,描述服務 serverCron 在一秒內執行 N 次。server.cronloops 描述伺服器自啟動以來,共執行 serverCron 次數。
那麼,1000/server.hz 描述的就是 serverCron 每間隔多少毫秒就需要被執行,如果我們傳入的 ms 小於這個間隔,返回 1 並立馬執行後續函式體。或者根據 serverCron 已經執行的次數,計算間隔時間是否達到,返回 0 或 1。
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。。
//更新全域性 lru 時鐘,這個用於每個 redis 物件最長未訪問淘汰策略
unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
//不斷比較當前記憶體使用量,儲存最高峰值記憶體使用量
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
server.resident_set_size = zmalloc_get_rss();
// 如果收到了SIGTERM訊號,嘗試退出
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
。。。。。。
}
```
lru 後面我們會繼續說的,redis 維護一個全域性 lru 時鐘參照,每個 redisObject 結構中也會有一個自己的 lru 時鐘,它記錄的是上一次訪問該物件時的時鐘,這些資訊會用於鍵值淘汰策略。所以,伺服器會定時的更新這個全域性 lru 時鐘,保證它準確。
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。。
//每間隔五秒,輸出非空資料庫中的相關屬性資訊
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
//如果不是sentinel模式,則每5秒輸出一個connected的client的資訊到log
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
。。。。。。
}
```
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。。
clientsCron();
databasesCron();
。。。。。。
}
```
clientsCron 會檢查有哪些客戶端連線超時並將他們釋放,還會檢查客戶端的輸入緩衝區 querybuff 是否太大,或者該客戶端不是很活躍,那麼會釋放掉該客戶端的輸入緩衝區並重新建立一個預設大小的。
databasesCron 會首先隨機遍歷所有的資料庫並抽取 expired 集合中部分鍵,判斷是否過期並執行相應的刪除操作。除此之外,該函式還會隨機訪問部分資料庫,並根據其狀態觸發 rehash。
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。
//如果服務沒有在執行 rdb 備份生成,也沒有在 aof 備份生成
//並且有被延遲的 aof rewrite,那麼這裡會執行
//當伺服器正在進行 BGSAVE 備份的期間,所有的 rewrite 請求都會被延遲
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
//如果有 rdb 子程序或 aof 子程序
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
//子程序 id 等於負一,說明子程序退出或異常,記錄日誌
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
//pid 指向 rdb 子程序 id
//判斷如果子程序退出了,進行一些後續的 rdb 操作
//更新 dirty,lastsave 時間等等
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
//pid 指向 aof 子程序 id
//aof 子程序退出,處理其後續的一些收尾
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
} else {
//這部分我們前面的文章介紹過
//saveparams 儲存了 save 所有的配置項,是一個數組
//這裡校驗是否達到條件
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
。。。。
}
```
```
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
。。。。
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
//每一秒檢查一次上一輪aof的寫入是否發生了錯誤,如果有錯誤則嘗試重新寫一次
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
freeClientsInAsyncFreeQueue();
clientsArePaused();
run_with_period(1000) replicationCron();
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
//增加 serverCron 執行次數
server.cronloops++;
return 1000/server.hz;
。。。。
}
```
以上,我們分析了 serverCron 的內部邏輯,雖然說我們配置上可以指定它執行間隔,但是實際上取決於具體的執行時間,內部邏輯也不少,希望你能瞭解了個大概。
好了,這是我們對於 redis 服務端程式的一點點了解,如果覺得我有說不對的地方或者你有更深的理解,也歡迎你加我微信一起探討。
接下來,我們的 redis 之旅從單擊開始步入多機模式,下一篇多機資料庫的理~
---
**關注公眾不迷路,一個愛分享的程式設計師。 **
**公眾號回覆「1024」加作者微信一起探討學習! **
**每篇文章用到的所有案例程式碼素材都會上傳我個人 github **
**https://github.com/SingleYam/overview_java **
**歡迎來踩! **
![YangAM 公眾號](https://s2.ax1x.com/2019/01/05/F7d