Redis的事件機制
阿新 • • 發佈:2020-07-28
[toc]
Redis程式的執行過程是一個處理事件的過程,也稱Redis是一個事件驅動的服務。Redis中的事件分兩類:檔案事件(File Event)、時間事件(Time Event)。檔案事件處理檔案的讀寫操作,特別是與客戶端通訊的Socket檔案描述符的讀寫操作;時間事件主要用於處理一些定時處理的任務。
本文首先介紹Redis的執行過程,闡明Redis程式是一個事件驅動的程式;接著介紹事件機制實現中涉及的資料結構以及事件的註冊;最後介紹了處理客戶端中涉及到的套接字檔案讀寫事件。
#### 一、Redis的執行過程
Redis的執行過程是一個事件處理的過程,可以通過下圖反映出來:
![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222634160-1313294053.jpg)
圖1 Redis的事件處理過程
從上圖可以看出:Redis伺服器的執行過程就是迴圈等待並處理事件的過程。通過時間事件將執行事件分成一個個的時間分片,如圖1的右半部分所示。如果在指定的時間分片中,有檔案事件發生,如:讀檔案描述符可讀、寫檔案描述符可寫,則呼叫相應的處理函式進行檔案的讀寫處理。檔案事件處理完成之後,處理期望發生時間在當前時間之前或正好是當前時刻的時間事件。然後再進入下一次迴圈迭代處理。
如果在指定的事件間隔中,沒有檔案事件發生,則不需要處理,直接進行時間事件的處理,如下圖所示。
![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222902809-586806562.jpg)
圖2 Redis的事件處理過程(無檔案事件發生)
#### 二、事件資料結構
##### 2.1 檔案事件資料結構
Redis用如下結構體來記錄一個檔案事件:
```c
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
```
通過mask來描述發生了什麼事件:
- AE_READABLE:檔案描述符可讀;
- AE_WRITABLE:檔案描述符可寫;
- AE_BARRIER:檔案描述符阻塞
rfileProc和wfileProc分別為讀事件和寫事件發生時的回撥函式,其函式簽名如下:
```c
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
```
##### 2.2 事件事件資料結構
Redis用如下結構體來記錄一個時間事件:
```c
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
```
when_sec和when_ms指定時間事件發生的時間,timeProc為時間事件發生時的處理函式,簽名如下:
```c
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
```
prev和next表明時間事件構成了一個雙向連結串列。
#### 3.3 事件迴圈
Redis用如下結構體來記錄系統中註冊的事件及其狀態:
```c
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
```
這一結構體中,最主要的就是檔案事件指標events和時間事件頭指標timeEventHead。檔案事件指標event指向一個固定大小(可配置)陣列,通過檔案描述符作為下標,可以獲取檔案對應的事件物件。
#### 三、事件的註冊過程
事件驅動的程式實際上就是在事件發生時,呼叫相應的處理函式(即:回撥函式)進行邏輯處理。因此關於事件,程式需要知道:①事件的發生;② 回撥函式。事件的註冊過程就是告訴程式這兩。下面我們分別從檔案事件、時間事件的註冊過程進行闡述。
##### 3.1 檔案事件的註冊過程
對於檔案事件:
- 事件的發生:應用程式需要知道哪些檔案描述符發生了哪些事件。感知檔案描述符上有事件發生是由作業系統的職責,應用程式需要告訴作業系統,它關心哪些檔案描述符的哪些事件,這樣通過相應的系統API就會返回發生了事件的檔案描述符。
- 回撥函式:應用程式知道了檔案描述符發生了事件之後,需要呼叫相應回撥函式進行處理,因而需要在事件發生之前將相應的回撥函式準備好。
這就是檔案事件的註冊過程,函式的實現如下:
```c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
```
這段程式碼邏輯非常清晰:首先根據檔案描述符獲得檔案事件物件,接著在作業系統中新增自己關心的檔案描述符(`addApiAddEvent`),最後將回調函式記錄到檔案事件物件中。因此,一個執行緒就可以同時監聽多個檔案事件,這就是IO多路複用。作業系統提供多種IO多路複用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支援所有這些模型,使用者可以根據需要進行選擇。不同的模型,向作業系統新增檔案描述符方式也不同,Redis將這部分邏輯封裝在`aeApiAddEvent`中,下面程式碼是EPOLL模型的實現:
```c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
```
這段程式碼就是對作業系統呼叫`epoll_ctl()`的封裝,`EPOLLIN`對應的是讀(輸入)事件,EPOLLOUT對應的是寫(輸出)事件。
##### 3.2 時間事件的註冊過程
對於時間事件:
- 事件的發生:當前時刻正好是事件期望發生的時刻,或者是晚於事件期望發生的時刻,所以需要讓程式知道事件期望發生的時刻;
- 回撥函式:此時呼叫回撥函式進行處理,所以需要讓程式知道事件的回撥函式。
對應的事件事件註冊函式如下:
```c
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
```
這段程式碼邏輯也是非常簡單:首先建立時間事件物件,接著設定事件,設定回撥函式,最後將事件事件物件插入到時間事件連結串列中。設定時間事件期望發生的時間比較簡單:
```c
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}
```
當前時間加上期望的時間間隔,作為事件期望發生的時刻。
#### 四、套接字檔案事件
Redis為客戶端提供儲存資料和獲取資料的快取服務,監聽並處理來自請求,將結果返回給客戶端,這一過程將會發生以下檔案事件:
![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222952441-228252876.jpg)
與上圖相對應,對於一個請求,Redis會註冊三個檔案事件:
##### 4.1 TCP連線建立事件
伺服器初始化時,在伺服器套接字上註冊TCP連線建立的事件。
```c
void initServer(void) {
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
}
```
回撥函式為acceptTcpHandler,該函式最重要的職責是建立客戶端結構。
##### 4.2 客戶端套接字讀事件
建立客戶端:在客戶端套接字上註冊客戶端套接字可讀事件。
```c
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
```
回撥函式為readQueryFromClient,顧名思義,此函式將從客戶端套接字中讀取資料。
##### 4.3 向客戶端返回資料
Redis完成請求後,Redis並非處理完一個請求後就註冊一個寫檔案事件,然後事件回撥函式中往客戶端寫回結果。根據圖1,檢測到檔案事件發生後,Redis對這些檔案事件進行處理,即:呼叫rReadProc或writeProc回撥函式。處理完成後,對於需要向客戶端寫回的資料,先快取到記憶體中:
```c
typedef struct client {
// ...其他欄位
list *reply; /* List of reply objects to send to the client. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
};
```
傳送給客戶端的資料會存放到兩個地方:
- reply指標存放待發送的物件;
- buf中存放待返回的資料,bufpos指示資料中的最後一個位元組所在位置。
這裡遵循一個原則:只要能存放在buf中,就儘量存入buf位元組陣列中,如果buf存不下了,才存放在reply物件陣列中。
寫回發生在進入下一次等待檔案事件之前,見圖1中【等待前處理】,會呼叫以下函式來處理客戶端資料寫回邏輯:
```c
int writeToClient(int fd, client *c, int handler_installed) {
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
}
}
}
```
上述函式只截取了資料傳送部分,首先發送`buf`中的資料,然後傳送`reply`中的資料。
有讀者可能會疑惑:write()系統呼叫是阻塞式的介面,上述做法會不會在write()呼叫的地方有等待,從而導致效能低下?這裡就要介紹Redis是怎麼處理這個問題的。
首先,我們發現建立客戶端的程式碼:
```c
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
}
}
```
可以看到設定fd是非阻塞(NonBlock),這就保證了在套接字fd上的read()和write()系統呼叫不是阻塞的。
其次,和檔案事件的處理操作一樣,往客戶端寫資料的操作也是批量的,函式如下:
```c
int handleClientsWithPendingWrites(void) {
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
}
```
可以看到,首先對每個客戶端呼叫剛才介紹的`writeToClient()`函式進行寫資料,如果還有資料沒寫完,那麼註冊寫事件,當套接字檔案描述符寫就緒時,呼叫`sendReplyToClient()`進行剩餘資料的寫操作:
```c
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
writeToClient(fd,privdata,1);
}
```
仔細想一下就明白了:處理完得到結果後,這時套接字的寫緩衝區一般是空的,因此write()函式呼叫成功,所以就不需要註冊寫檔案事件了。如果寫緩衝區滿了,還有資料沒寫完,此時再註冊寫檔案事件。並且在資料寫完後,將寫事件刪除:
```c
int writeToClient(int fd, client *c, int handler_installed) {
if (!clientHasPendingReplies(c)) {
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
```
注意到,在sendReplyToClient()函式實現中,第三個引數正