libevent程式碼閱讀(13)——epoll的事件等待以及分發過程
阿新 • • 發佈:2019-01-28
在前面的章節我們提到event_base_loop中會呼叫具體的io複用機制的事件等待以及分發函式evsel->dispatch,然後呼叫event_process_active處理已經啟用的事件。
我們看到epoll_dispatch中呼叫了一個evmap_io_active函式,這個函式會將啟用的事件插入到已啟用的事件列表中
event_queue_insert函式才正真將事件插入到已啟用事件佇列中:
對於epoll而言,dispatch就是epoll_dispatch,我們看一下它的執行流程
// 事件分發 static int epoll_dispatch(struct event_base *base, struct timeval *tv) { struct epollop *epollop = base->evbase; struct epoll_event *events = epollop->events; int i, res; long timeout = -1; if (tv != NULL) { timeout = evutil_tv_to_msec(tv); if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC) { /* Linux kernels can wait forever if the timeout is * too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */ timeout = MAX_EPOLL_TIMEOUT_MSEC; } } // 暫時不用管下面兩個函式的呼叫 epoll_apply_changes(base); event_changelist_remove_all(&base->changelist, base); EVBASE_RELEASE_LOCK(base, th_base_lock); // 等待事件發生,res存放事件發生的個數 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); EVBASE_ACQUIRE_LOCK(base, th_base_lock); // 出錯 if (res == -1) { if (errno != EINTR) { event_warn("epoll_wait"); return (-1); } return (0); } event_debug(("%s: epoll_wait reports %d", __func__, res)); EVUTIL_ASSERT(res <= epollop->nevents); // 遍歷所有觸發的事件 for (i = 0; i < res; i++) { int what = events[i].events; short ev = 0; if (what & (EPOLLHUP|EPOLLERR)) { // 有錯誤發生 ev = EV_READ | EV_WRITE; } else { // 可讀 if (what & EPOLLIN) ev |= EV_READ; // 可寫 if (what & EPOLLOUT) ev |= EV_WRITE; } if (!ev) continue; // 啟用io事件對映表中的事件處理器 evmap_io_active(base, events[i].data.fd, ev | EV_ET); } // 如果所有的事件都被觸發了,表示事件陣列的大小還是太小了,需要擴充套件事件陣列的大小 if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) { /* We used all of the event space this time. We should be ready for more events next time. */ int new_nevents = epollop->nevents * 2; struct epoll_event *new_events; new_events = mm_realloc(epollop->events, new_nevents * sizeof(struct epoll_event)); if (new_events) { epollop->events = new_events; epollop->nevents = new_nevents; } } return (0); }
我們看到epoll_dispatch中呼叫了一個evmap_io_active函式,這個函式會將啟用的事件插入到已啟用的事件列表中
event_active_nolock函式的過程如下:/* * 啟用io事件對映表中的事件,然後插入到已啟用事件佇列中 */ void evmap_io_active(struct event_base *base, evutil_socket_t fd, short events) { struct event_io_map *io = &base->io; struct evmap_io *ctx; struct event *ev; #ifndef EVMAP_USE_HT EVUTIL_ASSERT(fd < io->nentries); #endif GET_IO_SLOT(ctx, io, fd, evmap_io); EVUTIL_ASSERT(ctx); TAILQ_FOREACH(ev, &ctx->events, ev_io_next) { if (ev->ev_events & events) event_active_nolock(ev, ev->ev_events & events, 1); } }
/* * 非阻塞的啟用事件,並將它存放於己啟用事件列表中 */ void event_active_nolock(struct event *ev, int res, short ncalls) { struct event_base *base; event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p", ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback)); /* We get different kinds of events, add them together */ // 如果本事件已經存在於已啟用事件列表中,那麼就返回 if (ev->ev_flags & EVLIST_ACTIVE) { ev->ev_res |= res; return; } base = ev->ev_base; EVENT_BASE_ASSERT_LOCKED(base); ev->ev_res = res; // 如果當前事件的優先順序小於event_base的執行時優先順序 // 就讓event_base跳過本次迴圈,進行下一次迴圈 if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; // 如果是訊號事件 if (ev->ev_events & EV_SIGNAL) { #ifndef _EVENT_DISABLE_THREAD_SUPPORT if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } #endif ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL; } // 把被啟用的事件插入到已啟用事件列表 event_queue_insert(base, ev, EVLIST_ACTIVE); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); }
event_queue_insert函式才正真將事件插入到已啟用事件佇列中:
/*
* 將事件處理器新增到各種事件佇列中
* 將io事件處理器和訊號事件處理器插入註冊事件佇列
* 將定時器插入通用定時器佇列或時間堆
* 將被啟用的事件處理器新增到活動事件佇列中
*/
static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
EVENT_BASE_ASSERT_LOCKED(base);
// 避免重複插入
if (ev->ev_flags & queue) {
/* Double insertion is possible for active events */
if (queue & EVLIST_ACTIVE)
return;
event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
ev, EV_SOCK_ARG(ev->ev_fd), queue);
return;
}
// 增加event_base擁有的事件處理器的數量
if (~ev->ev_flags & EVLIST_INTERNAL)
base->event_count++;
// 標記該事件處理器已經被處理過
ev->ev_flags |= queue;
// 根據不同的事件插入到不同的佇列中
switch (queue) {
// 將io事件處理器或訊號事件處理器插入註冊事件佇列
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
// 將就緒事件處理器插入活動事件佇列
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next);
break;
// 將定時器插入到通用定時器佇列或時間堆
case EVLIST_TIMEOUT: {
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
insert_common_timeout_inorder(ctl, ev);
} else
min_heap_push(&base->timeheap, ev);
break;
}
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}