Libevent原始碼分析-----Libevent工作流程探究
之前的博文講了很多Libevent的基礎構件,現在以一個實際例子來初步探究Libevent的基本工作流程。由於還有很多Libevent的細節並沒有講所以,這裡的探究還是比較簡潔,例子也相當簡單。
#include<unistd.h> #include<stdio.h> #include<event.h> #include<thread.h> void cmd_cb(int fd, short events, void *arg) { char buf[1024]; printf("in the cmd_cb\n"); read(fd, buf, sizeof(buf)); } int main() { evthread_use_pthreads(); //使用預設的event_base配置 struct event_base *base = event_base_new(); struct event *cmd_ev = event_new(base, STDIN_FILENO, EV_READ | EV_PERSIST, cmd_cb, NULL); event_add(cmd_ev, NULL); //沒有超時 event_base_dispatch(base); return 0; }
上面程式碼估計是不會比讀者寫的第一個Libevent程式複雜。但這已經包含了Libevent的基礎工作流程。這裡將進入這些函式的內部探究,並且只會講解之前博文出現過的,沒出現的,儘量不講。在講解之前,要先了解一下struct event這個結構體。
struct event { TAILQ_ENTRY(event) ev_active_next; //啟用佇列 TAILQ_ENTRY(event) ev_next; //註冊事件佇列 /* for managing timeouts */ union { TAILQ_ENTRY(event) ev_next_with_common_timeout; int min_heap_idx; //指明該event結構體在堆的位置 } ev_timeout_pos; //僅用於定時事件處理器(event).EV_TIMEOUT型別 //對於I/O事件,是檔案描述符;對於signal事件,是訊號值 evutil_socket_t ev_fd; struct event_base *ev_base; //所屬的event_base //因為訊號和I/O是不能同時設定的。所以可以使用共用體以省記憶體 //在低版本的Libevent,兩者是分開的,不在共用體內。 union { //無論是訊號還是IO,都有一個TAILQ_ENTRY的佇列。它用於這樣的情景: //使用者對同一個fd呼叫event_new多次,並且都使用了不同的回撥函式。 //每次呼叫event_new都會產生一個event*。這個xxx_next成員就是把這些 //event連線起來的。 /* used for io events */ //用於IO事件 struct { TAILQ_ENTRY(event) ev_io_next; struct timeval ev_timeout; } ev_io; /* used by signal events */ //用於訊號事件 struct { TAILQ_ENTRY(event) ev_signal_next; short ev_ncalls; //事件就緒執行時,呼叫ev_callback的次數 /* Allows deletes in callback */ short *ev_pncalls; //指標,指向次數 } ev_signal; } _ev; short ev_events;//記錄監聽的事件型別 EV_READ EVTIMEOUT之類 short ev_res; /* result passed to event callback *///記錄了當前啟用事件的型別 //libevent用於標記event資訊的欄位,表明其當前的狀態. //可能值為前面的EVLIST_XXX short ev_flags; //本event的優先順序。呼叫event_priority_set設定 ev_uint8_t ev_pri; ev_uint8_t ev_closure; struct timeval ev_timeout;//用於定時器,指定定時器的超時值 /* allows us to adopt for different types of events */ void (*ev_callback)(evutil_socket_t, short, void *arg); //回撥函式 void *ev_arg; //回撥函式的引數 };
event結構體裡面有幾個TAILQ_ENTRY佇列節點型別。這裡因為一個event是會同時處於多個佇列之中。比如前幾篇博文說到的同一個檔案描述符或者訊號值對應的多個event會被連在一起,所有的被加入到event_base的event也會連在一起,所有被啟用的event也會被連在一起。所以會有多個QAILQ_ENTRY。
event結構體只有一兩個之前沒有說到的概念,這不妨礙理解event結構體。而event_base結構體則會太多之前沒有說到的概念,所以這裡就不貼出event_base的程式碼了。
在讀這篇博文前,最好讀一下前面幾篇博文,因為會用到其他講到的東西。如果之前有講過的東西,這裡也將一筆帶過。
好了,開始探究。
最前面的evthread_use_pthreads();就不多說了,看《多執行緒、鎖、條件變數(一)》和《多執行緒、鎖、條件變數(二)》這兩篇博文吧。
建立event_base:
下面看一下event_base_new函式。它是由event_base_new_with_config函式實現的。我們還是看後面那個函式吧。
//event.c檔案
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event_base *base;
int should_check_environment;
//之所以不用mm_malloc是因為mm_malloc並不會清零該記憶體區域。
//而這個函式是會清零申請到的記憶體區域,這相當於被base初始化
if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
...
TAILQ_INIT(&base->eventqueue);
...
if (cfg)
base->flags = cfg->flags;
evmap_io_initmap(&base->io);
evmap_signal_initmap(&base->sigmap);
base->evbase = NULL;
should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
//選擇IO複用結構體
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name))
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features)
continue;
}
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name))
continue;
//找到一個滿足條件的多路IO複用函式
base->evsel = eventops[i];
//初始化ev_base。並且會對訊號監聽的處理也進行初始化
base->evbase = base->evsel->init(base);
}
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
//測試evthread_lock_callbacks結構中的lock指標函式是否為NULL
//即測試Libevent是否已經初始化為支援多執行緒模式。
//由於一開始是用mm_calloc申請記憶體的,所以該記憶體區域的值為0
//對於th_base_lock變數,目前的值為NULL.
if (EVTHREAD_LOCKING_ENABLED() &&
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) { //配置是支援鎖的
EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE); //申請一個鎖
base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond);//申請一個條件變數
}
#endif
return (base);
}
這裡用到了event_config結構體,關於這個結構體可以參考《配置event_base》一文。這個結構體主要是對event_base進行一些配置。另外程式碼中還講到了怎麼使用選擇一個多IO複用函式,這個可以參考《跨平臺Reactor介面的實現》一文。
巨集EVTHREAD_LOCKING_ENABLED主要是檢測是否已經支援鎖了。檢測的方式也很簡單,也就是檢測_evthread_lock_fns全域性變數中的lock成員變數是否不為NULL。有關這個_evthread_lock_fns全域性變數可以檢視《多執行緒、鎖、條件變數(一)》。
建立event:
好了,現在event_base已經新建出來了。下面看一下event_new函式,它和前面的event_base_new一樣,把主要是的初始化工作交給另一個函式。event_new函式的工作只是建立一個struct event結構體,然後把它的引數原封不動地傳給event_assign,所以還是看event_assign函式。
//event.c檔案
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd,
short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
//進行一些賦值和初始化。
ev->ev_base = base;
ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT; //初始化狀態
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
return -1;
}
}
...
return 0;
}
從event_assign函式的名字可以得知它是進行賦值操作的。所以它能可以在event被初始化後再次呼叫。不過,初始化後再次呼叫的話,有些事情要注意。這個在後面的部落格中會說到。
從上面的程式碼可看到:如果這個event是用來監聽一個訊號的,那麼就不能讓這個event監聽讀或者寫事件。原因是其與訊號event的實現方法相抵觸,具體可以參考《訊號event的處理》。
注意,此時event結構體的變數ev_flags的值是EVLIST_INIT。對變數的追蹤是很有幫助的。它指明瞭event結構體的狀態。它通過以或運算的方式取下面的值:
//event_struct.h檔案
#define EVLIST_TIMEOUT 0x01 //event從屬於定時器佇列或者時間堆
#define EVLIST_INSERTED 0x02 //event從屬於註冊佇列
#define EVLIST_SIGNAL 0x04 //沒有使用
#define EVLIST_ACTIVE 0x08 //event從屬於活動佇列
#define EVLIST_INTERNAL 0x10 //該event是內部使用的。訊號處理時有用到
#define EVLIST_INIT 0x80 //event已經被初始化了
/* EVLIST_X_ Private space: 0x1000-0xf000 */
#define EVLIST_ALL (0xf000 | 0x9f) //所有標誌。這個不能取
將event加入到event_base中:
建立完一個event結構體後,現在看一下event_add。它同前面的函式一樣,內部也是呼叫其他函式完成工作。因為它用到了鎖,所以給出它的程式碼
//event.c檔案
int
event_add(struct event *ev, const struct timeval *tv)
{
int res;
//加鎖
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
res = event_add_internal(ev, tv, 0);
//解鎖
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
...
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add(base, ev->ev_fd, ev); //加入io佇列
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add(base, (int)ev->ev_fd, ev);//加入訊號佇列
if (res != -1)
event_queue_insert(base, ev, EVLIST_INSERTED);//向event_base註冊事件
}
...
return (res);
}
event_add函式只是對event_base加了鎖,然後呼叫event_add_internal函式完成工作。所以函式event_add是執行緒安全的。
event_add_internal函式會呼叫前幾篇博文講到的evmap_io_add和evmap_signal_add,把有相同檔案描述符fd和訊號值sig的event連在一個佇列裡面。成功之後,就會呼叫event_queue_insert,向event_base註冊事件。
前面博文的evmap_io_add和evmap_signal_add函式內部還有一些地方並沒有說到。那就是把要監聽的fd或者sig新增到多路IO複用函式中,使得其是可以監聽的。
//evmap.c檔案
int
evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx = NULL;
int nread, nwrite, retval = 0;
short res = 0, old = 0;
struct event *old_ev;
...
//GET_IO_SLOT_AND_CTOR巨集的作用就是讓ctx指向struct event_map_entry結構體中的TAILQ_HEAD
//巨集的展開,可以到http://blog.csdn.net/luotuo44/article/details/38403241檢視
GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
evsel->fdinfo_len);
//同一個fd可以呼叫event_new,event_add
//多次。nread、nwrite就是記錄有多少次。如果每次event_new的回撥函式
//都不一樣,那麼當fd有可讀或者可寫時,這些回撥函式都是會觸發的。
//對一個fd不能event_new、event_add太多次的。後面會進行判斷
nread = ctx->nread;
nwrite = ctx->nwrite;
if (nread)
old |= EV_READ;
if (nwrite)
old |= EV_WRITE;
if (ev->ev_events & EV_READ) {
//記錄是不是第一次。如果是第一次,那麼就說明該fd還沒被
//加入到多路IO複用中。即還沒被加入到像select、epoll這些
//函式中。那麼就要加入。這個在後面可以看到。
if (++nread == 1)
res |= EV_READ;
}
if (ev->ev_events & EV_WRITE) {
if (++nwrite == 1)
res |= EV_WRITE;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff)) {
event_warnx("Too many events reading or writing on fd %d",
(int)fd);
return -1;
}
//把fd加入到多路IO複用中。
if (res) {
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
if (evsel->add(base, ev->ev_fd,
old, (ev->ev_events & EV_ET) | res, extra) == -1)
return (-1);
retval = 1;
}
//nread進行了++。把次數記錄下來。下次對於同一個fd,這個次數就有用了
ctx->nread = (ev_uint16_t) nread;
ctx->nwrite = (ev_uint16_t) nwrite;
TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);
return (retval);
}
程式碼中有兩個計數nread和nwrite,當其值為1時,就說明是第一次監聽對應的事件。此時,就要把這個fd新增到多路IO複用函式中。這就完成fd與select、poll、epoll之類的多路IO複用函式的相關聯。這完成對fd監聽的第一步。
下面再看event_queue_insert函式的實現。
//event.c檔案
static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
...
ev->ev_flags |= queue;
switch (queue) {
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
...
}
}
這個函式的主要作為是把event加入到對應的佇列中。在這裡,是為了把event加入到eventqueue這個已註冊佇列中,即將event向event_base註冊。注意,此時event結構體的ev_flags變數為EVLIST_INIT | EVLIST_INSERTED了。
進入主迴圈,開始監聽event:
現在事件已經新增完畢,開始進入主迴圈event_base_dispatch函式。還是同樣,該函式內部呼叫event_base_loop完成工作。
//event.c檔案
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
int res, done, retval = 0;
//加鎖
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
done = 0;
while (!done) {
//該函式的內部會解鎖,然後呼叫OS提供的的多路IO複用函式。
//這個函式退出後,又會立即加鎖。這有點像條件變數。
res = evsel->dispatch(base, tv_p);
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
}
}
done:
//解鎖
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
在event_base_loop函式內部會進行加鎖,這是因為這裡要對event_base裡面的多個佇列進行一些資料操作(增刪操作),此時要用鎖來保護佇列不被另外一個執行緒所破壞。
上面程式碼中有兩個函式evsel->dispatch和event_process_active。前一個將呼叫多路IO複用函式,對event進行監聽,並且把滿足條件的event放到event_base的啟用佇列中。後一個則遍歷這個啟用佇列的所有event,逐個呼叫對應的回撥函式。
可以看到整個流程如下圖所示:
將已啟用event插入到啟用列表:
我們還是深入看看Libevent是怎麼把event新增到啟用佇列的。dispatch是一個函式指標,它的實現都包含是一個多路IO複用函式。這裡選擇poll這個多路IO複用函式來作分析。
//poll.c檔案
static int
poll_dispatch(struct event_base *base, struct timeval *tv)
{
int res, i, j, nfds;
long msec = -1;
struct pollop *pop = base->evbase;
struct pollfd *event_set;
nfds = pop->nfds;
event_set = pop->event_set;
//解鎖
EVBASE_RELEASE_LOCK(base, th_base_lock);
res = poll(event_set, nfds, msec);
//再次加鎖
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
...
i = random() % nfds;
for (j = 0; j < nfds; j++) {
int what;
if (++i == nfds)
i = 0;
what = event_set[i].revents;
if (!what)
continue;
res = 0;
//如果fd發生錯誤,就把之當作讀和寫事件。之後呼叫read
//或者write時,就能得知具體是什麼錯誤了。這裡的作用是
//通知到上層。
if (what & (POLLHUP|POLLERR))
what |= POLLIN|POLLOUT;
if (what & POLLIN)
res |= EV_READ;
if (what & POLLOUT)
res |= EV_WRITE;
if (res == 0)
continue;
//把這個ev放到啟用佇列中。
evmap_io_active(base, event_set[i].fd, res);
}
return (0);
}
pollfd陣列的資料是在evmap_io_add函式中新增的,在evmap_io_add函式裡面,有一個evsel->add呼叫,它會把資料(fd和對應的監聽型別)放到pollfd陣列中。
當主執行緒從poll返回時,沒有錯誤的話,就說明有些監聽的事件發生了。在函式的後面,它會遍歷這個pollfd陣列,檢視哪個fd是有事件發生。如果事件發生,就呼叫evmap_io_active(base, event_set[i].fd, res);在這個函式裡面會把這個fd對應的event放到event_base的啟用event佇列中。下面是evmap_io_active的程式碼。
void //evmap.c檔案
evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)
{
struct event_io_map *io = &base->io;
struct evmap_io *ctx;
struct event *ev;
//由這個fd找到對應event_map_entry的TAILQ_HEAD.
GET_IO_SLOT(ctx, io, fd, evmap_io);
//遍歷這個佇列。將所有與fd相關聯的event結構體都處理一遍
TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {
if (ev->ev_events & events)
event_active_nolock(ev, ev->ev_events & events, 1);
}
}
void //event.c檔案
event_active_nolock(struct event *ev, int res, short ncalls)
{
struct event_base *base;
base = ev->ev_base;
...
//將ev插入到啟用佇列
event_queue_insert(base, ev, EVLIST_ACTIVE);
...
}
//將event 插入到event_base的對應(由queue指定)的佇列裡面
static void //event.c檔案
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
...
ev->ev_flags |= queue;
switch (queue) {
case EVLIST_ACTIVE:
base->event_count_active++;
//將event插入到對應對應優先順序的啟用佇列中
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next);
break;
}
}
經過上面三個函式的呼叫,就可以把一個fd對應的所有符合條件的event插入到啟用佇列中。因為Libevent還對事件處理設有優先順序,所以有一個啟用陣列佇列,而不是隻有一個啟用佇列。
注意,此時event結構體的ev_flags變數為EVLIST_INIT | EVLIST_INSERTED | EVLIST_ACTIVE了。
處理啟用列表中的event:
現在已經完成了將event插入到啟用佇列中。接下來就是遍歷啟用陣列佇列,把所有啟用的event都處理即可。
現在來追蹤event_process_active函式。
//event.c檔案
static int
event_process_active(struct event_base *base)
{
struct event_list *activeq = NULL;
int i, c = 0;
//從高優先順序到低優先順序遍歷優先順序陣列
for (i = 0; i < base->nactivequeues; ++i) {
//對於特定的優先順序,遍歷該優先順序的所有啟用event
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
activeq = &base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
...
}
}
return c;
}
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq)
{
struct event *ev;
int count = 0;
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
//如果是永久事件,那麼只需從active佇列中刪除。
if (ev->ev_events & EV_PERSIST)
event_queue_remove(base, ev, EVLIST_ACTIVE);
else //不是的話,那麼就要把這個event刪除掉。
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL))
++count;
//下面開始處理這個event
switch (ev->ev_closure) {
...
case EV_CLOSURE_NONE:
//呼叫使用者設定的回撥函式。
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
}
return count;
}
上面的程式碼,從高到低優先順序遍歷啟用event優先順序陣列。對於啟用的event,要呼叫event_queue_remove將之從啟用佇列中刪除掉。然後再對這個event呼叫其回撥函式。
event_queue_remove函式的呼叫會改變event結構體的ev_flags變數的值。呼叫後, ev_flags變數為EVLIST_INIT | EVLIST_INSERTED。現在又可以等待下一次事件的到來了。 -