Redis原始碼分析--伺服器(1)結構與啟動
阿新 • • 發佈:2022-02-07
伺服器:
一、伺服器結構體 redisServer:
struct redisServer { char *configfile; int hz; int dbnum; redisDb *db; dict *commands; aeEventLoop *el; int port; char *bindaddr[CONFIG_BINDADDR_MAX]; int bindaddr_count; int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ list *clients; int maxidletime; /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of REDIS_NOTIFY... flags. */ }
-
L15、L16:redis可以配置在多個網絡卡上,所以配置多個socket fd;
在config.c中可以看到,伺服器啟動載入配置檔案時,如果配置檔案中有bind項,才會有多個ipfd;
if (!strcasecmp(argv[0],"bind") && argc >= 2) { int j, addresses = argc-1; if (addresses > REDIS_BINDADDR_MAX) { err = "Too many bind addresses specified"; goto loaderr; } for (j = 0; j < addresses; j++) server.bindaddr[j] = zstrdup(argv[j+1]); server.bindaddr_count = addresses; }
-
L6:伺服器中的資料庫:
typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; long long avg_ttl; /* Average TTL, just for stats */ } redisDb;
-
L2: 這個字典稱作 鍵空間 key space,就是使用者所見到的資料庫;
所有對資料庫的操作(新增或刪除資料庫),都是對這個鍵空間字典的操作;
-
L2:鍵的過期處理,todo;
-
L7:事務,todo;
-
-
L21:訂閱,todo;
二、伺服器的啟動與初始化:
1、啟動伺服器:
Redis伺服器啟動後,首先執行main函式:
int main(int argc, char **argv) {
// ...
initServer();
// ...
// 解析引數,載入配置檔案等
// ...
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
- 伺服器首先進行初始化 L3,接著就進入事件迴圈[2] L7;
2、初始化伺服器:
接著看伺服器初始化需要哪些步驟:
void initServer() {
// ...
/* 建立事件迴圈和資料庫 */
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
// ...
/* Create the serverCron() time event, that's our main way to process
* background operations. */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
// ...
/* 為每個bind的埠開一個socket用來處理連線 */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// ...
}
-
L3:在事件迴圈中加入對時間事件serverCron的監聽;
-
L5:此時綁定了port和地址的socket fd正在listen,監聽可讀事件,就可以進行accept;回撥acceptTcpHandler,建立客戶端;
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); /* 進行accept */ cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == AE_ERR) { redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); /* 會呼叫createClient建立客戶端 */ acceptCommonHandler(cfd,0); }
-
L4~L6:學習一下,避免編譯器報Warning:
/* Anti-warning macro... */ #define REDIS_NOTUSED(V) ((void) V)
-