曹工說Redis原始碼(3)-- redis server 啟動過程完整解析(中)
文章導航
Redis原始碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟著下面的這一篇,把環境搭建起來,後續可以自己閱讀原始碼,或者跟著我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。
曹工說Redis原始碼(1)-- redis debug環境搭建,使用clion,達到和除錯java一樣的效果
曹工說Redis原始碼(2)-- redis server 啟動過程解析及簡單c語言基礎知識補充
本講主題
首先,會再補充一點c語言中,指標的相關知識;接下來,開始接著昨天的那篇,講redis的啟動過程,由大到小來講,避免迅速陷入到細節中。
關於指標的理解
指標,其實就是指向一個記憶體地址,在知道這個地址前後儲存的內容的前提下,這個指標可以被你任意解釋。我舉個例子:
typedef struct Test_Struct{ int a; int b; }Test_Struct; int main() { // 1 void *pVoid = malloc(4); // 2 memset(pVoid,0x01,4); // 3 int *pInt = pVoid; // 4 char *pChar = pVoid; // 5 short *pShort = pVoid; // 6 Test_Struct *pTestStruct = pVoid; // 7 printf("address:%p, point to %d\n", pChar, *pChar); printf("address:%p, point to %d\n", pShort, *pShort); printf("address:%p, point to %d\n", pInt, *pInt); printf("address:%p, point to %d\n", pTestStruct, pTestStruct->a); }
-
1處,分配一片記憶體,4個位元組,32位;返回一個指標,指向這片記憶體區域,準確地說,指向第一個位元組,因為分配的記憶體是連續的,你可以理解為陣列。
The malloc() function allocates size bytes and returns a pointer to the allocated memory.
-
2處,呼叫memset,將這個pVoid 指向的記憶體開始的4個位元組,設定為0x01,其實就是把每個位元組設定為00000001。
這個memset的註釋如下:
NAME memset - fill memory with a constant byte SYNOPSIS #include <string.h> void *memset(void *s, int c, size_t n); DESCRIPTION The memset() function fills the first n bytes of the memory area pointed to by s with the constant byte c.
參考資料: https://www.cnblogs.com/yhlboke-1992/p/9292877.html
這裡我們把每個位元組,設為0x01,最終的二進位制,其實就是如下這樣:
-
3處,定義int型別的指標,將pVoid賦值給它,int佔4位元組
-
4處,定義char型別的指標,將pVoid賦值給它,char佔1位元組
-
5處,定義short型別的指標,將pVoid賦值給它,short佔2位元組
-
6處,定義Test_Struct型別的指標,這是個結構體,類似於高階語言的類,這個結構體的結構如下:
typedef struct Test_Struct{ int a; int b; }Test_Struct;
同樣,我們將pVoid賦值給它。
-
7處,分別列印各類指標的地址,和對其解引用後的值。
輸出如下:
257的二進位制就是:0000 0001 0000 0001
16843009的二進位制就是:0000 0001 0000 0001 0000 0001 0000 0001
結構體那個,也好理解,因為這個結構體,第一個屬性a,就是int型別的,佔4個位元組。
另外,大家要注意,上面輸出的指標地址都是一模一樣的。
如果大家能理解這個demo,再看看這個連結,相信會更加理解指標:
C 指標的算術運算
redis server大致的啟動過程
int main(int argc, char **argv) {
struct timeval tv;
/**
* 1 設定時區等等
*/
setlocale(LC_COLLATE,"");
...
// 2 檢查伺服器是否以 Sentinel 模式啟動
server.sentinel_mode = checkForSentinelMode(argc,argv);
// 3 初始化伺服器配置
initServerConfig();
// 4
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 5 檢查使用者是否指定了配置檔案,或者配置選項
if (argc >= 2) {
...
// 載入配置檔案, options 是前面分析出的給定選項
loadServerConfig(configfile,options);
sdsfree(options);
}
// 6 將伺服器設定為守護程序
if (server.daemonize) daemonize();
// 7 建立並初始化伺服器資料結構
initServer();
// 8 如果伺服器是守護程序,那麼建立 PID 檔案
if (server.daemonize) createPidFile();
// 9 為伺服器程序設定名字
redisSetProcTitle(argv[0]);
// 10 列印 ASCII LOGO
redisAsciiArt();
// 11 如果伺服器不是執行在 SENTINEL 模式,那麼執行以下程式碼
if (!server.sentinel_mode) {
// 從 AOF 檔案或者 RDB 檔案中載入資料
loadDataFromDisk();
// 啟動叢集
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == REDIS_ERR) {
redisLog(REDIS_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
// 列印 TCP 埠
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
} else {
sentinelIsRunning();
}
// 12 執行事件處理器,一直到伺服器關閉為止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
// 13 伺服器關閉,停止事件迴圈
aeDeleteEventLoop(server.el);
return 0;
}
-
1,2,3處,在前面那篇中已經講過,主要是初始化各種配置引數,比如socket相關的;redis.conf中涉及的,aof,rdb,replication,sentinel等;redis server自己內部的資料結構等,如runid,配置檔案地址,伺服器的相關資訊(32位還是64位,因為redis直接執行在作業系統上,而不是像高階語言有虛擬機器,32位和64位下,不同資料的長度是不同的),日誌級別,最大客戶端數量,客戶端最大idle時間等等
-
4處,因為sentinel和普通的redis server其實是共用同一份程式碼,所以這裡啟動時,要看是啟動sentinel,還是啟動普通的redis server,如果是啟動sentinel,則進行sentinel相關配置
-
5處,檢查啟動時的命令列引數中,是否指定了配置檔案,如果指定了,要使用配置檔案的配置為準
-
6處,設定為守護程序
-
7處,根據前面的配置,初始化redis server
-
8處,建立pid檔案,一般預設路徑:/var/run/redis.pid,這個可以在redis.conf進行配置,如:
pidfile "/var/run/redis_6379.pid"
-
9處,為伺服器程序設定名字
-
10處,列印logo
-
11處,如果不是sentinel模式啟動的話,載入aof或rdb檔案
-
12處,跳入死迴圈,開始等待接收連線,處理客戶端的請求;同時,週期執行後臺任務,比如刪除過期key等
-
13處,伺服器關閉,一般來說,走不到這裡,一般都是陷入在12處的死迴圈中;只有在某些場景下,將一個全域性變數stop修改為true後,程式會從12處跳出死迴圈,然後走到這裡。
初始化redis server的過程
這一節,主要是細化前面的第7步操作,即初始化redis server。這一個函式,位於redis.c中,名為initServer,做的事情很多,接下來會順序講解。
設定全域性的訊號處理函式
// 設定訊號處理函式
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
最重要的是最後一行:
void setupSignalHandlers(void) {
// 1
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
// 2
act.sa_handler = sigtermHandler;
// 3
sigaction(SIGTERM, &act, NULL);
return;
}
3處,設定了:接收到SIGTERM訊號時,使用act
來處理訊號,act在1處定義,是一個區域性變數,它有一個欄位,在2處被賦值,這是一個函式指標。函式指標類似於java中的一個static方法的引用,為什麼是static,因為執行這類方法不需要new一個物件;在c語言中,所有的方法都是最頂級的,呼叫時,不需要new一個物件;所以,從這點來說,c語言的函式指標類似java中的static方法的引用。
我們可以看看2處,
act.sa_handler = sigtermHandler;
這個sigtermHandler,應該就是一個全域性函數了,看看其怎麼被定義的:
// SIGTERM 訊號的處理器
static void sigtermHandler(int sig) {
REDIS_NOTUSED(sig);
redisLogFromHandler(REDIS_WARNING,"Received SIGTERM, scheduling shutdown...");
// 開啟關閉標識
server.shutdown_asap = 1;
}
這個函式就是開啟server這個全域性變數的shutdown_asap。這個欄位在以下地方被使用:
serverCron in redis.c
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
// 伺服器程序收到 SIGTERM 訊號,關閉伺服器
if (server.shutdown_asap) {
// 嘗試關閉伺服器
if (prepareForShutdown(0) == REDIS_OK) exit(0);
// 如果關閉失敗,那麼列印 LOG ,並移除關閉標識
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
以上這段程式碼的第一行,標識了這段程式碼所處的位置,為redis.c中的serverCron函式,這個函式,就是redis server的週期執行函式,類似於java中的ScheduledThreadPoolExecutor,當這個週期任務,檢測到server.shutdown_asap開啟後,就會去關閉伺服器。
那,上面這個接收到訊號,要執行的動作說完了,那麼,什麼是訊號,訊號其實是linux下程序間通訊的一種手段,比如kill -9 ,就會給對應的pid,傳送一個SIGKILL 命令;在redis前臺執行時,你按下ctrl + c,其實也是傳送了一個訊號,訊號為SIGINT,值為2。大家可以看下圖:
那麼,前面我們註冊的訊號是哪個呢,是:SIGTERM,15。也就是我們按下kill -15時,會觸發這個訊號。
關於kill 9 和kill 15的差別,可以看這篇部落格:
Linux kill -9 和 kill -15 的區別
開啟syslog
// 設定 syslog
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
這個就是傳送日誌到linux系統的syslog,可以看看openlog函式的說明:
send messages to the system logger
這個感覺用得不多,可以查閱:
redis 的syslog日誌沒有打印出來的探索過程
初始化當前redisServer的部分屬性
// 初始化並建立資料結構
server.current_client = NULL;
// 1
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
這個其實沒啥說的,大家看到,比如1處,這個server.clients,server是一個全域性變數,維護當前redis server的各種狀態,clients呢,是用來儲存當前連線到redis server的客戶端,型別為連結串列:
// 一個連結串列,儲存了所有客戶端狀態結構
list *clients; /* List of active clients */
所以,這裡其實就是呼叫listCreate()
,建立了一個空連結串列,然後賦值給clients。
其他屬性,類似。
建立常量字串池,供複用
大家知道,redis在返回響應的時候,通常就是一句:"+OK"之類的。這個字串,如果每次響應的時候,再去new一個,也太浪費了,所以,乾脆,redis自己把這些常用的字串,快取了起來。
void createSharedObjects(void) {
int j;
// 常用回覆
shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
...
// 常用錯誤回覆
shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
...
}
這個和java中,把字串字面量快取起來,是一樣的,都是為了提高效能;java裡,不是還把128以內的整數也快取了嗎,對吧。
調整程序可以開啟的最大檔案數
伺服器一般在真實線上環境,如果是需要應對高併發的話,可能會有幾十上百萬的客戶端,和伺服器上的某個程序,建立tcp連線,而這時候,一般就需要調整程序可以開啟的最大檔案數(socket也是檔案)。
在閱讀redis原始碼之前,我知道的,修改程序可以開啟的最大檔案數的方式是通過ulimit,具體的,大家可以看下面這兩個連結:
linux最大檔案控制代碼數量總結
Elasticsearch之優化
但是,在這個原始碼中,發現了另外一種方式:
- 獲取當前的指定資源的限制值的api
#define RLIMIT_NOFILE 5 /* max number of open files */
struct rlimit {
rlim_t rlim_cur;
rlim_t rlim_max;
};
struct rlimit limit;
getrlimit(RLIMIT_NOFILE,&limit)
上面這個程式碼,獲取當前系統中,NOFILE(程序最大檔案數)這個值的資源限制大小。
通過man getrlimit(需要先安裝,安裝方式:yum install man-pages.noarch
),可以看到:
-
setrlimit則可以設定資源的相關限制
limit.rlim_cur = f; limit.rlim_max = f; setrlimit(RLIMIT_NOFILE,&limit)
建立事件迴圈相關資料結構
事件迴圈器的結構如下:
/*
* State of an event based program
*
* 事件處理器的狀態
*/
typedef struct aeEventLoop {
// 目前已註冊的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追蹤的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用於生成時間事件 id
long long timeEventNextId;
// 最後一次執行時間事件的時間
time_t lastTime; /* Used to detect system clock skew */
// 已註冊的檔案事件
aeFileEvent *events; /* Registered events */
// 已就緒的檔案事件
aeFiredEvent *fired; /* Fired events */
// 時間事件
aeTimeEvent *timeEventHead;
// 事件處理器的開關
int stop;
// 多路複用庫的私有資料
void *apidata; /* This is used for polling API specific data */
// 在處理事件前要執行的函式
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
初始化上面這個資料結構的程式碼在:aeCreateEventLoop in redis.c
上面這個結構中,主要就是:
-
apidata中,主要用於儲存多路複用庫的相關資料,每次呼叫多路複用庫,去進行select時,如果發現有就緒的io事件發生,就會存放到 fired 屬性中。
比如,select就是linux下,老版本的linux核心中,多路複用的一種實現,redis中,其程式碼如下:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { ... // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { ... // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
省略了部分程式碼,其中,1處,進行select,這一步類似於java中nio的select操作;2處,將select返回的,已就緒的檔案描述符,填充到fired 屬性。
-
另外,我們提到過,redis有一些後臺任務,比如清理過期key,這個不是一蹴而就的;每次週期執行後臺任務時,就會去清理一部分,而這裡的後臺任務,其實就是上面這個資料結構中的時間事件。
// 時間事件 aeTimeEvent *timeEventHead;
分配16個數據庫的記憶體空間
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
開啟listen埠,監聽請求
/* Open the TCP listening socket for the user commands. */
// 開啟 TCP 監聽埠,用於等待客戶端的命令請求
listenToPort(server.port, server.ipfd, &server.ipfd_count)
這裡就是開啟平時的6379埠的地方。
初始化16個數據庫對應的資料結構
/* Create the Redis databases, and initialize other internal state. */
// 建立並初始化資料庫結構
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType, NULL);
server.db[j].expires = dictCreate(&keyptrDictType, NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);
server.db[j].ready_keys = dictCreate(&setDictType, NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
db的資料結構如下:
typedef struct redisDb {
// 資料庫鍵空間,儲存著資料庫中的所有鍵值對
dict *dict; /* The keyspace for this DB */
// 鍵的過期時間,字典的鍵為鍵,字典的值為過期事件 UNIX 時間戳
dict *expires; /* Timeout of keys with a timeout set */
// 正處於阻塞狀態的鍵
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
// 可以解除阻塞的鍵
dict *ready_keys; /* Blocked keys that received a PUSH */
// 正在被 WATCH 命令監視的鍵
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
// 資料庫號碼
int id; /* Database ID */
// 資料庫的鍵的平均 TTL ,統計資訊
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
這裡可以看到,設定了過期時間的key,除了會在 dict 屬性儲存,還會新增一條記錄到 expires 字典。
expires字典的key:執行鍵的指標;value:過期時間。
建立pub/sub相關資料結構並初始化
// 建立 PUBSUB 相關結構
server.pubsub_channels = dictCreate(&keylistDictType, NULL);
server.pubsub_patterns = listCreate();
初始化部分統計屬性
// serverCron() 函式的執行次數計數器
server.cronloops = 0;
// 負責執行 BGSAVE 的子程序的 ID
server.rdb_child_pid = -1;
// 負責進行 AOF 重寫的子程序 ID
server.aof_child_pid = -1;
aofRewriteBufferReset();
// AOF 緩衝區
server.aof_buf = sdsempty();
// 最後一次完成 SAVE 的時間
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
// 最後一次嘗試執行 BGSAVE 的時間
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
// 伺服器啟動時間
server.stat_starttime = time(NULL);
// 已使用記憶體峰值
server.stat_peak_memory = 0;
server.resident_set_size = 0;
// 最後一次執行 SAVE 的狀態
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
設定時間事件對應的函式指標
/* Create the serverCron() time event, that's our main way to process
* background operations. */
// 為 serverCron() 建立時間事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
這裡的serverCron就是一個函式,後續每次週期觸發時間事件時,就會執行這個serverCron。
可以看這裡的英文註釋,作者也提到,這是主要的處理後臺任務的方式。
這塊以後也會重點分析。
設定connect事件對應的連線處理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
這裡的acceptTcpHandler就是處理新連線的函式:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while (max--) {
// accept 客戶端連線
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// 為客戶端建立客戶端狀態(redisClient)
acceptCommonHandler(cfd, 0);
}
}
建立aof檔案
如果aof打開了,就需要建立aof檔案。
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY | O_APPEND | O_CREAT, 0644);
}
剩下的幾個,暫時不涉及的任務
// 如果伺服器以 cluster 模式開啟,那麼初始化 cluster
if (server.cluster_enabled) clusterInit();
// 初始化複製功能有關的指令碼快取
replicationScriptCacheInit();
// 初始化腳本系統
scriptingInit();
// 初始化慢查詢功能
slowlogInit();
// 初始化 BIO 系統
bioInit();
上面的幾個,我們暫時還講解不到,先看看就行。
到此,初始化redis server,就基本結束了。
總結
本講內容較多,主要是redis啟動過程中,要做的事,也太多了。希望我已經大致講清楚了,其中,連線處理器那些都只是大致講了,後面會繼續。謝謝大家