
Libevent Source Code Analysis (4) --- The libevent Event Mechanism

The previous chapters all dealt with libevent's supporting facilities; this chapter analyzes in detail the flow and mechanism libevent uses to handle events. Before diving into the source, let's first look at how libevent is used, since this article starts from the usage pattern and works inward to the internals.

#include <event2/event.h>
#include <stdio.h>

void cb_func(evutil_socket_t fd, short what, void *arg)
{
    const char *data = arg;
    printf("Got an event on socket %d:%s%s%s%s [%s]",
        (int) fd,
        (what & EV_TIMEOUT) ? " timeout" : "",
        (what & EV_READ)    ? " read"    : "",
        (what & EV_WRITE)   ? " write"   : "",
        (what & EV_SIGNAL)  ? " signal"  : "",
        data);
}

void main_loop(evutil_socket_t fd1, evutil_socket_t fd2)
{
    struct event *ev1, *ev2;
    struct timeval five_seconds = {5, 0};
    struct event_base *base = event_base_new();

    /* The caller has already set up fd1, fd2 somehow, and made them nonblocking. */

    ev1 = event_new(base, fd1, EV_TIMEOUT|EV_READ|EV_PERSIST, cb_func,
        (char*)"Reading event");
    ev2 = event_new(base, fd2, EV_WRITE|EV_PERSIST, cb_func,
        (char*)"Writing event");

    event_add(ev1, &five_seconds);
    event_add(ev2, NULL);
    event_base_dispatch(base);
}

To use libevent you must first initialize an event_base structure. The event_base structure was covered in an earlier chapter; here is its initialization code:

struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
    int i;
    struct event_base *base;
    int should_check_environment;

#ifndef _EVENT_DISABLE_DEBUG_MODE
    event_debug_mode_too_late = 1;
#endif

    if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
        event_warn("%s: calloc", __func__);
        return NULL;
    }
    detect_monotonic();
    gettime(base, &base->event_tv);

    min_heap_ctor(&base->timeheap);
    TAILQ_INIT(&base->eventqueue);
    base->sig.ev_signal_pair[0] = -1;
    base->sig.ev_signal_pair[1] = -1;
    base->th_notify_fd[0] = -1;
    base->th_notify_fd[1] = -1;

    event_deferred_cb_queue_init(&base->defer_queue);
    base->defer_queue.notify_fn = notify_base_cbq_callback;
    base->defer_queue.notify_arg = base;
    if (cfg)
        base->flags = cfg->flags;

    evmap_io_initmap(&base->io);
    evmap_signal_initmap(&base->sigmap);
    event_changelist_init(&base->changelist);

    base->evbase = NULL;

    should_check_environment =
        !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));

    for (i = 0; eventops[i] && !base->evbase; i++) {
        if (cfg != NULL) {
            /* determine if this backend should be avoided */
            if (event_config_is_avoided_method(cfg,
                eventops[i]->name))
                continue;
            if ((eventops[i]->features & cfg->require_features)
                != cfg->require_features)
                continue;
        }

        /* also obey the environment variables */
        if (should_check_environment &&
            event_is_method_disabled(eventops[i]->name))
            continue;

        base->evsel = eventops[i];

        base->evbase = base->evsel->init(base);
    }

    if (base->evbase == NULL) {
        event_warnx("%s: no event mechanism available",
            __func__);
        base->evsel = NULL;
        event_base_free(base);
        return NULL;
    }

    if (evutil_getenv("EVENT_SHOW_METHOD"))
        event_msgx("libevent using: %s", base->evsel->name);

    /* allocate a single active event queue */
    if (event_base_priority_init(base, 1) < 0) {
        event_base_free(base);
        return NULL;
    }

    /* prepare for threading */

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (EVTHREAD_LOCKING_ENABLED() &&
        (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
        int r;
        EVTHREAD_ALLOC_LOCK(base->th_base_lock,
            EVTHREAD_LOCKTYPE_RECURSIVE);
        base->defer_queue.lock = base->th_base_lock;
        EVTHREAD_ALLOC_COND(base->current_event_cond);
        r = evthread_make_base_notifiable(base);
        if (r<0) {
            event_warnx("%s: Unable to make base notifiable.", __func__);
            event_base_free(base);
            return NULL;
        }
    }
#endif

#ifdef WIN32
    if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
        event_base_start_iocp(base, cfg->n_cpus_hint);
#endif

    return (base);
}

Besides setting up the event_base itself, this initialization function also decides which I/O multiplexing backend to use. All available backends are listed in the eventops array:

static const struct eventop *eventops[] = {
#ifdef _EVENT_HAVE_EVENT_PORTS
    &evportops,
#endif
#ifdef _EVENT_HAVE_WORKING_KQUEUE
    &kqops,
#endif
#ifdef _EVENT_HAVE_EPOLL
    &epollops,
#endif
#ifdef _EVENT_HAVE_DEVPOLL
    &devpollops,
#endif
#ifdef _EVENT_HAVE_POLL
    &pollops,
#endif
#ifdef _EVENT_HAVE_SELECT
    &selectops,
#endif
#ifdef WIN32
    &win32ops,
#endif
    NULL
};

event_base_new_with_config walks this array from top to bottom and picks the first backend the system supports, so the most efficient available model is preferred; an event_config can also be used to steer which backend is chosen, as sketched below.
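
A minimal sketch of steering the backend choice through the public event_config API. The choice of "select" to avoid and EV_FEATURE_O1 to require are only examples; any supported method name or feature flag works the same way.

#include <event2/event.h>
#include <stdio.h>

static struct event_base *
make_base(void)
{
    struct event_config *cfg;
    struct event_base *base;

    cfg = event_config_new();
    if (!cfg)
        return NULL;

    /* never pick the select backend */
    event_config_avoid_method(cfg, "select");
    /* require O(1) event activation, which rules out poll-style backends */
    event_config_require_features(cfg, EV_FEATURE_O1);

    base = event_base_new_with_config(cfg);
    event_config_free(cfg); /* the base keeps its own copy of the settings */

    if (base)
        printf("libevent using: %s\n", event_base_get_method(base));
    return base;
}
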
Once the event_base has been initialized, events can be added:

struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events,
    void (*cb)(evutil_socket_t, short, void *), void *arg)
{
    struct event *ev;
    ev = mm_malloc(sizeof(struct event));
    if (ev == NULL)
        return (NULL);
    if (event_assign(ev, base, fd, events, cb, arg) < 0) {
        mm_free(ev);
        return (NULL);
    }

    return (ev);
}

int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd,
    short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
    if (!base)
        base = current_base;

    _event_debug_assert_not_added(ev);

    ev->ev_base = base;

    ev->ev_callback = callback;
    ev->ev_arg = arg;
    ev->ev_fd = fd;
    ev->ev_events = events;
    ev->ev_res = 0;
    ev->ev_flags = EVLIST_INIT;
    ev->ev_ncalls = 0;
    ev->ev_pncalls = NULL;

    if (events & EV_SIGNAL) {
        if ((events & (EV_READ|EV_WRITE)) != 0) {
            event_warnx("%s: EV_SIGNAL is not compatible with "
                "EV_READ or EV_WRITE", __func__);
            return -1;
        }
        ev->ev_closure = EV_CLOSURE_SIGNAL;
    } else {
        if (events & EV_PERSIST) {
            evutil_timerclear(&ev->ev_io_timeout);
            ev->ev_closure = EV_CLOSURE_PERSIST;
        } else {
            ev->ev_closure = EV_CLOSURE_NONE;
        }
    }

    min_heap_elem_init(ev);

    if (base != NULL) {
        /* by default, we put new events into the middle priority */
        ev->ev_pri = base->nactivequeues / 2;
    }

    _event_debug_note_setup(ev);

    return 0;
}

event_new allocates an event and event_assign initializes it. The important field here is ev_flags, which records the event's current state: EVLIST_INIT marks a freshly created event that has only been initialized and has not yet been put on any queue. The full set of ev_flags states is listed below, followed by a small helper sketch that decodes them:

#define EVLIST_TIMEOUT  0x01
#define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL   0x04
#define EVLIST_ACTIVE   0x08
#define EVLIST_INTERNAL 0x10
#define EVLIST_INIT     0x80
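
A tiny illustrative helper (not part of libevent) that decodes the bitmask above. Per the walkthrough that follows, a freshly assigned event carries only EVLIST_INIT, event_add adds EVLIST_INSERTED (plus EVLIST_TIMEOUT if a timeout is given), and dispatching adds EVLIST_ACTIVE.

#include <stdio.h>

static void
print_ev_flags(int flags)
{
    printf("flags:%s%s%s%s%s%s\n",
        (flags & EVLIST_INIT)     ? " init"     : "",
        (flags & EVLIST_INSERTED) ? " inserted" : "",
        (flags & EVLIST_TIMEOUT)  ? " timeout"  : "",
        (flags & EVLIST_ACTIVE)   ? " active"   : "",
        (flags & EVLIST_SIGNAL)   ? " signal"   : "",
        (flags & EVLIST_INTERNAL) ? " internal" : "");
}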

Once an event has been initialized it can be added to the queues:

int
event_add(struct event *ev, const struct timeval *tv)
{
    int res;

    if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
        event_warnx("%s: event has no event_base set.", __func__);
        return -1;
    }

    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

    res = event_add_internal(ev, tv, 0);

    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);
}

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
    struct event_base *base = ev->ev_base;
    int res = 0;
    int notify = 0;

    EVENT_BASE_ASSERT_LOCKED(base);
    _event_debug_assert_is_setup(ev);

    event_debug((
         "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
         ev,
         EV_SOCK_ARG(ev->ev_fd),
         ev->ev_events & EV_READ ? "EV_READ " : " ",
         ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
         tv ? "EV_TIMEOUT " : " ",
         ev->ev_callback));

    EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

    /*
     * prepare for timeout insertion further below, if we get a
     * failure on any step, we should not change any state.
     */
    if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
        if (min_heap_reserve(&base->timeheap,
            1 + min_heap_size(&base->timeheap)) == -1)
            return (-1);  /* ENOMEM == errno */
    }

    /* If the main thread is currently executing a signal event's
     * callback, and we are not the main thread, then we want to wait
     * until the callback is done before we mess with the event, or else
     * we can race on ev_ncalls and ev_pncalls below. */
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
        && !EVBASE_IN_THREAD(base)) {
        ++base->current_event_waiters;
        EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
    }
#endif

    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
        !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
        if (ev->ev_events & (EV_READ|EV_WRITE))
            res = evmap_io_add(base, ev->ev_fd, ev);
        else if (ev->ev_events & EV_SIGNAL)
            res = evmap_signal_add(base, (int)ev->ev_fd, ev);
        if (res != -1)
            event_queue_insert(base, ev, EVLIST_INSERTED);
        if (res == 1) {
            /* evmap says we need to notify the main thread. */
            notify = 1;
            res = 0;
        }
    }

    /*
     * we should change the timeout state only if the previous event
     * addition succeeded.
     */
    if (res != -1 && tv != NULL) {
        struct timeval now;
        int common_timeout;

        /*
         * for persistent timeout events, we remember the
         * timeout value and re-add the event.
         *
         * If tv_is_absolute, this was already set.
         */
        if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
            ev->ev_io_timeout = *tv;

        /*
         * we already reserved memory above for the case where we
         * are not replacing an existing timeout.
         */
        if (ev->ev_flags & EVLIST_TIMEOUT) {
            /* XXX I believe this is needless. */
            if (min_heap_elt_is_top(ev))
                notify = 1;
            event_queue_remove(base, ev, EVLIST_TIMEOUT);
        }

        /* Check if it is active due to a timeout.  Rescheduling
         * this timeout before the callback can be executed
         * removes it from the active list. */
        if ((ev->ev_flags & EVLIST_ACTIVE) &&
            (ev->ev_res & EV_TIMEOUT)) {
            if (ev->ev_events & EV_SIGNAL) {
                /* See if we are just active executing
                 * this event in a loop
                 */
                if (ev->ev_ncalls && ev->ev_pncalls) {
                    /* Abort loop */
                    *ev->ev_pncalls = 0;
                }
            }

            event_queue_remove(base, ev, EVLIST_ACTIVE);
        }

        gettime(base, &now);

        common_timeout = is_common_timeout(tv, base);
        if (tv_is_absolute) {
            ev->ev_timeout = *tv;
        } else if (common_timeout) {
            struct timeval tmp = *tv;
            tmp.tv_usec &= MICROSECONDS_MASK;
            evutil_timeradd(&now, &tmp, &ev->ev_timeout);
            ev->ev_timeout.tv_usec |=
                (tv->tv_usec & ~MICROSECONDS_MASK);
        } else {
            evutil_timeradd(&now, tv, &ev->ev_timeout);
        }

        event_debug((
             "event_add: timeout in %d seconds, call %p",
             (int)tv->tv_sec, ev->ev_callback));

        event_queue_insert(base, ev, EVLIST_TIMEOUT);
        if (common_timeout) {
            struct common_timeout_list *ctl =
                get_common_timeout_list(base, &ev->ev_timeout);
            if (ev == TAILQ_FIRST(&ctl->events)) {
                common_timeout_schedule(ctl, &now, ev);
            }
        } else {
            /* See if the earliest timeout is now earlier than it
             * was before: if so, we will need to tell the main
             * thread to wake up earlier than it would
             * otherwise. */
            if (min_heap_elt_is_top(ev))
                notify = 1;
        }
    }

    /* if we are not in the right thread, we need to wake up the loop */
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);

    _event_debug_note_add(ev);

    return (res);
}

Adding and removing events must happen under the lock, because these operations may be invoked from other threads. The second argument of event_add_internal is the timeout value, and the third indicates whether that value is absolute or relative, that is, a specific point in time or an interval. If a timeout is given and EVLIST_TIMEOUT is not yet set, a slot is first reserved in the min-heap. base->current_event means the main thread is currently running this event's callback; in that case the add or delete has to wait until it is notified that the callback has finished.

The event is then inserted into the matching data structure: EV_READ and EV_WRITE events go into evmap_io, EV_SIGNAL events go into evmap_signal, and EV_SIGNAL is mutually exclusive with the other two. After that, event_queue_insert is called to insert the event into the event_base's doubly linked list; depending on the queue flag passed in, it marks the event with the corresponding state and puts it into the corresponding structure. At this point the event's ev_flags has become EVLIST_INIT|EVLIST_INSERTED. Here is event_queue_insert:

static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
    EVENT_BASE_ASSERT_LOCKED(base);

    if (ev->ev_flags & queue) {
        /* Double insertion is possible for active events */
        if (queue & EVLIST_ACTIVE)
            return;

        event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
            ev, EV_SOCK_ARG(ev->ev_fd), queue);
        return;
    }

    if (~ev->ev_flags & EVLIST_INTERNAL)
        base->event_count++;

    ev->ev_flags |= queue;
    switch (queue) {
    case EVLIST_INSERTED:
        TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
        break;
    case EVLIST_ACTIVE:
        base->event_count_active++;
        TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
            ev,ev_active_next);
        break;
    case EVLIST_TIMEOUT: {
        if (is_common_timeout(&ev->ev_timeout, base)) {
            struct common_timeout_list *ctl =
                get_common_timeout_list(base, &ev->ev_timeout);
            insert_common_timeout_inorder(ctl, ev);
        } else
            min_heap_push(&base->timeheap, ev);
        break;
    }
    default:
        event_errx(1, "%s: unknown queue %x", __func__, queue);
    }
}

Back in event_add_internal: after event_queue_insert the notify flag may need to be set. It is used to wake up the main thread when the call comes from another thread, because the new event's timeout may be shorter than the timeout the backend is currently sleeping on. The wake-up again goes through a socketpair: one of its sockets has already been added to the event_base as an internal event, so as soon as the other end is written to, a read event fires and the blocked dispatch call returns immediately instead of continuing to sleep.

The rest of the function handles the timeout. First, if this is a persistent event and the timeout was given as a relative value, that relative value has to be remembered; ev_io_timeout stores it (persistent and signal events are mutually exclusive). ev_timeout then receives the absolute expiration time, computed as the current time plus the interval, and event_queue_insert is called once more to store the event either in the min-heap or in a common-timeout list. One detail to note: if the new timeout is a common timeout and the newly added event ends up first in its common_timeout_list, the list's timeout_event must be rescheduled. That timeout_event may already be in the min-heap (the list was not empty before, yet the new event expires earlier than timeout_event does; the earlier chapter argued this should be impossible in theory, but libevent checks for it anyway), or it may not be in the min-heap yet.

static void
common_timeout_schedule(struct common_timeout_list *ctl,
    const struct timeval *now, struct event *head)
{
    struct timeval timeout = head->ev_timeout;
    timeout.tv_usec &= MICROSECONDS_MASK;
    event_add_internal(&ctl->timeout_event, &timeout, 1);
}

event_add_internal handles this case: if ev_flags already has EVLIST_TIMEOUT set, it calls event_queue_remove(base, ev, EVLIST_TIMEOUT) to remove the event from the min-heap first (the common_timeout_list's timeout_event lives in the min-heap) and then re-inserts it.
Finally, event_add_internal decides, based on the notify flag, whether the main thread needs to be woken up.
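
To make the wake-up path concrete, here is a minimal sketch of the socketpair mechanism described above. It is not libevent's actual implementation and all names are made up; the point is only that writing a single byte to one end makes the watched end readable, which forces the blocked dispatch to return.

#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

static int notify_fds[2];  /* [0] is watched by the loop, [1] is written to */

static int
notify_setup(void)
{
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, notify_fds) == -1)
        return -1;
    /* nonblocking read end so draining never blocks */
    return fcntl(notify_fds[0], F_SETFL, O_NONBLOCK);
}

/* Called from a thread that just added an earlier-expiring event: one byte
 * makes the watched fd readable, so the backend's dispatch() wakes up and
 * recomputes its timeout. */
static void
notify_loop(void)
{
    char byte = 0;
    (void)write(notify_fds[1], &byte, 1);
}

/* Callback body of the internal read event: drain whatever was written. */
static void
drain_notify(void)
{
    char buf[128];
    while (read(notify_fds[0], buf, sizeof(buf)) > 0)
        ;
}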

With event_add covered, the next piece is event_base_dispatch, the main loop of the event_base. Internally it simply runs event_base_loop:

int
event_base_loop(struct event_base *base, int flags)
{
    const struct eventop *evsel = base->evsel;
    struct timeval tv;
    struct timeval *tv_p;
    int res, done, retval = 0;

    /* Grab the lock.  We will release it inside evsel.dispatch, and again
     * as we invoke user callbacks. */
    EVBASE_ACQUIRE_LOCK(base, th_base_lock);

    if (base->running_loop) {
        event_warnx("%s: reentrant invocation.  Only one event_base_loop"
            " can run on each event_base at once.", __func__);
        EVBASE_RELEASE_LOCK(base, th_base_lock);
        return -1;
    }

    base->running_loop = 1;

    clear_time_cache(base);

    if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
        evsig_set_base(base);

    done = 0;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    base->th_owner_id = EVTHREAD_GET_ID();
#endif

    base->event_gotterm = base->event_break = 0;

    while (!done) {
        base->event_continue = 0;

        /* Terminate the loop if we have been asked to */
        if (base->event_gotterm) {
            break;
        }

        if (base->event_break) {
            break;
        }

        timeout_correct(base, &tv);

        tv_p = &tv;
        if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
            timeout_next(base, &tv_p);
        } else {
            /*
             * if we have active events, we just poll new events
             * without waiting.
             */
            evutil_timerclear(&tv);
        }

        /* If we have no events, we just exit */
        if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
            event_debug(("%s: no events registered.", __func__));
            retval = 1;
            goto done;
        }

        /* update last old time */
        gettime(base, &base->event_tv);

        clear_time_cache(base);

        res = evsel->dispatch(base, tv_p);

        if (res == -1) {
            event_debug(("%s: dispatch returned unsuccessfully.",
                __func__));
            retval = -1;
            goto done;
        }

        update_time_cache(base);

        timeout_process(base);

        if (N_ACTIVE_CALLBACKS(base)) {
            int n = event_process_active(base);
            if ((flags & EVLOOP_ONCE)
                && N_ACTIVE_CALLBACKS(base) == 0
                && n != 0)
                done = 1;
        } else if (flags & EVLOOP_NONBLOCK)
            done = 1;
    }
    event_debug(("%s: asked to terminate loop.", __func__));

done:
    clear_time_cache(base);
    base->running_loop = 0;

    EVBASE_RELEASE_LOCK(base, th_base_lock);

    return (retval);
}

The evsig_set_base call means that only the event_base that most recently entered event_base_dispatch can handle signal events. Before entering the while loop, event_base_loop takes the global lock with EVBASE_ACQUIRE_LOCK(base, th_base_lock). Inside the loop it first corrects the cached time (analyzed in an earlier post), then performs a few state checks and sets up the dispatch timeout: if there are already active events, the timeout is cleared to zero so the backend polls without blocking; otherwise timeout_next sets it to the time remaining until the earliest entry in the min-heap. It then calls evsel->dispatch(base, tv_p), which runs the selected backend's dispatch method to detect I/O events; for every ready file descriptor the backend calls evmap_io_active(base, fd, res), defined as follows:

void
evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)
{
    struct event_io_map *io = &base->io;
    struct evmap_io *ctx;
    struct event *ev;

#ifndef EVMAP_USE_HT
    EVUTIL_ASSERT(fd < io->nentries);
#endif
    GET_IO_SLOT(ctx, io, fd, evmap_io);

    EVUTIL_ASSERT(ctx);
    TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {
        if (ev->ev_events & events)
            event_active_nolock(ev, ev->ev_events & events, 1);
    }
}

evmap_io_active walks every event registered on that fd; if the events that fired overlap with what the event is interested in, it calls event_active_nolock:

void
event_active_nolock(struct event *ev, int res, short ncalls)
{
    struct event_base *base;

    event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
        ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));


    /* We get different kinds of events, add them together */
    if (ev->ev_flags & EVLIST_ACTIVE) {
        ev->ev_res |= res;
        return;
    }

    base = ev->ev_base;

    EVENT_BASE_ASSERT_LOCKED(base);

    ev->ev_res = res;

    if (ev->ev_pri < base->event_running_priority)
        base->event_continue = 1;

    if (ev->ev_events & EV_SIGNAL) {
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
        if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
            ++base->current_event_waiters;
            EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
        }
#endif
        ev->ev_ncalls = ncalls;
        ev->ev_pncalls = NULL;
    }

    event_queue_insert(base, ev, EVLIST_ACTIVE);

    if (EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}

This function calls event_queue_insert(base, ev, EVLIST_ACTIVE) to put the event on the active list; event_queue_insert checks whether EVLIST_ACTIVE is already set and does not insert the event twice.
Back in event_base_loop: after the I/O events have been dispatched, the timer events in the min-heap are processed next.

/* Activate every event whose timeout has elapsed. */
static void
timeout_process(struct event_base *base)
{
    /* Caller must hold lock. */
    struct timeval now;
    struct event *ev;

    if (min_heap_empty(&base->timeheap)) {
        return;
    }

    gettime(base, &now);

    while ((ev = min_heap_top(&base->timeheap))) {
        if (evutil_timercmp(&ev->ev_timeout, &now, >))
            break;

        /* delete this event from the I/O queues */
        event_del_internal(ev);

        event_debug(("timeout_process: call %p",
             ev->ev_callback));
        event_active_nolock(ev, EV_TIMEOUT, 1);
    }
}

timeout_process calls event_del_internal on every expired event in the min-heap (if the event is EV_PERSIST, event_persist_closure will add it back later) and then calls event_active_nolock to put it on the active event list. Finally, whenever the active lists are non-empty, event_process_active(base) is called to handle all active events:

static int
event_process_active(struct event_base *base)
{
    /* Caller must hold th_base_lock */
    struct event_list *activeq = NULL;
    int i, c = 0;

    for (i = 0; i < base->nactivequeues; ++i) {
        if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
            base->event_running_priority = i;
            activeq = &base->activequeues[i];
            c = event_process_active_single_queue(base, activeq);
            if (c < 0) {
                base->event_running_priority = -1;
                return -1;
            } else if (c > 0)
                break; /* Processed a real event; do not
                    * consider lower-priority events */
            /* If we get here, all of the events we processed
             * were internal.  Continue. */
        }
    }

    event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
    base->event_running_priority = -1;
    return c;
}
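
The per-priority scan above only has an effect if the application actually assigns priorities. A small user-side sketch using the public API; the base and the two events are assumed to have been created elsewhere, and the names are illustrative:

#include <event2/event.h>

static void
setup_priorities(struct event_base *base,
    struct event *urgent_ev, struct event *bulk_ev)
{
    /* three active queues: 0 is the most urgent, 2 the least;
     * this must be done before events are added or activated */
    event_base_priority_init(base, 3);

    event_priority_set(urgent_ev, 0);  /* processed before queues 1 and 2 */
    event_priority_set(bulk_ev, 2);    /* runs only when nothing more urgent is active */
}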

event_process_active walks the active queues in priority order and hands the first non-empty one to event_process_active_single_queue; the deferred callbacks will be analyzed in detail later in the event_buffer chapter:

static int
event_process_active_single_queue(struct event_base *base, struct event_list *activeq)
{
    struct event *ev;
    int count = 0;

    EVUTIL_ASSERT(activeq != NULL);

    for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
        if (ev->ev_events & EV_PERSIST)
            event_queue_remove(base, ev, EVLIST_ACTIVE);
        else
            event_del_internal(ev);
        if (!(ev->ev_flags & EVLIST_INTERNAL))
            ++count;

        event_debug((
             "event_process_active: event: %p, %s%scall %p",
            ev,
            ev->ev_res & EV_READ ? "EV_READ " : " ",
            ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
            ev->ev_callback));

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
        base->current_event = ev;
        base->current_event_waiters = 0;
#endif
        switch (ev->ev_closure) {
        case EV_CLOSURE_SIGNAL:
            event_signal_closure(base, ev);
            break;
        case EV_CLOSURE_PERSIST:
            event_persist_closure(base, ev);
            break;
        default:
        case EV_CLOSURE_NONE:
            EVBASE_RELEASE_LOCK(base, th_base_lock);
            (*ev->ev_callback)(
                ev->ev_fd, ev->ev_res, ev->ev_arg);
            break;
        }
        EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
        base->current_event = NULL;
        if (base->current_event_waiters) {
            base->current_event_waiters = 0;
            EVTHREAD_COND_BROADCAST(base->current_event_cond);
        }
#endif

        if (base->event_break)
            return -1;
        if (base->event_continue)
            break;
    }
    return count;
}

If the event is EV_PERSIST it only needs to be removed from the active queue; otherwise event_del_internal is executed. For a timer event, event_del_internal has already been run once in timeout_process, but at that point the event was not yet active, so here event_queue_remove and event_del_internal have the same effect of removing it from the active list. Signal and EV_PERSIST events then get special treatment: signal events are run through event_signal_closure, persistent events go through event_persist_closure for rescheduling, and ordinary events simply invoke the callback directly:

static inline void
event_persist_closure(struct event_base *base, struct event *ev)
{
    /* reschedule the persistent event if we have a timeout. */
    if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {
        /* If there was a timeout, we want it to run at an interval of
         * ev_io_timeout after the last time it was _scheduled_ for,
         * not ev_io_timeout after _now_.  If it fired for another
         * reason, though, the timeout ought to start ticking _now_. */
        struct timeval run_at, relative_to, delay, now;
        ev_uint32_t usec_mask = 0;
        EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout,
            &ev->ev_io_timeout));
        gettime(base, &now);
        if (is_common_timeout(&ev->ev_timeout, base)) {
            delay = ev->ev_io_timeout;
            usec_mask = delay.tv_usec & ~MICROSECONDS_MASK;
            delay.tv_usec &= MICROSECONDS_MASK;
            if (ev->ev_res & EV_TIMEOUT) {
                relative_to = ev->ev_timeout;
                relative_to.tv_usec &= MICROSECONDS_MASK;
            } else {
                relative_to = now;
            }
        } else {
            delay = ev->ev_io_timeout;
            if (ev->ev_res & EV_TIMEOUT) {
                relative_to = ev->ev_timeout;
            } else {
                relative_to = now;
            }
        }
        evutil_timeradd(&relative_to, &delay, &run_at);
        if (evutil_timercmp(&run_at, &now, <)) {
            /* Looks like we missed at least one invocation due to
             * a clock jump, not running the event loop for a
             * while, really slow callbacks, or
             * something. Reschedule relative to now.
             */
            evutil_timeradd(&now, &delay, &run_at);
        }
        run_at.tv_usec |= usec_mask;
        event_add_internal(ev, &run_at, 1);
    }
    EVBASE_RELEASE_LOCK(base, th_base_lock);
    (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
}

This function mainly resets the timeout: whether or not the event became active because of a timeout, the expiration time is recomputed and the event is re-added. event_add_internal then checks the current state: if the event is already in evmap_io or evmap_signal that part is skipped, but if it sits in the min-heap or in a common-timeout list it is removed and inserted again.
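
From the caller's point of view, the visible effect of this rescheduling is that a persistent event with a timeout keeps firing at that interval. A small illustrative sketch; tick_cb and run_ticker are made-up names:

#include <event2/event.h>
#include <stdio.h>
#include <sys/time.h>

static void
tick_cb(evutil_socket_t fd, short what, void *arg)
{
    (void)fd; (void)what; (void)arg;
    /* event_persist_closure re-arms the event, so this keeps printing */
    printf("tick\n");
}

static int
run_ticker(struct event_base *base)
{
    struct timeval one_second = {1, 0};
    struct event *tick;

    tick = event_new(base, -1, EV_PERSIST, tick_cb, NULL);
    if (!tick)
        return -1;
    event_add(tick, &one_second);   /* fires roughly once per second */
    return event_base_dispatch(base);
}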

That is the complete flow libevent uses to process events. Below is a flowchart of the process that the author found online:
[Flowchart: libevent event-processing flow]