Redis 內存管理與事件處理
阿新 • • 發佈:2017-07-01
erp 監聽端口 nal 回調 files poll 其中 監聽服務 repl
1 Redis內存管理
Redis內存管理相關文件為zmalloc.c/zmalloc.h,其只是對C中內存管理函數做了簡單的封裝,屏蔽了底層平臺的差異,並增加了內存使用情況統計的功能。void *zmalloc(size_t size) { // 多申請的一部分內存用於存儲當前分配了多少自己的內存 void *ptr = malloc(size+PREFIX_SIZE); if (!ptr) zmalloc_oom_handler(size); #ifdef HAVE_MALLOC_SIZE update_zmalloc_stat_alloc(zmalloc_size(ptr));return ptr; #else *((size_t*)ptr) = size; // 內存分配統計 update_zmalloc_stat_alloc(size+PREFIX_SIZE); return (char*)ptr+PREFIX_SIZE; #endif }
內存布局圖示:
2 事件處理
Redis的事件類型分為時間事件和文件事件,文件事件也就是網絡連接事件。時間事件的處理是在epoll_wait返回處理文件事件後處理的,每次epoll_wait的超時時間都是Redis最近的一個定時器時間。 Redis在進行事件處理前,首先會進行初始化,初始化的主要邏輯在main/initServer函數中。初始化流程主要做的工作如下:- 設置信號回調函數。
- 創建事件循環機制,即調用epoll_create。
- 創建服務監聽端口,創建定時事件,並將這些事件添加到事件機制中。
void initServer(void) { int j; // 設置信號對應的處理函數 signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); ... createSharedObjects(); adjustOpenFilesLimit(); // 創建事件循環機制,及調用epoll_create創建epollfd用於事件監聽server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Open the TCP listening socket for the user commands. */ // 創建監聽服務端口,socket/bind/listen if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); ... /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].eviction_pool = evictionPoolAlloc(); server.db[j].id = j; server.db[j].avg_ttl = 0; } ... /* Create the serverCron() time event, that‘s our main way to process * background operations. 創建定時事件 */ if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can‘t create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } // 將事件加入到事件機制中,調用鏈為 aeCreateFileEvent/aeApiAddEvent/epoll_ctl if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { serverLog(LL_WARNING, "Can‘t open the append-only file: %s", strerror(errno)); exit(1); } } ... }
事件處理流程 事件處理函數鏈:aeMain / aeProcessEvents / aeApiPoll / epoll_wait。 常見的事件機制處理流程是:調用epoll_wait等待事件來臨,然後遍歷每一個epoll_event,提取epoll_event中的events和data域,data域常用來存儲fd或者指針,不過一般的做法是提取出events和data.fd,然後根據fd找到對應的回調函數,fd與對應回調函數之間的映射關系可以存儲在特定的數據結構中,比如數組或者哈希表,然後調用事件回調函數來處理。 Redis中用了一個數組來保存fd與回調函數的映射關系,使用數組的優點就是簡單高效,但是數組一般使用在建立的連接不太多情況,而Redis正好符合這個情況,一般Redis的文件事件大都是客戶端建立的連接,而客戶端的連接個數是一定的,該數量通過配置項maxclients來指定。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; } int aeProcessEvents(aeEventLoop *eventLoop, int flags) { numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 從eventLoop->events數組中查找對應的回調函數 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn‘t * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } ... }
文件事件的監聽 Redis監聽端口的事件回調函數鏈是:acceptTcpHandler / acceptCommonHandler / createClient / aeCreateFileEvent / aeApiAddEvent / epoll_ctl。 在Reids監聽事件處理流程中,會將客戶端的連接fd添加到事件機制中,並設置其回調函數為readQueryFromClient,該函數負責處理客戶端的命令請求。 命令處理流程 命令處理流程鏈是:readQueryFromClient / processInputBuffer / processCommand / call / 對應命令的回調函數(c->cmd->proc),比如get key命令的處理回調函數為getCommand。getCommand的執行流程是先到client對應的數據庫字典中根據key來查找數據,然後根據響應消息格式將查詢結果填充到響應消息中。
3 如何添加自定義命令
如何在Redis中添加自定的命令呢?其中只需要改動以下幾個地方就行了,比如自定義命令random xxx,然後返回redis: xxx,因為hello xxx和get key類似,所以就依葫蘆畫瓢。random命令用來返回一個小於xxx的隨機值。 首先在redisCommandTable數組中添加自定義的命令,redisCommandTable數組定義在server.c中。然後在getCommand定義處後面添加randomCommand的定義,getCommand定義在t_string.c中。最後在server.h中添加helloCommand的聲明。整個修改patch文件如下,代碼基於redis-2.8.9版本。From 5304020683078273c1bc6cc9666dab95efa18607 Mon Sep 17 00:00:00 2001 From: luoxn28 <luoxn28@163.com> Date: Fri, 30 Jun 2017 04:43:47 -0700 Subject: [PATCH] add own command: random num --- src/server.c | 3 ++- src/server.h | 1 + src/t_string.c | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index 609f396..e040104 100644 --- a/src/server.c +++ b/src/server.c @@ -296,7 +296,8 @@ struct redisCommand redisCommandTable[] = { {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, - {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} + {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}, + {"random",randomCommand,2,"rF",0,NULL,1,1,1,0,0} }; struct evictionPoolEntry *evictionPoolAlloc(void); diff --git a/src/server.h b/src/server.h index 3fa7c3a..427ac92 100644 --- a/src/server.h +++ b/src/server.h @@ -1485,6 +1485,7 @@ void setnxCommand(client *c); void setexCommand(client *c); void psetexCommand(client *c); void getCommand(client *c); +void randomCommand(client *c); void delCommand(client *c); void existsCommand(client *c); void setbitCommand(client *c); diff --git a/src/t_string.c b/src/t_string.c index 8c737c4..df4022d 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -173,6 +173,50 @@ void getCommand(client *c) { getGenericCommand(c); } +static bool checkRandomNum(char *num) +{ + char *c = num; + + while (*c != ‘\0‘) { + if (!((‘0‘ <= *c) && (*c <= ‘9‘))) { + return false; + } + c++; + } + + return true; +} + +/** + * command: random n + * return a random num < n, if n <= 0, return 0 + * @author: luoxiangnan + */ +void randomCommand(client *c) +{ + char buff[64] = {0}; + int num = 0; + robj *o = NULL; + + if (!checkRandomNum(c->argv[1]->ptr)) { + o = createObject(OBJ_STRING, sdsnewlen("sorry, it‘s not a num :(", + strlen("sorry, it‘s not a num :("))); + addReplyBulk(c, o); + return; + } + + sscanf(c->argv[1]->ptr, "%d", &num); + if (num > 0) { + num = random() % num; + } else { + num = 0; + } + + sprintf(buff, "%s %d", "redis: ", num); + o = createObject(OBJ_STRING, sdsnewlen(buff, strlen(buff))); + addReplyBulk(c, o); +} + void getsetCommand(client *c) { if (getGenericCommand(c) == C_ERR) return; c->argv[2] = tryObjectEncoding(c->argv[2]); -- 1.8.3.1結果如下所示:
Redis 內存管理與事件處理