Redis中單機資料庫的實現
1. 記憶體操作層 zmalloc 系介面
redis為了優化記憶體操作, 封裝了一層記憶體操作介面. 預設情況下, 其底層實現就是最簡樸的libc中的malloc
系列介面. 如果有定製化需求, 可以通過配置方式, 將底層記憶體操作的實現更換為tcmalloc
或jemalloc
庫.
redis封裝的這一層介面, 其介面定義與預設實現在zmalloc.h
與zmalloc.c
中. 其預設實現支援在O(1)複雜度的情況下返回記憶體塊的大小. 具體實現上的思路也十分簡樸: 就是在記憶體塊頭部多分配一個long
的空間, 將記憶體塊的大小記錄在此.
在zmalloc.c
中可以看到, 底層介面的具體實現可選的有tcmalloc
jemalloc
兩種:
/* Explicitly override malloc/free etc when using tcmalloc. */ #if defined(USE_TCMALLOC) #define malloc(size) tc_malloc(size) #define calloc(count,size) tc_calloc(count,size) #define realloc(ptr,size) tc_realloc(ptr,size) #define free(ptr) tc_free(ptr) #elif defined(USE_JEMALLOC) #define malloc(size) je_malloc(size) #define calloc(count,size) je_calloc(count,size) #define realloc(ptr,size) je_realloc(ptr,size) #define free(ptr) je_free(ptr) #define mallocx(size,flags) je_mallocx(size,flags) #define dallocx(ptr,flags) je_dallocx(ptr,flags) #endif
記憶體分配介面的實現如下, 從程式碼中可以看出, 其頭部多分配了PREFIX_SIZE
個位元組用於儲存資料區的長度.
void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE); if (!ptr) zmalloc_oom_handler(size); #ifdef HAVE_MALLOC_SIZE update_zmalloc_stat_alloc(zmalloc_size(ptr)); return ptr; #else *((size_t*)ptr) = size; update_zmalloc_stat_alloc(size+PREFIX_SIZE); return (char*)ptr+PREFIX_SIZE; #endif }
2. 事件與IO多路複用
redis中的事件處理機制和記憶體分配一樣, 也是糊了一層介面. 其底層實現是可選的. 預設情況下, 會自動選取當前OS平臺上速度最快的多路IO介面, 比如在Linux平臺上就是epoll
, 在Sun/Solaris系列平臺上會選擇evport
, 在BSD/Mac OS平臺上會選擇kqueue
, 實再沒得選了, 會使用POSIX標準的select
介面. 保證起碼能跑起來
事件處理對各個不同底層實現的包裹, 分別在ae_epoll.c
, ae_evport.c
, ae_kqueue
, ae_select.c
中, 事件處理器的定義與實現在ae.h
與ae.c
中.
ae.h
中, 定義了
- 事件處理回撥函式的別名:
ae***Proc
函式指標類型別名 - 檔案事件與定時事件兩種結構:
aeFileEvent
與aeTimeEvent
- 結構
aeFiredEvent
, 代表被觸發的事件 - 結構
aeEventLoop
, 一個事件處理器. 包含一個事件迴圈.
分別如下:
#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
READABLE event already fired in the same event
loop iteration. Useful when you want to persist
things to disk before sending replies, and want
to do that in a group fashion. */
// .....
/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc; // 可讀回撥
aeFileProc *wfileProc; // 可寫回調
void *clientData; // 自定義資料
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */ // 定時事件的編號
long when_sec; /* seconds */ // 觸發時間(單位為秒)
long when_ms; /* milliseconds */ // 觸發時間(單位為毫秒)
aeTimeProc *timeProc; // 定時回撥
aeEventFinalizerProc *finalizerProc; // 事件自殺析構回撥
void *clientData; // 自定義資料
struct aeTimeEvent *prev; // 連結串列的前向指標與後向指標
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent { // 描述了一個被觸發的事件
int fd; // 檔案描述符
int mask; // 觸發的型別
} aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop { // 描述了一個熱火朝天不停迴圈的事件處理器
int maxfd; // 當前監控的檔案事件中, 檔案描述符的最大值
int setsize; // 事件處理器的最大容量(能監聽的檔案事件的個數)
long long timeEventNextId; // 下一個到期要執行的定時事件的編號
time_t lastTime; // 上一次有回撥函式執行的時間戳, 用以防止伺服器時間抖動或溢位
aeFileEvent *events; // 所有註冊的檔案事件
aeFiredEvent *fired; // 所有被觸發的檔案事件
aeTimeEvent *timeEventHead; // 定時事件連結串列
int stop; // 急停按鈕
void *apidata; // 底層多路IO介面所需的額外資料
aeBeforeSleepProc *beforesleep; // 一次迴圈結束, 所有事件處理結束後, 要執行的回撥函式
aeBeforeSleepProc *aftersleep; // 一次迴圈開始, 在執行任何事件之前, 要執行的回撥函式
} aeEventLoop;
對於檔案事件來說, 除了可讀AE_READABLE
與可寫AE_WRITABLE
兩種監聽觸發方式外, 還有一種額外的AE_BARRIER
觸發方式. 若監聽檔案事件時, 將AE_BARRIER
與AE_WRITABLE
組合時, 保證若當前檔案如果正在處理可讀(被可讀觸發), 就不再同時觸發可寫. 在一些特殊場景下這個特性是比較有用的.
事件處理器aeEventLoop
中, 對於檔案事件的處理, 走的都是老套路. 這裡需要注意的是事件處理器中, 定時事件的設計:
- 所有定時事件, 像糖葫蘆一樣, 串成一個連結串列
- 定時事件不是嚴格定時的, 定時事件中不適宜執行耗時操作
事件處理器的核心介面有以下幾個:
// 建立一個事件處理器例項
aeEventLoop *aeCreateEventLoop(int setsize);
// 銷燬一個事件處理器例項
void aeDeleteEventLoop(aeEventLoop *eventLoop);
// 事件處理器急停
void aeStop(aeEventLoop *eventLoop);
// 建立一個檔案事件, 並新增進事件處理器中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
// 從事件處理器中刪除一個檔案事件.
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
// 通過檔案描述符, 獲取事件處理器中, 該檔案事件註冊的觸發方式(可讀|可寫|BARRIER)
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 建立一個定時事件, 並掛在事件處理器的時間事件連結串列中. 返回的是建立好的定時事件的編號
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
// 刪除事件處理器中的某個時間事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
// 呼叫多路IO介面, 處理所有被觸發的檔案事件, 與所有到期的定時事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// tricky介面.內部呼叫poll介面, 等待milliseconds毫秒, 或直至指定fd上的指定事件被觸發
int aeWait(int fd, int mask, long long milliseconds);
// 事件處理器啟動器
void aeMain(aeEventLoop *eventLoop);
// 獲取底層路IO介面的名稱
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
核心程式碼如下:
首先是啟動器, 啟動器負責三件事:
- 響應急停
- 執行
beforesleep
回撥, 如果設定了該回調的值, 那麼在每次事件迴圈之前, 該函式會被執行 - 一遍一遍的輪大米, 呼叫
aeProcessEvents
, 即事件迴圈
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
然後是事件迴圈
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
// 無事快速退朝
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
// 在 "事件處理器中至少有一個檔案事件" 或 "以阻塞形式執行事件迴圈, 且處理定時事件"
// 時, 進入該分支
// 這裡有一點繞, 其實就是, 如果
// "事件處理器中沒有監聽任何檔案事件", 且,
// "執行事件迴圈時指明不執行定時事件, 或雖然指明瞭執行定時事件, 但是要求以非阻塞形式執行事件迴圈"
// 都不會進入該分支, 不進入該分支時, 分兩種情況:
// 1. 沒監聽任何檔案事件, 且不執行定時事件, 相當於直接返回了
// 2. 沒監聽任何檔案事件, 執行定時事件, 但要求以非阻塞形式執行. 則立即執行所有到期的時間事件, 如果沒有, 則相當於直接返回
// 第一步: 計算距離下一個到期的定時事件, 還有多長時間. 計該時間為tvp
// 即便本次事件迴圈沒有指明要處理定時事件, 也計算這個時間
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 第二步: 以tvp為超時時間, 呼叫多路IO介面, 獲取被觸發的檔案事件, 並處理檔案事件
// 如果tvp為null, 即當前事件處理器中沒有定時事件, 則呼叫aeApiPoll的超時時間是無限的
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
// 通常情況下, 如果一個檔案事件, 同時被可讀與可寫同時觸發
// 都是先執行可讀回撥, 再執行可寫回調
// 但如果事件掩碼中帶了AE_BARRIER, 就會扭轉這個行為, 先執行可寫回調, 再執行可讀加高
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// 按需執行時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
3. 網路庫
在anet.h
與anet.c
中, 對*nix Tcp Socket相關介面進行了一層薄封裝, 基本上就是在原生介面的基礎上做了一些錯誤處理, 並整合了一些介面呼叫. 這部分程式碼很簡單, 甚至很無聊.
4. 單機資料庫的啟動
在Redis程序中, 有一個全域性變數 struct redisServer server
, 這個變數描述的就是Redis服務端. (結構體定義位於server.h
中, 全域性變數的定義位於server.c
中), 結構體的定義如下:
struct redisServer {
// ...
aeEventLoop * el; // 事件處理器
// ...
redisDb * db; // 資料庫陣列.
// ...
int ipfd[CONFIG_BINDADDR_MAX]; // listening tcp socket fd
int ipfd_count;
int sofd; // listening unix socket fd
// ...
list * clients; // 所有活動客戶端
list * clients_to_close; // 待關閉的客戶端
list * clients_pending_write; // 待寫入的客戶端
// ...
client * current_clients; // 當前客戶端, 僅在crash報告時才使用的客戶端
int clients_paused; // 暫停標識
// ...
list * clients_waiting_acks; // 執行WAIT命令的客戶端
// ...
uint64_t next_client_id; // 下一個連線至服務端的客戶端的編號, 自增編號
}
這個結構體中的欄位實再是太多了, 這裡僅列出一些與單機資料庫相關的欄位
結構體中定義了一個指標欄位, 指向struct redisDb
陣列型別. 這個結構的定義如下(也位於server.h
中)
typedef struct redisDb {
dict *dict; /* 資料庫的鍵空間 */
dict *expires; /* 有時效的鍵, 以及對應的過期時間 */
dict *blocking_keys; /* 佇列: 與阻塞操作有關*/
dict *ready_keys; /* 佇列: 與阻塞操作有關 */
dict *watched_keys; /* 與事務相關 */
int id; /* 資料庫編號 */
long long avg_ttl; /* 統計值 */
} redisDb;
在介紹阻塞操作與事務之前, 我們只需要關心三個欄位:
dict
資料庫中的所有k-v對都儲存在這裡, 這就是資料庫的鍵空間expires
資料庫中所有有時效的鍵都儲存在這裡id
資料庫的編號
從資料結構上也能看出來, 單機資料庫的啟動其實需要做以下的事情:
- 初始化全域性變數
server
, 並初始化server.db
陣列, 陣列中的每一個元素就是一個數據庫 - 建立一個事件處理器, 用於監聽來自使用者的命令, 並進行處理
在server.c
檔案中的main
函式跟下去, 就能看到上面兩步, 這裡的程式碼很繁雜, 下面只選取與單機資料庫啟動相關的程式碼進行展示:
int main(int argc, char **argv) {
// 初始化基礎設計
// ...
server.sentinel_mode = checkForSentinelMode(argc,argv); // 從命令列引數中解析, 是否以sentinel模式啟動server
initServerConfig(); // 初始化server配置
moduleInitModulesSystem(); // 初始化Module system
// ...
if (server.sentinel_mode) { // sentinel相關邏輯
initSentinelConfig();
initSentinel();
}
// 如果以 redis-check-rdb 或 redis-check-aof 模式啟動server, 則就是呼叫相應的 xxx_main()函式, 然後終止程序就行了
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
// 處理配置檔案與命令列引數, 並輸出歡迎LOGO...
// ...
// 判斷執行模式, 如果是後臺執行, 則進入daemonize()中, 使程序守護化
server.supervised = redisIsSupervised(server.supervised_mode);
int background = server.daemonize && !server.supervised;
if (background) daemonize();
// 單機資料庫啟動的核心操作: 初始化server
initServer();
// 瑣事: 建立pid檔案, 設定程序名, 輸出ASCII LOGO, 檢查TCP backlog佇列等
//...
if(!server.sentinel_mode) {
// 載入自定義Module
// ...
} else {
sentinelIsRunning();
}
// 檢查記憶體狀態, 如有必要, 輸出警告日誌
// ...
// 向事件處理器註冊事前回調與事後回撥, 並開啟事件迴圈
// 至此, Redis服務端已經啟動
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el)
// 服務端關閉的善後工作
aeDeleteEventLoop(server.el);
return 0;
}
從main
函式中能大致瞭解啟動流程, 也基本能猜出來, 在最核心的initServer
函式的呼叫中, 至少要做兩件事:
- 初始化事件處理器
server.el
- 初始化服務端的資料庫
server.db
- 初始化監聽socket, 並將監聽socket fd加入事件處理回撥
如下:
void initSetver(void) {
// 處理 SIG_XXX 訊號
// ...
// 初始化server.xxx下的一部分欄位, 值得關注的有:
server.pid = getpid()
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
//...
server.clients_pending_write = listCreate();
//...
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
//...
server.clients_paused = 0
//...
// 建立全域性共享物件
createSharedObjects();
// 嘗試根據最大支援的客戶端連線數, 調整最大開啟檔案數
adjustOpenFilesLimit(void)
// 建立事件處理器
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
if (server.el == NULL) {
// ...
exit(1);
}
// 初始化資料庫陣列
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
// 監聽tcp埠, tcp 監聽檔案描述符儲存在 server.ipfd 中
if (server.port != 0 && listenToPort(server.port, server.ipfd, &server.ipfd_count) == C_ERR) exit(1);
// 監聽unix埠
if (server.unixsocket != NULL) {
//...
}
// 如果沒有任何tcp監聽fd存在, abort
if (server.ipfd_count == 0 && server.sofd < 0) {
//...
exit(1);
}
// 初始化資料庫陣列中的各個資料庫
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType, NULL); // 初始化鍵空間
server.db[j].expires = dictCreate(&keyptrDictType, NULL); // 初始化有時效的鍵字典. key == 鍵, value == 過期時間戳
server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);// 初始化阻塞鍵字典.
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType, NULL); // 初始化解除阻塞鍵字典
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL); // 初始化與事務相關的一個字典
server.db[j].id = j; // 初始化資料庫的編號
server.db[j].avg_ttl = 0; // 初始化統計資料
}
// 其它一大堆細節操作
// ...
// 向事件處理器中加一個定時回撥, 這個回撥中處理了很多雜事. 比如清理過期客戶端連線, 過期key等
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
//...
exit(1);
}
// 將tcp與unix監聽fd新增到事件處理器中
// 回撥函式分別為: acceptTcpHandler和acceptUnixHandler
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.ipfd file event");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.sofd file event.");
}
// 註冊一個額外的事件回撥: 用於喚醒一個被module操作阻塞的客戶端
// ...
// 如有需要, 開啟AOF檔案
// ...
// 對32位機器, 且沒有配置maxmemory時, 自動設定為3GB
// ...
// 其它與單機資料庫無關的操作
// ...
}
以上就是單機資料庫的啟動流程
總結:
- Redis服務端程序中, 以
server
這個全域性變數來描述服務端 server
全域性變數中, 持有著所有資料庫的指標(server.db
), 一個事件處理器(server.el
), 與所有與其連線的客戶端(server.clients
), 監聽的tcp/unix socket fd(server.ipfd, server.sofd
)- 在Redis服務端啟動過程中, 會初始化監聽的tcp/unix socket fd, 並把它們加入到事件處理器中, 相應的事件回撥分別為
acceptTcpHandler
與acceptUnixHandler
5. 客戶端與Redis服務端建立連線的過程
上面已經講了Redis單機資料庫服務端的啟動過程, 下面來看一看, 當一個客戶端通過tcp連線至服務端時, 服務端會做些什麼. 顯然要從listening tcp socket fd在server.el
中的事件回撥開始看起.
邏輯上來講, Redis服務端需要做以下幾件事:
- 建立連線, 接收請求
- 協議互動, 命令處理
這裡我們先只看建立連線部分, 下面是acceptTcpHandler
函式, 即listening tcp socket fd的可讀事件回撥:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 建立資料連線
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip); // 處理請求
}
}
這個函式中只做了兩件事:
- 建立tcp資料連線
- 呼叫
acceptCommonHandler
下面是acceptCommonHandler
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 建立一個client例項
if ((c = createClient(fd)) == NULL) {
// ...
return;
}
// 如果設定了連線上限, 並且當前資料連線數已達上限, 則關閉這個資料連線, 退出, 什麼也不做
if (listLength(server.clients) > server.maxclients) {
//...
return;
}
// 如果服務端執行在保護模式下, 並且沒有登入密碼機制, 那麼不接受外部tcp請求
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
// ...
return;
}
}
server.stat_numconnections++;
c->flags |= flags;
}
這裡可以看到, 對每一個數據連線, Redis中將其抽象成了一個 struct client
結構, 所有建立struct client
例項的細節被隱藏在createClient
函式中, 下面是createClient
的實現:
client *createClient(int fd) {
client * c = zmalloc(sizeof(client))
if (fd != -1) {
anetNonBlock(NULL, fd); // 設定非阻塞
anetEnableTcpNoDelay(NULL, fd); // 設定no delay
if (server.tcpkeepalive) anetKeepAlive(NULL, fd, server.tcpkeepalive); // 按需設定keep alive
if (aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { // 註冊事件回撥
close(fd); zfree(c); return NULL;
}
}
selectDb(c, 0) // 預設選擇第一個資料庫
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1); // 原子自增 server.next_client_id
// 其它欄位初始化
c->id = client_id;
//...
// 把這個client例項新增到 server.clients尾巴上去
if (fd != -1) listAddNodeTail(server.clients,c);
return c;
}
6 Redis通訊協議
6.1 通訊方式
客戶端與伺服器之間的通訊方式有兩種
- 一問一答式的(request-response pattern). 由客戶端發起請求, 服務端處理邏輯, 然後回發回應
- 若客戶端訂閱了服務端的某個channel, 這時通訊方式為單向的傳輸: 服務端會主動向客戶端推送訊息. 客戶端無需發起請求
在單機資料庫範疇中, 我們先不討論Redis的釋出訂閱模式, 即在本文的討論範圍內, 客戶端與伺服器的通訊始終是一問一答式的. 這種情況下, 還分為兩種情況:
- 客戶端每次請求中, 僅包含一個命令. 服務端在迴應中, 也僅給出這一個命令的執行結果
- 客戶端每次請求中, 包含多個有序的命令. 服務端在迴應中, 按序給出這多個命令的執行結果
後者在官方文件中被稱為 pipelining
6.2 通訊協議
自上向下描述協議:
- 協議描述的是
資料
資料
有五種型別. 分別是短字串
,長字串
,錯誤資訊
,數值
,陣列
. 其中陣列
是複合型別. 協議中, 資料與資料之間以\r\n
兩個位元組作為定界符短字串
:+Hello World\r\n
以+
開頭, 末尾的\r\n
是定界符, 所以顯然, 短字串本身不支援表示\r\n
, 會和定界符衝突錯誤資訊
:-Error message\r\n
以-
開頭, 末尾的\r\n
是定界符. 顯然, 錯誤資訊裡也不支援包含\r\n
數值
::9527\r\n
以:
開頭, ASCII字元表示的數值, 僅支援表示整數長字串
:$8\r\nfuck you\r\n
先以$
開頭, 後面跟著字串的總長度, 以ASCII字元表示. 再跟一個定界符, 接下來的8個位元組就是長字串的內容, 隨後再是一個定界符.陣列
: 這是一個複合型別, 以*
開頭, 後面跟著陣列的容量, 以ASCII字元表示, 再跟一個定界符. 然後後面跟著多個其它資料. 比如*2\r\n$3\r\nfoo\r\n+Hello World\r\n
表示的是一個容量為2的陣列, 第一個陣列元素是一個長字串, 第二個陣列元素是一個短字串.
注意:
- 一般情況下, 協議體是一個數組
- 長字串在表示長度為0的字串時, 是
$0\r\n\r\n
- 還有一個特殊值,
$-1\r\n
, 它用來表示語義中的null
協議本身是支援二進位制資料傳輸的, 只需要將二進位制資料作為長字串傳輸就可以. 所以這並不能算是一個嚴格意義上的字元協議. 但如果傳輸的資料多數是可閱讀的資料的話, 協議本身的可讀性是很強的
6.3 請求體
客戶端向服務端傳送請求時, 語法格式基本如下:
6.3.1 單命令請求
單命令請求體, 就是一個數組, 陣列中將命令與引數各作為陣列元素儲存, 如下是一個 LLEN mylist
的請求
*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
這個陣列容量為2, 兩個元素均為長字串, 內容分別是LLEN
與mylist
6.3.2 多命令請求(pipelining request)
多命令請求體, 直接把多個單命令請求體拼起來就行了, 每個命令是一個數組, 即請求體中有多個數組
6.3.3 內聯命令
如果按協議向服務端傳送請求, 可以看到, 請求體的第一位元組始終要為*
. 為了方便一些弱客戶端與伺服器互動, REDIS支援所謂的內聯命令. 內聯命令僅是請求體的另一種表達方式.
內聯命令的使用場合是:
- 手頭沒有
redis-cli
- 手寫請求體太麻煩
- 只想做一些簡單操作
這種情況下, 直接把適用於redis-cli
的字元命令, 傳送給服務端就行了. 內聯命令支援通過telnet傳送.
6.4 迴應體
Redis服務端接收到任何一個有效的命令時, 都會給服務端寫回應. 請求-迴應的最小單位是命令, 所以對於多命令請求, 服務端會把這多個請求的對應的迴應, 按順序返回給客戶端.
迴應體依然遵守通訊協議規約.
6.5 手擼協議示例
單命令請求
[[email protected] ~]# echo -e '*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
:2
[[email protected] ~]#
多命令請求
[[email protected] ~]# echo -e '*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
+PONG
:4
:5
:6
[[email protected] ~]#
內聯命令請求
[[email protected] ~]# echo -e 'PING' | nc localhost 6379
+PONG
[[email protected] ~]# echo -e 'INCR counter' | nc localhost 6379
:7
[[email protected] ~]# echo -e 'INCR counter' | nc localhost 6379
:8
[[email protected] ~]#
通過telnet傳送內聯命令請求
[[email protected] ~]# telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'.
PING
+PONG
INCR counter
:9
INCR counter
:10
INCR counter
:11
^]
telnet> quit
Connection closed.
[[email protected] ~]#
6.6 協議總結及注意事項
優點:
- 可讀性很高, 也可以容納二進位制資料(長字串型別)
- 協議簡單, 解析容易, 解析效能也很高. 得益於各種資料型別頭位元組的設計, 以及長字串型別自帶長度欄位, 所以解析起來基本飛快, 就算是二流程式設計師寫出來的協議解析器, 跑的也飛快
- 支援多命令請求, 特定場合下充分發揮這個特性, 可以提高命令的吞吐量
- 支援內聯命令請求. 這種請求雖然限制頗多(不支援二進位制資料型別, 不支援
\r\n
這種資料, 不支援多命令請求), 但對於運維人員來說, 可讀性更高, 對閱讀更友好
注意事項:
- 非內聯命令請求, 請求體中的每個資料都是長字串型別. 即便是在表達數值, 也需要寫成長字串形式(數值的ASCII表示). 這給服務端解析協議, 以及區分請求命令是否為內聯命令帶來了很大的便利
- 短字串, 數值, 錯誤資訊這三種資料型別, 僅出現在迴應體中
7. 服務端處理請求及寫回應的過程
當客戶端與服務端建立連線後, 雙方就要進行協議互動. 簡單來說就是以下幾步:
- 客戶端傳送請求
- 服務端解析請求, 進行邏輯處理
- 服務端寫回包
顯然, 從三層(tcp或unix)上接收到的資料, 首先要解析成協議資料, 再進一步解析成命令, 服務端才能進行處理. 三層上的協議是流式協議, 一個請求體可能要分多次才能完整接收, 這必然要涉及到一個接收緩衝機制. 先來看一看struct client
結構中與接收緩衝機制相關的一些欄位:
type struct client {
uint64_t id; /* Client incremental unique ID. */ // 客戶端ID, 自增, 唯一
int fd; /* Client socket. */ // 客戶端資料連線的底層檔案描述符
redisDb *db; /* Pointer to currently SELECTed DB. */ // 指向客戶端當前選定的DB
robj *name; /* As set by CLIENT SETNAME. */ // 客戶端的名稱. 由命令 CLIENT SETNAME 設定
sds querybuf; /* Buffer we use to accumulate client queries. */ // 請求體接收緩衝區
sds pending_querybuf; /* If this is a master, this buffer represents the // cluster特性相關的一個緩衝區, 暫忽視
yet not applied replication stream that we
are receiving from the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ // 近期(100ms或者更長時間中)接收緩衝區大小的峰值
int argc; /* Num of arguments of current command. */ // 當前處理的命令的引數個數
robj **argv; /* Arguments of current command. */ // 當前處理的命令的引數列表
struct redisCommand *cmd, *lastcmd; /* Last command executed. */ // 當前正在處理的命令字, 以及上一次最後執行的命令字
int reqtype; /* Request protocol type: PROTO_REQ_* */ // 區分客戶端是否以內聯形式上載請求
int multibulklen; /* Number of multi bulk arguments left to read. */ // 命令字剩餘要讀取的引數個數
long bulklen; /* Length of bulk argument in multi bulk request. */ // 當前讀取到的引數的長度
} client;
7.1 通過事件處理器, 在傳輸層獲取客戶端傳送的資料
我們先從巨集觀上看一下, 服務端是如何解析請求, 並進行邏輯處理的. 先從服務端資料連線的處理回撥readQueryFromClient
看起:
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN;
// 對於大引數的優化
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
// 如果讀取到了一個超大引數, 那麼先把這個超大引數讀進來
// 保證緩衝區中超大引數之後沒有其它資料
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
// 接收資料
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
// ...
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
// client的緩衝區中資料如果超長了, 最有可能的原因就是服務端處理能力不夠, 資料堆積了
// 或者是受惡意攻擊了
// 這種情況下, 幹掉這個客戶端
// ...
return;
}
// 處理緩衝區的二進位制資料
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
// cluster特性相關的操作, 暫時忽略
// ...
}
}
這裡有一個優化寫法, 是針對特大引數的讀取的. 在介紹這個優化方法之前, 先大致的看一下作為單機資料庫, 服務端請求客戶端請求的概覽流程(你可能需要在閱讀完以下幾節之後再回頭來看這裡):
一般情況下, 請求體的長度不會超過16kb(即使包括多個命令), 那麼處理流程(假設請求是協議資料, 而非內聯命令)是這樣的:
而正常情況下, 一個請求體的長度不應該超過16kb, 而如果請求體的長度超過16kb的話, 有下面兩種可能:
- 請求中的命令數量太多, 比如平均一個命令及其引數, 佔用100位元組, 而這個請求體中包含了200個命令
- 請求中命令數量不多, 但是有一個命令攜帶了超大引數, 比如, 只是一個
SET
單命令請求, 但其值引數是一個長度為40kb的二進位制資料
附帶多個命令, 導致請求體超過16kb時
在第一種情況下, 如果滿足以下條件的話, 這個請求會被服務端當作兩個請求去處理:
初次資料連線的可讀回撥接收了16kb的資料, 這16kb的資料恰好是多個請求. 沒有請求被截斷! 即是一個20kb的請求體, 被天然的拆分成了兩個請求, 第一個請求恰好16kb, 一位元組不多, 一位元組不少, 第二個請求是剩餘的4kb. 那麼服務端看來, 這就是兩個多命令請求
這種情況極其罕見, Redis中沒有對這種情況做處理.
一般情況下, 如果傳送一個20kb的多命令請求, 且請求中沒有超大引數的話(意味著命令特別多, 上百個), 那麼總會有一個命令被截斷, 如果被階段, 第一次的處理流程就會在處理最後一個被截斷的半截命令時, 在如圖紅線處流程終止, 而僅在第二次處理流程, 即處理剩餘的4kb資料時, 才會走到藍線處. 即兩次收包, 一次寫回包:
附帶超大引數, 導致請求體超過16kb
在第二種情況下, 由於某幾個引數特別巨大(大於PROTO_MBULK_BIG_ARG
巨集的值, 即32kb), 導致請求體一次不能接收完畢. 勢必也要進行多次接收. 本來, 樸素的思想上, 只需要在遇到超大引數的時候, 調整16kb這個閾值即可. 比如一個引數為40kb的二進位制資料, 那麼在接收到這個引數時, 破例呼叫read
時一次性把這個引數讀取完整即可. 除此外不需要什麼特殊處理.
即是, 處理流程應當如下:
但這樣會有一個問題: 當在processMultiBulkBuffer
中, 將超大引數以string物件的形式放在client->argv
中時, 建立這個string物件, 底層是拷貝操作:
int processMultibulkBuffer(client * c) {
// ....
c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
// ....
}
對於超大引數來說, 這種string物件的建立過程, 涉及到的記憶體分配與資料拷貝開銷是很昂貴的. 為了避免這種資料拷貝, Redis把對於超大引數的處理優化成了如下:
7.2 對二進位制資料進行協議解析
從傳輸層獲取到資料之後, 下一步就是傳遞給processInputBuffer
函式, 來進行協議解析, 如下:
void processInputBuffer(client * c) {
server.current_client = c;
// 每次迴圈, 解析並執行出一個命令字(及其引數), 並執行相應的命令
// 直到解析失敗, 或緩衝區資料耗盡為止
while(sdslen(c->querybuf)) {
// 在某些場合下, 不做處理
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flag & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
// 如果client的reqtype欄位中沒有標明該客戶端傳遞給服務端的請求型別(是內聯命令, 還是協議資料)
// 那麼分析緩衝區中的第一個位元組來決定. 這一般發生在客戶端向服務端傳送第一個請求的時候
if (!c->reqtype) {
if (c->querybuf[0] == '*') c->reqtype = PROTO_REQ_MULTIBULK;
else c->reqtype = PROTO_REQ_INLINE;
}
// 按型別解析出一個命令, 及其引數
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknow request type")
}
// 執行命令邏輯
if (c->argc == 0) {
// 當argc的值為0時, 代表本次解析沒有解析出任何東西, 資料已耗盡
resetClient(c);
} else {
// 執行邏輯
if(processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) {
resetClient(c);
}
}
if (server.current_client == NULL) break;
}
}
server.current_client = NULL;
}
這一步做了分流處理, 如果是內聯命令, 則通過processInlineBuffer
進行協議解析, 如果不是內聯命令, 則通過processMultibulkBuffer
進行協議解析. 協議解析完成後, 客戶端上載的資料就被轉換成了command, 然後接下來呼叫processCommand
來執行服務端邏輯.
我們略過內聯命令的解析, 直接看非內聯命令請求的協議解析過程
int processMultibulkBuffer(client * c) {
//...
// 解析一個請求的頭部時, 即剛開始解析一個請求時, multibulklen代表著當前命令的命令+引數個數, 該欄位初始值為0
// 以下條件分支將讀取一個命令的引數個數, 比如 '*1\r\n$4\r\nPING\r\n'
// 以下分支完成後, 將有:
// c->multibulklen = 1
// c->argv = zmalloc(sizeof(robj*))
// pos = <指向$>
if (c->multibulklen == 0) {
// ...
// 讀取當前命令的引數個數, 即賦值c->multibulklen
newline = strchr(c->querybuf, '\r');
if (newline == NULL) {
// 請求體不滿足協議格式
// ...
return C_ERR:
}
// ...
ok = string2ll(c->querybuf+1, newline-(c->querybuf+1), &ll)
if (!ok || ll > 1024 * 1024) {
// 解析數值出錯, 或數值大於1MB
return C_ERR;
}
// 現在pos指向的是命令字
pos = (newline-c->querybuf) + 2
if (ll <= 0) {
// 解出來一個空命令字, 什麼也不做, 把緩衝區頭部移除掉, 並且返回成功
sdsrange(c->querybuf, pos, -1);
return C_OK;
}
c->multibulklen = ll;
// 為命令字及其引數準備儲存空間: 將它們都儲存在c->argv中, 且是以redisObject的形式儲存著
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
// ...
// 讀取解析命令字本身與其所有引數
while(c->multibulklen) {
// 讀取當前長字串的長度
if (c->bulklen == -1) {
// ...
// 如果當前指標指向的引數, 是一個超大引數, 則把接收緩衝區修剪為該引數
pos += newline-(c->querybuf+pos) + 2; // 現在pos指向的是引數, 或命令字的起始位置
if (ll >= PROTO_MBULK_BIG_ARG) {
size_t qblen;
sdsrange(c->querybuf, pos, -1); // 將接收緩衝區修剪為該引數
pos = 0;
qblen = sdslen(c->querybuf);
if (qblen < (size_t)ll + 2) {
// 如果這個超大引數還未接收完畢, 則擴充接收緩衝區, 為其預留空間
c->querybuf = sdsMakeRoomFor(c->querybuf, ll+2-qblen);
}
}
c->bulklen == ll;
}
// 讀取長字串內容, 即是命令字本身, 或某個引數
if (sdslen(c->querybuf) - pos < (size_t)(c->bulklen+2)) {
// 當前緩衝區中, 該長字串的內容尚未接收完畢, 跳出解析
break;
} else {
if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {
// 對於超大引數, 為了避免資料複製的開銷, 直接以接收緩衝區為底料, 建立一個字串物件
// 而對於接收緩衝區, 為其重新分配記憶體
// 注意, 如果當前指標指向的是一個超大引數, 接收機制保證了, 這個超大引數之後, 緩衝區沒有其它資料
c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf)
sdsIncrLen(c->querybuf, -2); // 移除掉超大引數末尾的\r\n
c->querybuf = sdsnewlen(NULL, c->bulklen + 2); // 為接收緩衝區重新分配記憶體
sdsclear(c->querybuf);
pos = 0;
} else {
// 對於普通引數或命令字, 以拷貝形式新建字串物件
c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
pos += c->bulklen+2; // pos指向下一個引數
}
c->bulklen = -1;
c->multibulklen--;
}
}
// 至此, 一個命令, 及其所有引數, 均被儲存在 c->argv中, 以redisObject形式儲存著, 物件的型別是string
// 扔掉所有已經解析過的二進位制資料
if (pos) sdsrange(c->querybuf, pos, -1);
if (c->multibulklen == 0) return C_OK
return C_ERR;
}
7.3 命令的執行
一個命令及其所有引數如果被成功解析, 則processMultibulkBuffer
函式會返回C_OK
, 並在隨後呼叫processCommand
執行這個命令. processCommand
內部包含了將命令的迴應寫入c->buf
的動作(封裝在函式addReply
中. 如果命令或命令中的引數在解析過程出錯, 或引數尚未接收完畢, 則會返回C_ERR
, 退棧, 重新返回, 直至再次從傳輸層讀取資料, 補齊所有引數, 再次回到這裡.
接下來我們來看執行命令並寫回包的過程, processCommand
中的關鍵程式碼如下:
int processCommand(client * c) {
if (!strcasecmp(c->argv[0]->ptr, "quit")) {
// 對 quit 命令做單獨處理, 呼叫addReply後, 返回
addReply(c, shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
// 拿出c->argv[0]中儲存的string物件中的ptr欄位(其實是個sds字串)
// 與server.commands中的命令字表進行比對, 查找出對應的命令, 即 struct redisCommand* 控制代碼
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 查無該命令, 進行錯誤處理
flagTransaction(c); // 如果這是一個在事務中的命令, 則將 CLIENT_DIRTY_EXEC 標誌位貼到 c-flags 中, 指示著事務出錯
addReplyErrorFormat(c, "unknown command '%s'", (char *)c->argv[0]->ptr)
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
// 對比命令所需要的引數數量, 是否和argc中的數量一致, 如果不一致, 說明錯誤
flagTransaction(c); // 依然同上, 事務出錯, 則打 CLIENT_DIRTY_EXEC 標誌位
addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->name);
return C_OK;
}
// 檢查使用者的登入態, 即在服務端要求登入認證時, 若當前使用者還未認證, 且當前命令不是認證命令的話, 則報錯
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
flagTransaction(c);
addReply(c, shared.noautherr);
return C_OK;
}
// 與叢集相關的程式碼
// 如果叢集模式開啟, 則通常情況下需要將命令重定向至其它例項, 但在以下兩種情況下不需要重定向:
// 1. 命令的傳送者是master
// 2. 命令沒有鍵相關的引數
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) {
int hashslot, error_code;
clusterNode * n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
if (n == NULL || n != server.cluster->myseld) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c, n, hashslot, error_code); // 重定向
return C_OK;
}
}
// 在正式執行命令之前, 首先先嚐試的放個屁, 擠點記憶體出來
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if (server.current_client == NULL) return C_ERR;
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR ) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
// 在持久化出錯的情況下, master不接受寫操作
if (
(
(server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR
) &&
server.masterhost == NULL &&
(c->cmdd->flags & CMD_WRITE || c->cmd->proc == pingCommand)
) {
flagTransaction(c);
if (server.aof_last_write_status == C_OK) {
addReply(c, shared.bgsaveerr);
} else {
addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno)));
}
return C_OK;
}
// 還有幾種不接受寫操作的場景, 這裡程式碼省略掉, 這幾種場景包括:
// 1. 使用者配置了 min_slaves-to_write配置項, 而符合要求的slave數量不達標
// 2. 當前是一個只讀slave
// ...
// 在釋出-訂閱模式中, 僅支援 (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT 命令
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand
) {
addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
// 當用戶配置slave-serve-stale-data為no, 且當前是一個slave, 且與master中斷連線的情況下, 僅支援 INFO 和 SLAVEOF 命令
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_server_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) {
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
// 如果當前正在進行loading操作, 則僅接受 LOADING 相關命令
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
// 如果是lua指令碼操作, 則支援受限數量的命令
// ...
// 終於到了終點: 執行命令
// 對於事務中的命令: 進佇列, 但不執行
// 對於事務結束標記EXEC: 呼叫call, 執行佇列中的所有命令
// 對於非事務命令: 也是呼叫call, 就地執行
if (
c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != watchCommand) {
// 如果是事務, 則將命令先排進佇列
queueMultiCommand(c);
addReply(c, shared.queued);
} else {
// 如果是非事務執行, 則直接執行
call (c, CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys)) {
handleClientsBlockedOnLists();
}
}
return C_OK;
}
需要注意的點有:
- 服務端全域性變數
server
中, 持有著一個命令表, 它本身是一個dict結構, 其key是為字串, 即Redis命令的字串, value則是為redisCommand
結構, 描述了一個Redis中的命令. 可以看到processCommand
一開始做的第一件事, 就是用c->argv[0]->ptr
去查詢對應的redisCommand
控制代碼 - Redis中的事務是以一個
MULTI
命令標記開始, 由EXEC
命令標記結束的. 如果一個命令處於事務之中, 那麼processCommand
內部並不會真正執行這個命令, 僅當最終EXEC
命令來臨時, 將事務中的所有命令全部執行掉. 而實際執行命令的函式, 是為call
, 這是Redis命令執行流程中最核心的一個函式. - 客戶端可以以pipelining的形式, 通過一次請求傳送多個命令, 但這不是事務, 如果沒有將命令包裹在
MULTI
和EXEC
中, 那麼這多個命令其實是一個個就地執行的.
有關事務的更詳細細節我們會在稍後討論, 但目前, 我們先來看一看這個最核心的call
函式, 以下程式碼隱藏了無關細節
void call(client * c, int flags) {
// ...
// 與主從複製相關的程式碼: 將命令分發給所有MONITOR模式下的從例項
// ...
// 呼叫命令處理函式
// ...
c->cmd->proc(c);
// ...
// AOF, 主從複製相關程式碼
// ...
}
撥開其它特性不管, 單機資料庫在這一步其實很簡單, 就是呼叫了對應命令字的回撥函式來處理
我們挑其中一個命令SET
來看一下, 命令處理回撥中都幹了些什麼:
void setCommand(client * c) {
int j;
robj * expire = NULL;
int uint = UINT_SECONDS;
int flags = OBJ_SET_NO_FLAGS;
for (j = 3; i < c->argc; j++) {
// 處理SET命令中的額外引數
// ...
}
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL)
}
void setGenericCommand(client * c, int flags, robj * key, robj, * val, robj * expire, int unit, robj * ok_reply, robj * abort_reply) {
long long milliseconds = 0;
if (expire) {
// 從string物件中讀取數值化的過期時間
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return;
if (milliseconds <= 0){
addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
if (
(flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)
) {
// NX 與 XX 時, 判斷鍵是否存在
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
// 執行所謂的set操作, 即把鍵值對插入至資料庫鍵空間去, 如果鍵已存在, 則替換值
setKey(c->db, key, val);
server.dirty++; // 統計資料更新
// 設定過期時間
if(expire) setExpire(c, c->db, key, kstime() + milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb * db, robj * key, robj * val) {
if (lookupKeyWrite(db, key) == NULL) {
dbAdd(db, key, val); // 對 dictAdd 的封裝
} else {
dbOverwrite(db, key, val); // 對 dictReplace的封裝
}
incrRefCount(val);
removeExpire(db, key);
signalModifiedKey(db, key);
}
從 setCommand
->setGenericCommand
->setKey
->dictXXX
這樣一路追下來, 可以看到最終的操作, 是對於某個資料庫例項中, 鍵空間的操作. 邏輯比較清晰. 其它的命令也基本類似, 鑑於Redis中的命令眾多, 這裡沒有篇幅去一個一個的介紹, 也沒有必要.
至此, 使用者的命令經由服務端接收, 解析, 以及執行在了某個資料庫例項上. 我們在一路跟程式碼的過程中, 隨處可見的錯誤處理中都有addReplyXXX
系列的函式呼叫, 在SET
命令的最終執行中, 在setGenericCommand
中, setKey
操作成功後, 也會呼叫addReply
. 顯然, 這個函式是用於將服務端的迴應資料寫入到某個地方去的.
從邏輯上來講, 服務端應當將每個命令的迴應寫入到一個緩衝區中, 然後在適宜的時刻, 序列化為迴應體(符合協議規約的二進位制資料), 經由網路IO回發給客戶端. 接下來, 我們就從addReply
入手, 看一看回包的流程.
7.4 寫回包流程
addReply
系列函式眾多, 有十幾個, 其它函式諸如addReplyBulk
, addReplyError
, addReplyErrorFormat
等, 只是一些簡單的變體. 這一系列函式完成的功能都是類似的: 總結起來就是兩點:
- 將回包資訊寫入回包緩衝區
addReply
如下:
void addReply(client * c, robj * obj) {
// 檢查客戶端是否可以寫入回包資料, 這個函式在每個 addReplyXXX 函式中都會被首先呼叫
// 它的職責有:
// 0. 多數情況下, 對於正常的客戶端, 它會返回 C_OK. 並且如果該客戶端的可寫事件沒有在事件處理器中註冊回撥的話, 它會將這個客戶端先掛在 server.clients_pending_write 這個連結串列上
// 0. 在特殊情況下, 對於不能接收回包的客戶端(比如這是一個假客戶端