1. 程式人生 > >Libevent原始碼分析-----Libevent工作流程探究

Libevent原始碼分析-----Libevent工作流程探究

        之前的博文講了很多Libevent的基礎構件,現在以一個實際例子來初步探究Libevent的基本工作流程。由於還有很多Libevent的細節並沒有講所以,這裡的探究還是比較簡潔,例子也相當簡單。

#include<unistd.h>

#include<stdio.h>

#include<event.h>

#include<thread.h>



void cmd_cb(int fd, short events, void *arg)

{

char buf[1024];

printf("in the cmd_cb\n");


read(fd, buf, sizeof(buf));

}



int main()

{

evthread_use_pthreads();


//使用預設的event_base配置

struct event_base *base = event_base_new();


struct event *cmd_ev = event_new(base, STDIN_FILENO,

EV_READ | EV_PERSIST, cmd_cb, NULL);


event_add(cmd_ev, NULL); //沒有超時


event_base_dispatch(base);


return 0;

}

        上面程式碼估計是不會比讀者寫的第一個Libevent程式複雜。但這已經包含了Libevent的基礎工作流程。這裡將進入這些函式的內部探究,並且只會講解之前博文出現過的,沒出現的,儘量不講。在講解之前,要先了解一下struct event這個結構體。

struct event {

TAILQ_ENTRY(event) ev_active_next; //啟用佇列

TAILQ_ENTRY(event) ev_next; //註冊事件佇列

/* for managing timeouts */

union {

TAILQ_ENTRY(event) ev_next_with_common_timeout;

int min_heap_idx; //指明該event結構體在堆的位置

} ev_timeout_pos; //僅用於定時事件處理器(event).EV_TIMEOUT型別


//對於I/O事件,是檔案描述符;對於signal事件,是訊號值

evutil_socket_t ev_fd;


struct event_base *ev_base; //所屬的event_base


//因為訊號和I/O是不能同時設定的。所以可以使用共用體以省記憶體

//在低版本的Libevent,兩者是分開的,不在共用體內。

union {

//無論是訊號還是IO,都有一個TAILQ_ENTRY的佇列。它用於這樣的情景:

//使用者對同一個fd呼叫event_new多次,並且都使用了不同的回撥函式。

//每次呼叫event_new都會產生一個event*。這個xxx_next成員就是把這些

//event連線起來的。


/* used for io events */

//用於IO事件

struct {

TAILQ_ENTRY(event) ev_io_next;

struct timeval ev_timeout;

} ev_io;


/* used by signal events */

//用於訊號事件

struct {

TAILQ_ENTRY(event) ev_signal_next;

short ev_ncalls; //事件就緒執行時,呼叫ev_callback的次數 /* Allows deletes in callback */

short *ev_pncalls; //指標,指向次數

} ev_signal;

} _ev;


short ev_events;//記錄監聽的事件型別 EV_READ EVTIMEOUT之類

short ev_res; /* result passed to event callback *///記錄了當前啟用事件的型別

//libevent用於標記event資訊的欄位,表明其當前的狀態.

//可能值為前面的EVLIST_XXX

short ev_flags;


//本event的優先順序。呼叫event_priority_set設定

ev_uint8_t ev_pri;

ev_uint8_t ev_closure;

struct timeval ev_timeout;//用於定時器,指定定時器的超時值


/* allows us to adopt for different types of events */

void (*ev_callback)(evutil_socket_t, short, void *arg); //回撥函式

void *ev_arg; //回撥函式的引數

};

        event結構體裡面有幾個TAILQ_ENTRY佇列節點型別。這裡因為一個event是會同時處於多個佇列之中。比如前幾篇博文說到的同一個檔案描述符或者訊號值對應的多個event會被連在一起,所有的被加入到event_base的event也會連在一起,所有被啟用的event也會被連在一起。所以會有多個QAILQ_ENTRY。

        event結構體只有一兩個之前沒有說到的概念,這不妨礙理解event結構體。而event_base結構體則會太多之前沒有說到的概念,所以這裡就不貼出event_base的程式碼了。

        在讀這篇博文前,最好讀一下前面幾篇博文,因為會用到其他講到的東西。如果之前有講過的東西,這裡也將一筆帶過。

        好了,開始探究。

        最前面的evthread_use_pthreads();就不多說了,看《多執行緒、鎖、條件變數(一)》和《多執行緒、鎖、條件變數(二)》這兩篇博文吧。

建立event_base:

        下面看一下event_base_new函式。它是由event_base_new_with_config函式實現的。我們還是看後面那個函式吧。

//event.c檔案

struct event_base *

event_base_new_with_config(const struct event_config *cfg)

{

int i;

struct event_base *base;

int should_check_environment;



//之所以不用mm_malloc是因為mm_malloc並不會清零該記憶體區域。

//而這個函式是會清零申請到的記憶體區域,這相當於被base初始化

if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {

event_warn("%s: calloc", __func__);

return NULL;

}


...


TAILQ_INIT(&base->eventqueue);


...


if (cfg)

base->flags = cfg->flags;


evmap_io_initmap(&base->io);

evmap_signal_initmap(&base->sigmap);


base->evbase = NULL;


should_check_environment =

!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));


//選擇IO複用結構體

for (i = 0; eventops[i] && !base->evbase; i++) {

if (cfg != NULL) {

/* determine if this backend should be avoided */

if (event_config_is_avoided_method(cfg,

eventops[i]->name))

continue;

if ((eventops[i]->features & cfg->require_features)

!= cfg->require_features)

continue;

}


if (should_check_environment &&

event_is_method_disabled(eventops[i]->name))

continue;


//找到一個滿足條件的多路IO複用函式

base->evsel = eventops[i];


//初始化ev_base。並且會對訊號監聽的處理也進行初始化

base->evbase = base->evsel->init(base);

}




#ifndef _EVENT_DISABLE_THREAD_SUPPORT

//測試evthread_lock_callbacks結構中的lock指標函式是否為NULL

//即測試Libevent是否已經初始化為支援多執行緒模式。

//由於一開始是用mm_calloc申請記憶體的,所以該記憶體區域的值為0

//對於th_base_lock變數,目前的值為NULL.

if (EVTHREAD_LOCKING_ENABLED() &&

(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) { //配置是支援鎖的

EVTHREAD_ALLOC_LOCK(base->th_base_lock,

EVTHREAD_LOCKTYPE_RECURSIVE); //申請一個鎖

base->defer_queue.lock = base->th_base_lock;

EVTHREAD_ALLOC_COND(base->current_event_cond);//申請一個條件變數

}

#endif


return (base);

}

        這裡用到了event_config結構體,關於這個結構體可以參考《配置event_base》一文。這個結構體主要是對event_base進行一些配置。另外程式碼中還講到了怎麼使用選擇一個多IO複用函式,這個可以參考《跨平臺Reactor介面的實現》一文。

        巨集EVTHREAD_LOCKING_ENABLED主要是檢測是否已經支援鎖了。檢測的方式也很簡單,也就是檢測_evthread_lock_fns全域性變數中的lock成員變數是否不為NULL。有關這個_evthread_lock_fns全域性變數可以檢視《多執行緒、鎖、條件變數(一)》。

建立event:

        好了,現在event_base已經新建出來了。下面看一下event_new函式,它和前面的event_base_new一樣,把主要是的初始化工作交給另一個函式。event_new函式的工作只是建立一個struct event結構體,然後把它的引數原封不動地傳給event_assign,所以還是看event_assign函式。

//event.c檔案

int

event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd,

short events, void (*callback)(evutil_socket_t, short, void *), void *arg)

{

//進行一些賦值和初始化。

ev->ev_base = base;

ev->ev_callback = callback;

ev->ev_arg = arg;

ev->ev_fd = fd;

ev->ev_events = events;

ev->ev_res = 0;

ev->ev_flags = EVLIST_INIT; //初始化狀態

ev->ev_ncalls = 0;

ev->ev_pncalls = NULL;


if (events & EV_SIGNAL) {

if ((events & (EV_READ|EV_WRITE)) != 0) {

event_warnx("%s: EV_SIGNAL is not compatible with "

"EV_READ or EV_WRITE", __func__);

return -1;

}

}


...


return 0;

}

        從event_assign函式的名字可以得知它是進行賦值操作的。所以它能可以在event被初始化後再次呼叫。不過,初始化後再次呼叫的話,有些事情要注意。這個在後面的部落格中會說到。

        從上面的程式碼可看到:如果這個event是用來監聽一個訊號的,那麼就不能讓這個event監聽讀或者寫事件。原因是其與訊號event的實現方法相抵觸,具體可以參考《訊號event的處理》。

        注意,此時event結構體的變數ev_flags的值是EVLIST_INIT。對變數的追蹤是很有幫助的。它指明瞭event結構體的狀態。它通過以或運算的方式取下面的值:

//event_struct.h檔案

#define EVLIST_TIMEOUT 0x01 //event從屬於定時器佇列或者時間堆

#define EVLIST_INSERTED 0x02 //event從屬於註冊佇列

#define EVLIST_SIGNAL 0x04 //沒有使用

#define EVLIST_ACTIVE 0x08 //event從屬於活動佇列

#define EVLIST_INTERNAL 0x10 //該event是內部使用的。訊號處理時有用到

#define EVLIST_INIT 0x80 //event已經被初始化了


/* EVLIST_X_ Private space: 0x1000-0xf000 */

#define EVLIST_ALL (0xf000 | 0x9f) //所有標誌。這個不能取

將event加入到event_base中:

        建立完一個event結構體後,現在看一下event_add。它同前面的函式一樣,內部也是呼叫其他函式完成工作。因為它用到了鎖,所以給出它的程式碼

//event.c檔案

int

event_add(struct event *ev, const struct timeval *tv)

{

int res;


//加鎖

EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_add_internal(ev, tv, 0);

//解鎖

EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return (res);

}


static inline int

event_add_internal(struct event *ev, const struct timeval *tv,

int tv_is_absolute)

{

struct event_base *base = ev->ev_base;

int res = 0;

int notify = 0;

...

if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&

!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {

if (ev->ev_events & (EV_READ|EV_WRITE))

res = evmap_io_add(base, ev->ev_fd, ev); //加入io佇列

else if (ev->ev_events & EV_SIGNAL)

res = evmap_signal_add(base, (int)ev->ev_fd, ev);//加入訊號佇列

if (res != -1)

event_queue_insert(base, ev, EVLIST_INSERTED);//向event_base註冊事件

}

...

return (res);

}

        event_add函式只是對event_base加了鎖,然後呼叫event_add_internal函式完成工作。所以函式event_add是執行緒安全的。

        event_add_internal函式會呼叫前幾篇博文講到的evmap_io_add和evmap_signal_add,把有相同檔案描述符fd和訊號值sig的event連在一個佇列裡面。成功之後,就會呼叫event_queue_insert,向event_base註冊事件。

        前面博文的evmap_io_add和evmap_signal_add函式內部還有一些地方並沒有說到。那就是把要監聽的fd或者sig新增到多路IO複用函式中,使得其是可以監聽的。

//evmap.c檔案

int

evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)

{

const struct eventop *evsel = base->evsel;

struct event_io_map *io = &base->io;

struct evmap_io *ctx = NULL;

int nread, nwrite, retval = 0;

short res = 0, old = 0;

struct event *old_ev;


...


//GET_IO_SLOT_AND_CTOR巨集的作用就是讓ctx指向struct event_map_entry結構體中的TAILQ_HEAD

//巨集的展開,可以到http://blog.csdn.net/luotuo44/article/details/38403241檢視

GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,

evsel->fdinfo_len);


//同一個fd可以呼叫event_new,event_add

//多次。nread、nwrite就是記錄有多少次。如果每次event_new的回撥函式

//都不一樣,那麼當fd有可讀或者可寫時,這些回撥函式都是會觸發的。

//對一個fd不能event_new、event_add太多次的。後面會進行判斷

nread = ctx->nread;

nwrite = ctx->nwrite;


if (nread)

old |= EV_READ;

if (nwrite)

old |= EV_WRITE;


if (ev->ev_events & EV_READ) {

//記錄是不是第一次。如果是第一次,那麼就說明該fd還沒被

//加入到多路IO複用中。即還沒被加入到像select、epoll這些

//函式中。那麼就要加入。這個在後面可以看到。

if (++nread == 1)

res |= EV_READ;

}

if (ev->ev_events & EV_WRITE) {

if (++nwrite == 1)

res |= EV_WRITE;

}

if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff)) {

event_warnx("Too many events reading or writing on fd %d",

(int)fd);

return -1;

}



//把fd加入到多路IO複用中。

if (res) {

void *extra = ((char*)ctx) + sizeof(struct evmap_io);

if (evsel->add(base, ev->ev_fd,

old, (ev->ev_events & EV_ET) | res, extra) == -1)

return (-1);

retval = 1;

}


//nread進行了++。把次數記錄下來。下次對於同一個fd,這個次數就有用了

ctx->nread = (ev_uint16_t) nread;

ctx->nwrite = (ev_uint16_t) nwrite;


TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);


return (retval);

}

        程式碼中有兩個計數nread和nwrite,當其值為1時,就說明是第一次監聽對應的事件。此時,就要把這個fd新增到多路IO複用函式中。這就完成fd與select、poll、epoll之類的多路IO複用函式的相關聯。這完成對fd監聽的第一步。

        下面再看event_queue_insert函式的實現。

//event.c檔案

static void

event_queue_insert(struct event_base *base, struct event *ev, int queue)

{

...


ev->ev_flags |= queue;

switch (queue) {

case EVLIST_INSERTED:

TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);

break;

...

}

}

        這個函式的主要作為是把event加入到對應的佇列中。在這裡,是為了把event加入到eventqueue這個已註冊佇列中,即將event向event_base註冊。注意,此時event結構體的ev_flags變數為EVLIST_INIT | EVLIST_INSERTED了。

進入主迴圈,開始監聽event:        

        現在事件已經新增完畢,開始進入主迴圈event_base_dispatch函式。還是同樣,該函式內部呼叫event_base_loop完成工作。

//event.c檔案

int

event_base_loop(struct event_base *base, int flags)

{

const struct eventop *evsel = base->evsel;

int res, done, retval = 0;


//加鎖

EVBASE_ACQUIRE_LOCK(base, th_base_lock);


done = 0;


while (!done) {

//該函式的內部會解鎖,然後呼叫OS提供的的多路IO複用函式。

//這個函式退出後,又會立即加鎖。這有點像條件變數。

res = evsel->dispatch(base, tv_p);


if (N_ACTIVE_CALLBACKS(base)) {

int n = event_process_active(base);

}

}


done:

//解鎖

EVBASE_RELEASE_LOCK(base, th_base_lock);

return (retval);

}

        在event_base_loop函式內部會進行加鎖,這是因為這裡要對event_base裡面的多個佇列進行一些資料操作(增刪操作),此時要用鎖來保護佇列不被另外一個執行緒所破壞。

        上面程式碼中有兩個函式evsel->dispatch和event_process_active。前一個將呼叫多路IO複用函式,對event進行監聽,並且把滿足條件的event放到event_base的啟用佇列中。後一個則遍歷這個啟用佇列的所有event,逐個呼叫對應的回撥函式。

        可以看到整個流程如下圖所示:

        

將已啟用event插入到啟用列表:

        我們還是深入看看Libevent是怎麼把event新增到啟用佇列的。dispatch是一個函式指標,它的實現都包含是一個多路IO複用函式。這裡選擇poll這個多路IO複用函式來作分析。

//poll.c檔案

static int

poll_dispatch(struct event_base *base, struct timeval *tv)

{

int res, i, j, nfds;

long msec = -1;

struct pollop *pop = base->evbase;

struct pollfd *event_set;


nfds = pop->nfds;


event_set = pop->event_set;


//解鎖

EVBASE_RELEASE_LOCK(base, th_base_lock);

res = poll(event_set, nfds, msec);

//再次加鎖

EVBASE_ACQUIRE_LOCK(base, th_base_lock);


...


i = random() % nfds;

for (j = 0; j < nfds; j++) {

int what;

if (++i == nfds)

i = 0;

what = event_set[i].revents;

if (!what)

continue;


res = 0;


//如果fd發生錯誤,就把之當作讀和寫事件。之後呼叫read

//或者write時,就能得知具體是什麼錯誤了。這裡的作用是

//通知到上層。

if (what & (POLLHUP|POLLERR))

what |= POLLIN|POLLOUT;


if (what & POLLIN)

res |= EV_READ;

if (what & POLLOUT)

res |= EV_WRITE;

if (res == 0)

continue;


//把這個ev放到啟用佇列中。

evmap_io_active(base, event_set[i].fd, res);

}


return (0);

}

        pollfd陣列的資料是在evmap_io_add函式中新增的,在evmap_io_add函式裡面,有一個evsel->add呼叫,它會把資料(fd和對應的監聽型別)放到pollfd陣列中。

        當主執行緒從poll返回時,沒有錯誤的話,就說明有些監聽的事件發生了。在函式的後面,它會遍歷這個pollfd陣列,檢視哪個fd是有事件發生。如果事件發生,就呼叫evmap_io_active(base, event_set[i].fd, res);在這個函式裡面會把這個fd對應的event放到event_base的啟用event佇列中。下面是evmap_io_active的程式碼。

void //evmap.c檔案

evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)

{

struct event_io_map *io = &base->io;

struct evmap_io *ctx;

struct event *ev;


//由這個fd找到對應event_map_entry的TAILQ_HEAD.

GET_IO_SLOT(ctx, io, fd, evmap_io);


//遍歷這個佇列。將所有與fd相關聯的event結構體都處理一遍

TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {

if (ev->ev_events & events)

event_active_nolock(ev, ev->ev_events & events, 1);

}

}


void //event.c檔案

event_active_nolock(struct event *ev, int res, short ncalls)

{

struct event_base *base;

base = ev->ev_base;


...

//將ev插入到啟用佇列

event_queue_insert(base, ev, EVLIST_ACTIVE);


...

}



//將event 插入到event_base的對應(由queue指定)的佇列裡面

static void //event.c檔案

event_queue_insert(struct event_base *base, struct event *ev, int queue)

{

...


ev->ev_flags |= queue;

switch (queue) {

case EVLIST_ACTIVE:

base->event_count_active++;

//將event插入到對應對應優先順序的啟用佇列中

TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],

ev,ev_active_next);

break;

}

}

        經過上面三個函式的呼叫,就可以把一個fd對應的所有符合條件的event插入到啟用佇列中。因為Libevent還對事件處理設有優先順序,所以有一個啟用陣列佇列,而不是隻有一個啟用佇列。

        注意,此時event結構體的ev_flags變數為EVLIST_INIT | EVLIST_INSERTED | EVLIST_ACTIVE了。

處理啟用列表中的event:

        現在已經完成了將event插入到啟用佇列中。接下來就是遍歷啟用陣列佇列,把所有啟用的event都處理即可。

        現在來追蹤event_process_active函式。

//event.c檔案

static int

event_process_active(struct event_base *base)

{

struct event_list *activeq = NULL;

int i, c = 0;


//從高優先順序到低優先順序遍歷優先順序陣列

for (i = 0; i < base->nactivequeues; ++i) {

//對於特定的優先順序,遍歷該優先順序的所有啟用event

if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {

activeq = &base->activequeues[i];

c = event_process_active_single_queue(base, activeq);

...

}

}

return c;

}


static int

event_process_active_single_queue(struct event_base *base,

struct event_list *activeq)

{

struct event *ev;

int count = 0;


for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {

//如果是永久事件,那麼只需從active佇列中刪除。

if (ev->ev_events & EV_PERSIST)

event_queue_remove(base, ev, EVLIST_ACTIVE);

else //不是的話,那麼就要把這個event刪除掉。

event_del_internal(ev);

if (!(ev->ev_flags & EVLIST_INTERNAL))

++count;


//下面開始處理這個event

switch (ev->ev_closure) {

...

case EV_CLOSURE_NONE:

//呼叫使用者設定的回撥函式。

(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);

break;

}


EVBASE_ACQUIRE_LOCK(base, th_base_lock);


}

return count;

}

        上面的程式碼,從高到低優先順序遍歷啟用event優先順序陣列。對於啟用的event,要呼叫event_queue_remove將之從啟用佇列中刪除掉。然後再對這個event呼叫其回撥函式。

        event_queue_remove函式的呼叫會改變event結構體的ev_flags變數的值。呼叫後, ev_flags變數為EVLIST_INIT | EVLIST_INSERTED。現在又可以等待下一次事件的到來了。 -