redis 文件事件模型
參考文獻:
- 深入剖析 redis 事件驅動
- Redis 中的事件循環
- 深入了解epoll (轉)
- Redis自己的事件模型 ae
- EPOLL(7)
- Linux IO模式及 select、poll、epoll詳解
epoll為什麽這麽快,epoll的實現原理
概述
在redis中,對於對於文件事件的處理采用了Reactor模型。總體來說,就是將io多路復用所監聽到的文件去處,並放入一個隊列中依次處理。接下去本文以一個io多路復用的例子開始,一步步還原redis文件事件的運行過程
epoll (本節從Linux IO模式及 select、poll、epoll詳解摘抄)
epoll使用的過程中需要如下的三個接口:
int epoll_create(int size);//創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
int epoll_create(int size)
創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不同於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並不是限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當創建好epoll句柄後,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須調用close()關閉,否則可能導致fd被耗盡。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是需要監聽的fd(文件描述符)
- epoll_event:是告訴內核需要監聽什麽事,struct epoll_event結構如下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; //events可以是以下幾個宏的集合: EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉); EPOLLOUT:表示對應的文件描述符可以寫; EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來); EPOLLERR:表示對應的文件描述符發生錯誤; EPOLLHUP:表示對應的文件描述符被掛斷; EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。 EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裏
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。
一個epoll的示例
有了上面的論述,用一個簡單的例子來說明下epoll的使用(來自http://man7.org/linux/man-pages/man7/epoll.7.html):
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
/* Code to set up listening socket, ‘listen_sock‘,
(socket(), bind(), listen()) omitted */
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &addr, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
如圖所示,可以看出使用epoll的過程。接下來將介紹redis事件驅動模型中主要涉及的數據結構。
redis事件驅動模型
數據結構
redis事件驅動模型中主要涉及到如下的幾個數據結構:
- aeCreateEventLoop
- aeApiState
- aeFileEvent
aeFiredEvent
redis事件處理的核心是aeCreateEventLoop結構,如圖可以看出主要的結構體如下:
```
typedef struct aeEventLoop {// 目前已註冊的最大描述符
int maxfd; /* highest file descriptor currently registered */// 目前已追蹤的最大描述符
int setsize; /* max number of file descriptors tracked */// 用於生成時間事件 id
long long timeEventNextId;// 最後一次執行時間事件的時間
time_t lastTime; /* Used to detect system clock skew */// 已註冊的文件事件
aeFileEvent events; / Registered events */// 已就緒的文件事件
aeFiredEvent fired; / Fired events */// 時間事件
aeTimeEvent *timeEventHead;// 事件處理器的開關
int stop;// 多路復用庫的私有數據
void apidata; / This is used for polling API specific data */// 在處理事件前要執行的函數
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
其中aeFileEvent 結構體為已經註冊並需要監聽的事件的結構體。在redis初始化的時候會創建一個 setSize*sizeof(aeFileEvent) 以及一個 setSize*siezeof(aeFiredEvent) 大小的內存,用文件描述符作為其索引。那麽這個大小定位多少合適呢?在Linux個中,文件描述符是個有限的資源,當打開一個文件時就會消耗一個文件描述符,當關閉該文件描述符或者程序結束時會釋放該文件描述符資源,從而供其他文件打開操作使用。當文件描述符超過最大值後,打開文件就會出錯。那麽這個最大值是多少呢?可以通過/proc/sys/fs/file-max看到系統支持的最大的文件描述符數。通過 ulimit -n 可以看到當前用戶能打開的最大的文件描述符。在我這裏的一臺8g內存的機器上,系統支持最大的文件描述是365146。而在這臺64bit的機器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小為40byte。按系統最大支持的文件描述符來算,固定消耗內存為14.6M。這樣以文件描述符作為數組的下標來索引,雖然這樣的哈希在接入量不大的情況下會有大量的浪費。但是最多也就浪費14M 的內存,因此這樣的設計是可取的。【4】
typedef struct aeFileEvent {
// 監聽事件類型掩碼,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 讀事件處理器
aeFileProc *rfileProc;
// 寫事件處理器
aeFileProc *wfileProc;
// 多路復用庫的私有數據
void *clientData;
} aeFileEvent;
aeFiredEvent結構體是已經監聽到有事件發生的描述符的集合。
typedef struct aeFiredEvent {
// 已就緒文件描述符
int fd;
// 事件類型掩碼,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是兩者的或
int mask;
} aeFiredEvent;
void *apidata;在ae創建的時候,會被賦值為aeApiState結構體,結構體的定義如下:
typedef struct aeApiState {
// epoll_event 實例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
可以見得,這個結構體是為了epoll所準備的數據結構。redis可以選擇不同的io多路復用方法。因此 apidata 是個void類型,根據不同的io多路復用庫來選擇。
## Reactor模型的創建與使用
### aeEventLoop 的創建
aeEventLoop aeCreateEventLoop(int setsize) {
aeEventLoop eventLoop;
int i;
// 創建事件狀態結構
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 初始化文件事件結構和已就緒文件事件結構數組
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 設置數組大小
eventLoop->setsize = setsize;
// 初始化執行最近一次執行時間
eventLoop->lastTime = time(NULL);
// 初始化時間事件結構
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let‘s initialize the
* vector with it. */
// 初始化監聽事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
// 返回事件循環
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
如下圖所示,可以見得在初始化的時候創建結構體的流程。
graph LR
創建aeFileEvent-->創建aeFireEvent
創建aeFireEvent-->調用aeApiCreate創建aeApiState
函數aeApiCreate則創建了一個epoll所需要的數據結構。
/*
創建一個新的 epoll 實例,並將它賦值給 eventLoop
/
static int aeApiCreate(aeEventLoop eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 初始化事件槽空間
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}// 創建 epoll 實例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}// 賦值給 eventLoop
eventLoop->apidata = state;
return 0;
}
### aeFileEvent的註冊
在創建了aeEventLoop之後,對於需要監聽的文件描述符需要進行註冊,在aeFileEvent結構體中,可以看到如下的兩個結構aeFileProc *rfileProc和aeFileProc *wfileProc,就是在註冊監聽事件的時候進行賦值的。
函數aeCreateFileEvent執行創建aeFileEvent和添加文件句柄到epoll中。
/*
- 根據 mask 參數的值,監聽 fd 文件的狀態,
當 fd 可用時,執行 proc 函數
/
int aeCreateFileEvent(aeEventLoop eventLoop, int fd, int mask,
aeFileProc proc, void clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}if (fd >= eventLoop->setsize) return AE_ERR;
// 取出文件事件結構
aeFileEvent *fe = &eventLoop->events[fd];// 監聽指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;// 設置文件事件類型,以及事件的處理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;// 私有數據
fe->clientData = clientData;// 如果有需要,更新事件處理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;return AE_OK;
}
其中aeApiAddEvent函數就是在開頭之中epoll例子中添加一個文件描述符到監聽集合中的方法封裝函數:
/*
關聯給定事件到 fd
/* If the fd was already monitored for some event, we need a MOD
/
static int aeApiAddEvent(aeEventLoop eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;- operation. Otherwise we need an ADD operation.
- 如果 fd 沒有關聯任何事件,那麽這是一個 ADD 操作。
- 如果已經關聯了某個/某些事件,那麽這是一個 MOD 操作。
*/
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// 註冊事件到 epoll
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events /
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; / avoid valgrind warning */
ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
### 事件驅動模型的運行過程
到這裏redis事件驅動的主要數據結構和初始化的方法已經介紹完畢。接下來將展示事件驅動的運行過程。在redis源碼中,省略去其他部分,跟事件驅動相關的代碼如下:
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 為 TCP 連接關聯連接應答(accept)處理器
// 用於接受並應答客戶端的 connect() 調用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// 為本地套接字關聯應答處理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
aeMain(server.el);
其中aeCreateEventLoop和aeCreateFileEvent函數在之前已經介紹過。接下來重點介紹下aeMain函數:
/*
事件處理器的主循環
/
void aeMain(aeEventLoop eventLoop) {eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件處理前執行的函數,那麽運行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 開始處理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
```
我們可以看出,aeMain函數中主要調用了aeProcessEvents處理事件,aeProcessEvents中我們略去其他的代碼,主要關註如下的部分:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
....
// 處理文件事件,阻塞時間由 tvp 決定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 從已就緒數組中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn‘t
* processed, so we check if the event is still valid. */
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執行其中一個
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
....
}
可以看出函數aeProcessEvents調用了aeApiPoll獲取已經就緒的事件。在for循環中,從eventLoop->fired(已經就緒的事件)中取出事件結構體,然後根據是讀時間還是寫事件進行處理。在aeApiPoll中,就可以看到我們熟悉的
epoll_wait的身影。可以見得通過調用系統的epoll_wait函數,然後將已經就緒的事件放入 eventLoop->fired中。
/*
* 獲取可執行事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 等待時間
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// 有至少一個事件就緒?
if (retval > 0) {
int j;
// 為已就緒事件設置相應的模式
// 並加入到 eventLoop 的 fired 數組中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
// 返回已就緒事件個數
return numevents;
}
到這裏還有一個疑問,在redis初始化的時候只註冊了tcp和本地套接字的描述符,那麽當有個新的客戶端連接進來的時候,是怎麽將客戶端的描述符加到監聽隊列裏面的呢?答案就在最開始的acceptTcpHandler函數中。在這個函數中依次調用了acceptCommonHandler->createClient->aeCreateFileEvent函數。可以見得當監聽的一個tcp或者本地socket產生了connect 事件的時候,就會依次調用這些函數,然後將新的客戶端端描述符加入監聽中。
總結
redis的事件驅動模型分析就到這裏,總體而言還是比較直觀的。這中間也學習了很多,包括epoll的原理等。
redis 文件事件模型