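Libev Source Code Analysis 02: IO Watchers in Libev
I: Code Flow
In Libev, you start an IO watcher and wait for an event on it to fire, at which point the watcher's callback is invoked. The overall flow is as follows:
First, call ev_default_loop to initialize the struct ev_loop structure;
Then call ev_io_init to initialize the watcher's fields; this macro mainly just calls ev_init and ev_io_set;
Then call ev_io_start to start the watcher; this function mainly adds the watcher to the loop->anfds structure and adds the watched descriptor to ((loop)->fdchanges);
Call ev_run to start waiting for events. Inside this function:
First, fd_reify is called. Based on the descriptors recorded in ((loop)->fdchanges), it registers the events of each descriptor with the data structure used by the backend, for example the fd_set used by select;
Then time_update is called to refresh the current time; if the calendar time has been adjusted manually, the timer and periodic events are adjusted accordingly;
backend_poll is called to wait for events. If an event fires within the allotted time, fd_event is called to record the triggered watcher in loop->pendings;
After the backend's wait function (select, poll, epoll_wait) returns, time_update is called again to refresh the current time, and then ev_invoke_pending processes the watchers in loop->pendings one by one, invoking each watcher's callback.
That is the workflow of an IO watcher in Libev. Each function is analyzed in detail below:
1: The ev_default_loop function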
#if EV_MULTIPLICITY
struct ev_loop *
#else
int
#endif
ev_default_loop (unsigned int flags)
{
  if (!ev_default_loop_ptr)
    {
#if EV_MULTIPLICITY
      struct ev_loop *loop = ev_default_loop_ptr = &default_loop_struct;
#else
      ev_default_loop_ptr = 1;
#endif
      loop_init (loop, flags);
      if (ev_backend (loop))
        {
#if EV_CHILD_ENABLE
          ev_signal_init (&childev, childcb, SIGCHLD);
          ev_set_priority (&childev, EV_MAXPRI);
          ev_signal_start (EV_A_ &childev);
          ev_unref (loop); /* child watcher should not keep loop alive */
#endif
        }
      else
        ev_default_loop_ptr = 0;
    }
  return ev_default_loop_ptr;
}
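The EV_MULTIPLICITY macro determines whether multiple loops are supported. The library provides a default loop structure, default_loop_struct, and a pointer to it, ev_default_loop_ptr.
If multiple loops are supported, default_loop_struct is a static structure of type struct ev_loop that contains the various members, such as ev_tstamp ev_rt_now; int pendingpri; and so on.
ev_default_loop_ptr is then a pointer of type struct ev_loop *.
If multiple loops are not supported, the struct ev_loop structure does not exist; its members are defined as static variables instead, and ev_default_loop_ptr is merely an int variable that indicates whether the "loop" has been initialized successfully.
The description below assumes that multiple loops are supported.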
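To make the two build modes more concrete, here is a minimal sketch of the idea. It is not libev's code: DEMO_MULTIPLICITY, DEMO_P, DEMO_A, struct demo_loop and the demo_* functions are hypothetical names invented for this illustration. They only mimic how macros such as EV_A_ let one function body compile either with an explicit loop argument or against static variables.

```c
#include <assert.h>

/* Hypothetical switch mirroring EV_MULTIPLICITY; set to 0 to try the other mode. */
#ifndef DEMO_MULTIPLICITY
#define DEMO_MULTIPLICITY 1
#endif

#if DEMO_MULTIPLICITY
/* Loop state lives in a struct and is passed around explicitly. */
struct demo_loop { int activecnt; };
static struct demo_loop demo_default_loop_struct;
#define DEMO_P struct demo_loop *loop   /* parameter declaration */
#define DEMO_A loop                     /* argument at the call site */
#define activecnt (loop->activecnt)     /* member access reads like a plain variable */
#else
/* Loop state is a set of static variables; there is no loop argument. */
static int activecnt;
#define DEMO_P void
#define DEMO_A
#endif

/* The same function bodies compile in both modes. */
static void demo_ref (DEMO_P)
{
  ++activecnt;
}

static void demo_unref (DEMO_P)
{
  assert (activecnt > 0);
  --activecnt;
}

/* Two refs and one unref; returns the resulting active count. */
static int demo_run (void)
{
#if DEMO_MULTIPLICITY
  struct demo_loop *loop = &demo_default_loop_struct;
#endif
  demo_ref (DEMO_A);
  demo_ref (DEMO_A);
  demo_unref (DEMO_A);
  return activecnt;
}
```

This is why the excerpts in this article can write names such as backend or anfds without a loop-> prefix even though they are members of struct ev_loop.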
In ev_default_loop, the first step is to call loop_init to initialize the loop's members:
static void loop_init (struct ev_loop *loop, unsigned int flags)
{
if (!backend)
{
origflags = flags;
#if EV_USE_REALTIME
if (!have_realtime)
{
struct timespec ts;
if (!clock_gettime (CLOCK_REALTIME, &ts))
have_realtime = 1;
}
#endif
#if EV_USE_MONOTONIC
if (!have_monotonic)
{
struct timespec ts;
if (!clock_gettime (CLOCK_MONOTONIC, &ts))
have_monotonic = 1;
}
#endif
/* pid check not overridable via env */
#ifndef _WIN32
if (flags & EVFLAG_FORKCHECK)
curpid = getpid ();
#endif
if (!(flags & EVFLAG_NOENV) && !enable_secure () && getenv ("LIBEV_FLAGS"))
flags = atoi (getenv ("LIBEV_FLAGS"));
ev_rt_now = ev_time ();
mn_now = get_clock ();
now_floor = mn_now;
rtmn_diff = ev_rt_now - mn_now;
#if EV_FEATURE_API
invoke_cb = ev_invoke_pending;
#endif
io_blocktime = 0.;
timeout_blocktime = 0.;
backend = 0;
backend_fd = -1;
sig_pending = 0;
#if EV_ASYNC_ENABLE
async_pending = 0;
#endif
pipe_write_skipped = 0;
pipe_write_wanted = 0;
evpipe [0] = -1;
evpipe [1] = -1;
#if EV_USE_INOTIFY
fs_fd = flags & EVFLAG_NOINOTIFY ? -1 : -2;
#endif
#if EV_USE_SIGNALFD
sigfd = flags & EVFLAG_SIGNALFD ? -2 : -1;
#endif
if (!(flags & EVBACKEND_MASK))
flags |= ev_recommended_backends ();
#if EV_USE_IOCP
if (!backend && (flags & EVBACKEND_IOCP )) backend = iocp_init (EV_A_ flags);
#endif
#if EV_USE_PORT
if (!backend && (flags & EVBACKEND_PORT )) backend = port_init (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init (EV_A_ flags);
#endif
#if EV_USE_POLL
if (!backend && (flags & EVBACKEND_POLL )) backend = poll_init (EV_A_ flags);
#endif
#if EV_USE_SELECT
if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif
ev_prepare_init (&pending_w, pendingcb);
#if EV_SIGNAL_ENABLE || EV_ASYNC_ENABLE
ev_init (&pipe_w, pipecb);
ev_set_priority (&pipe_w, EV_MAXPRI);
#endif
}
}
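ev_time is called to initialize ev_rt_now with the current calendar time, that is, the number of seconds since 1970-01-01 00:00:00 UTC; the value comes from CLOCK_REALTIME or gettimeofday;
get_clock is called to initialize mn_now, which is either the CLOCK_MONOTONIC time (monotonic time, typically measured from system boot) or the value of ev_time (calendar time);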
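As a rough illustration of the two clocks, the following sketch reads both through POSIX clock_gettime and computes their offset, which is what rtmn_diff stores. The demo_* function names are hypothetical; libev's own ev_time and get_clock contain extra fallbacks that are not reproduced here.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <time.h>

/* Convert a timespec to seconds as a double, the way an ev_tstamp is used. */
static double ts_to_seconds (const struct timespec *ts)
{
  return ts->tv_sec + ts->tv_nsec * 1e-9;
}

/* Calendar time: seconds since the Unix epoch. */
static double demo_realtime (void)
{
  struct timespec ts;
  int rc = clock_gettime (CLOCK_REALTIME, &ts);
  assert (rc == 0);
  (void) rc;
  return ts_to_seconds (&ts);
}

/* Monotonic time: not affected when someone changes the calendar clock. */
static double demo_monotonic (void)
{
  struct timespec ts;
  int rc = clock_gettime (CLOCK_MONOTONIC, &ts);
  assert (rc == 0);
  (void) rc;
  return ts_to_seconds (&ts);
}

/* Offset between the two clocks, analogous to rtmn_diff. */
static double demo_rtmn_diff (void)
{
  return demo_realtime () - demo_monotonic ();
}
```

As long as nobody changes the calendar clock, two calls to demo_rtmn_diff return almost the same value, and that property is what time_update relies on later.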
Next comes:
now_floor = mn_now;
rtmn_diff = ev_rt_now - mn_now;
#if EV_FEATURE_API
invoke_cb = ev_invoke_pending;
#endif
io_blocktime = 0.;
timeout_blocktime = 0.;
backend = 0;
backend_fd = -1;
sig_pending = 0;
#if EV_ASYNC_ENABLE
async_pending = 0;
#endif
pipe_write_skipped = 0;
pipe_write_wanted = 0;
evpipe [0] = -1;
evpipe [1] = -1;
#if EV_USE_INOTIFY
fs_fd = flags & EVFLAG_NOINOTIFY ? -1 : -2;
#endif
#if EV_USE_SIGNALFD
sigfd = flags & EVFLAG_SIGNALFD ? -2 : -1;
#endif
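After that, if flags does not request a specific backend, ev_recommended_backends is called to obtain the backend types recommended for the current system, such as select, poll, or epoll.
Next, the backend is initialized by trying the candidate backends in a fixed priority order: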
#if EV_USE_IOCP
if (!backend && (flags & EVBACKEND_IOCP )) backend = iocp_init (EV_A_ flags);
#endif
#if EV_USE_PORT
if (!backend && (flags & EVBACKEND_PORT )) backend = port_init (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init (EV_A_ flags);
#endif
#if EV_USE_POLL
if (!backend && (flags & EVBACKEND_POLL )) backend = poll_init (EV_A_ flags);
#endif
#if EV_USE_SELECT
if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif
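The chain above can be read as "first requested backend that initializes successfully wins". Here is a small sketch of that pattern, under the assumption that one backend is unavailable. The DEMO_* bits and demo_*_init functions are hypothetical stand-ins, not libev's EVBACKEND_* flags or init functions.

```c
#include <assert.h>

/* Hypothetical backend bits chosen for this sketch. */
enum {
  DEMO_SELECT = 1 << 0,
  DEMO_POLL   = 1 << 1,
  DEMO_EPOLL  = 1 << 2,
  DEMO_KQUEUE = 1 << 3
};

/* Stand-ins for the *_init functions: return the backend bit on success and
   0 on failure. This sketch pretends that kqueue is not available. */
static int demo_kqueue_init (void) { return 0; }
static int demo_epoll_init (void)  { return DEMO_EPOLL; }
static int demo_poll_init (void)   { return DEMO_POLL; }
static int demo_select_init (void) { return DEMO_SELECT; }

/* Try each requested backend from most to least preferred. */
static int demo_pick_backend (unsigned int flags)
{
  int backend = 0;
  if (!backend && (flags & DEMO_KQUEUE)) backend = demo_kqueue_init ();
  if (!backend && (flags & DEMO_EPOLL))  backend = demo_epoll_init ();
  if (!backend && (flags & DEMO_POLL))   backend = demo_poll_init ();
  if (!backend && (flags & DEMO_SELECT)) backend = demo_select_init ();
  /* whatever was picked must have been requested */
  assert (backend == 0 || (flags & (unsigned int) backend));
  return backend;
}
```

A return value of 0 means that no requested backend could be initialized, which is the case in which ev_default_loop resets ev_default_loop_ptr to 0.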
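Next, the loop's ev_prepare watcher pending_w and ev_io watcher pipe_w are initialized.
When loop_init returns, the backend has been initialized. ev_default_loop then initializes and starts the signal watcher ev_signal childev. We will not go deeper into that here.
At this point, initialization of the default loop is complete.
2: ev_init
This is implemented as a macro and mainly sets the members common to all watchers: active, pending, priority, cb, and so on. The code is as follows: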
#define ev_init(ev,cb_) do { \
((ev_watcher *)(void *)(ev))->active = \
((ev_watcher *)(void *)(ev))->pending = 0; \
ev_set_priority ((ev), 0); \
ev_set_cb ((ev), cb_); \
} while (0)
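3: ev_io_set
This macro sets the members specific to the IO watcher ev_io: the descriptor fd to watch and the events to watch for on it. The stored events value additionally includes the EV__IOFDSET mask. The code is as follows: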
#define ev_io_set(ev,fd_,events_) do { (ev)->fd = (fd_); (ev)->events = (events_) | EV__IOFDSET; } while (0)
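The following sketch imitates the macro with a trimmed-down watcher type, to show what the extra marker bit does and why the do { } while (0) wrapper matters. struct demo_io, demo_io_set, demo_configure and demo_user_events are hypothetical names, and the bit values are what I believe ev.h uses, so treat them as assumptions.

```c
#include <assert.h>

/* Bit values as I understand them from ev.h; treat them as assumptions. */
enum { DEMO_READ = 0x01, DEMO_WRITE = 0x02, DEMO_IOFDSET = 0x80 };

/* Hypothetical, trimmed-down stand-in for ev_io. */
struct demo_io { int active; int pending; int fd; int events; };

/* Same shape as ev_io_set: store fd and events, and tag events with a
   marker bit saying "the fd was (re)set". */
#define demo_io_set(w, fd_, events_) \
  do { (w)->fd = (fd_); (w)->events = (events_) | DEMO_IOFDSET; } while (0)

/* Thanks to do { } while (0), the macro is a single statement in an if/else. */
static void demo_configure (struct demo_io *w, int fd, int want_write)
{
  assert (fd >= 0);
  if (want_write)
    demo_io_set (w, fd, DEMO_READ | DEMO_WRITE);
  else
    demo_io_set (w, fd, DEMO_READ);
}

/* The events the user asked for, without the internal marker bit. */
static int demo_user_events (const struct demo_io *w)
{
  return w->events & ~DEMO_IOFDSET;
}
```

The marker bit is only a message to ev_io_start, which forwards it to fd_change and then clears it from the watcher.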
4: ev_io_start
void ev_io_start (struct ev_loop *loop, ev_io *w) EV_THROW
{
  int fd = w->fd;
  if (expect_false (ev_is_active (w)))
    return;
  assert (("libev: ev_io_start called with negative fd", fd >= 0));
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));
  ev_start (loop, (W)w, 1);
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  wlist_add (&anfds[fd].head, (WL)w);
  /* common bug, apparently */
  assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));
  fd_change (loop, fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
  w->events &= ~EV__IOFDSET;
}
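First, the watcher w is checked:
w->active == 0: the watcher must not already be started (if it is, the function simply returns);
fd >= 0;
(!(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))): an IO watcher may only watch for the events EV__IOFDSET, EV_READ, and EV_WRITE; no other events are allowed.
ev_start is called to clamp w's priority into the valid range, set w->active = 1 to mark the watcher as started, and increment loop->activecnt.
The ((loop)->anfds) array is resized if necessary, and the watcher w is then added to the linked list at (loop->anfds)[fd].head.
loop->anfds is an array of ANFD structures. ANFD is defined as follows: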
typedef struct
{
WL head;
unsigned char events; /* the events watched for */
unsigned char reify; /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
unsigned char emask; /* the epoll backend stores the actual kernel mask in here */
unsigned char unused;
#if EV_USE_EPOLL
unsigned int egen; /* generation counter to counter epoll bugs */
#endif
#if EV_SELECT_IS_WINSOCKET || EV_USE_IOCP
SOCKET handle;
#endif
#if EV_USE_IOCP
OVERLAPPED or, ow;
#endif
} ANFD;
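Each descriptor corresponds to one ANFD structure, and the descriptor's value is the index into ((loop)->anfds). A descriptor can have several watchers; watchers on the same descriptor are organized as a linked list, and head in the ANFD structure is the pointer to the head of that list.
The ((loop)->anfds) array grows dynamically and is initially empty. (loop)->anfdmax is the array's current size.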
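A minimal sketch of this layout: an array indexed by descriptor that grows on demand, with a singly linked watcher list per slot. All demo_* names are hypothetical, and the doubling growth policy is my own assumption; libev's array_needsize has its own sizing logic.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

struct demo_watcher { struct demo_watcher *next; int fd; int events; };
struct demo_anfd { struct demo_watcher *head; unsigned char events; unsigned char reify; };
struct demo_loop { struct demo_anfd *anfds; int anfdmax; };

/* Make sure anfds has at least cnt slots; new slots are zero-filled. */
static void demo_anfds_needsize (struct demo_loop *loop, int cnt)
{
  if (cnt > loop->anfdmax)
    {
      int newmax = loop->anfdmax ? loop->anfdmax : 4;
      while (newmax < cnt)
        newmax *= 2;
      struct demo_anfd *p = realloc (loop->anfds, newmax * sizeof *p);
      assert (p);
      memset (p + loop->anfdmax, 0, (newmax - loop->anfdmax) * sizeof *p);
      loop->anfds = p;
      loop->anfdmax = newmax;
    }
}

/* Register a watcher: the fd is the array index, the watcher is pushed at the list head. */
static void demo_io_start (struct demo_loop *loop, struct demo_watcher *w)
{
  assert (w->fd >= 0);
  demo_anfds_needsize (loop, w->fd + 1);
  w->next = loop->anfds[w->fd].head;
  loop->anfds[w->fd].head = w;
}

/* Count the watchers currently attached to fd. */
static int demo_watcher_count (const struct demo_loop *loop, int fd)
{
  int n = 0;
  if (fd < 0 || fd >= loop->anfdmax)
    return 0;
  for (const struct demo_watcher *w = loop->anfds[fd].head; w; w = w->next)
    ++n;
  return n;
}
```

Indexing by descriptor makes the lookup in fd_event a single pointer addition, at the cost of an array as large as the highest descriptor in use.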
fd_change(loop, fd, w->events & EV__IOFDSET | EV_ANFD_REIFY) is then called:
void fd_change (struct ev_loop *loop, int fd, int flags)
{
  unsigned char reify = anfds [fd].reify;
  anfds [fd].reify |= flags;
  if (expect_true (!reify))
    {
      ++fdchangecnt;
      array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
      fdchanges [fdchangecnt - 1] = fd;
    }
}
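fd_change looks at the previous value of (loop->anfds)[fd].reify. If it was 0, the descriptor is not yet queued for reification, so it is appended to the ((loop)->fdchanges) array. This array records the descriptors whose watcher set has changed since the last fd_reify; ((loop)->fdchangemax) is the array's allocated capacity and ((loop)->fdchangecnt) is the number of entries in use.
w->events & EV__IOFDSET | EV_ANFD_REIFY is OR-ed into (loop->anfds)[fd].reify.
Finally, the EV__IOFDSET mask is cleared from w->events: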
w->events &= ~EV__IOFDSET;
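The effect of the reify check is that a descriptor is queued at most once between two reify passes, no matter how many watchers are started on it. A sketch with fixed-size arrays follows; the demo_* names and DEMO_MAXFD are hypothetical, and the flag values are assumptions based on my reading of the source.

```c
#include <assert.h>

/* Flag values as I understand them from the libev source; treat as assumptions. */
enum { DEMO_ANFD_REIFY = 1, DEMO_IOFDSET = 0x80 };

#define DEMO_MAXFD 16

struct demo_loop {
  unsigned char reify[DEMO_MAXFD]; /* stands in for anfds[fd].reify */
  int fdchanges[DEMO_MAXFD];       /* fds waiting for the next reify pass */
  int fdchangecnt;
};

/* Record that fd needs reification; queue it only if it is not queued already. */
static void demo_fd_change (struct demo_loop *loop, int fd, int flags)
{
  assert (fd >= 0 && fd < DEMO_MAXFD);
  assert (flags != 0);
  unsigned char old = loop->reify[fd];
  loop->reify[fd] |= (unsigned char) flags;
  if (!old)
    loop->fdchanges[loop->fdchangecnt++] = fd;
}
```

Because every queued fd has a nonzero reify value, the queue can never hold more entries than there are descriptors.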
5: fd_reify
void fd_reify (struct ev_loop *loop)
{
  int i;
  for (i = 0; i < fdchangecnt; ++i)
    {
      int fd = fdchanges [i];
      ANFD *anfd = anfds + fd;
      ev_io *w;
      unsigned char o_events = anfd->events;
      unsigned char o_reify = anfd->reify;
      anfd->reify = 0;
      /*if (expect_true (o_reify & EV_ANFD_REIFY)) probably a deoptimisation */
        {
          anfd->events = 0;
          for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
            anfd->events |= (unsigned char)w->events;
          if (o_events != anfd->events)
            o_reify = EV__IOFDSET; /* actually |= */
        }
      if (o_reify & EV__IOFDSET)
        backend_modify (loop, fd, o_events, anfd->events);
    }
  fdchangecnt = 0;
}
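The function iterates over all elements of the ((loop)->fdchanges) array from index 0 to ((loop)->fdchangecnt - 1). Each element is a descriptor; the descriptor value fd is used to look up the corresponding ANFD structure anfd.
The previous values of anfd->events and anfd->reify are saved, and then: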
anfd->reify = 0;
anfd->events = 0;
for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
anfd->events |= (unsigned char)w->events;
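Then, if the combined event mask has changed or EV__IOFDSET was set, backend_modify is called to start watching fd and all the events on it. Taking the select backend as an example, fd is added to the appropriate fd_set according to the events in anfd->events.
Finally, ((loop)->fdchangecnt) is set to 0.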
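Here is a sketch of the per-descriptor step of fd_reify: recompute the union of all watcher masks and only talk to the backend when something actually changed. The demo_* names are hypothetical, and demo_backend_modify merely counts calls instead of touching a real fd_set or epoll set.

```c
#include <assert.h>

enum { DEMO_READ = 0x01, DEMO_WRITE = 0x02, DEMO_IOFDSET = 0x80 };

struct demo_watcher { struct demo_watcher *next; int events; };
struct demo_anfd { struct demo_watcher *head; unsigned char events; unsigned char reify; };

/* Stand-in for backend_modify: just counts how often the backend is told. */
static int demo_modify_calls;

static void demo_backend_modify (int fd, int old_events, int new_events)
{
  (void) fd; (void) old_events; (void) new_events;
  ++demo_modify_calls;
}

/* Reify a single descriptor, following the body of the loop in fd_reify. */
static void demo_reify_one (struct demo_anfd *anfd, int fd)
{
  assert (anfd);
  unsigned char o_events = anfd->events;
  unsigned char o_reify = anfd->reify;
  anfd->reify = 0;
  anfd->events = 0;
  /* union of the masks of every watcher on this fd */
  for (struct demo_watcher *w = anfd->head; w; w = w->next)
    anfd->events |= (unsigned char) w->events;
  if (o_events != anfd->events)
    o_reify = DEMO_IOFDSET;
  if (o_reify & DEMO_IOFDSET)
    demo_backend_modify (fd, o_events, anfd->events);
}
```

Starting a second read watcher on a descriptor that is already watched for reading therefore costs no backend call at all, unless the watcher's fd was explicitly set again.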
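6: The time_update function
This function refreshes the values of mn_now, ev_rt_now, and so on, and if it detects that the time has been adjusted manually, it makes the corresponding adjustments in the loop. ev_rt_now is the calendar time; mn_now is either the time since system boot or the calendar time.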
/* fetch new monotonic and realtime times from the kernel */
/* also detect if there was a timejump, and act accordingly */
void time_update (struct ev_loop *loop, ev_tstamp max_block)
{
#if EV_USE_MONOTONIC
  if (expect_true (have_monotonic))
    {
      int i;
      ev_tstamp odiff = rtmn_diff;
      mn_now = get_clock ();
      /* only fetch the realtime clock every 0.5*MIN_TIMEJUMP seconds */
      /* interpolate in the meantime */
      if (expect_true (mn_now - now_floor < MIN_TIMEJUMP * .5))
        {
          ev_rt_now = rtmn_diff + mn_now;
          return;
        }
      now_floor = mn_now;
      ev_rt_now = ev_time ();
      /* loop a few times, before making important decisions.
       * on the choice of "4": one iteration isn't enough,
       * in case we get preempted during the calls to
       * ev_time and get_clock. a second call is almost guaranteed
       * to succeed in that case, though. and looping a few more times
       * doesn't hurt either as we only do this on time-jumps or
       * in the unlikely event of having been preempted here.
       */
      for (i = 4; --i; )
        {
          ev_tstamp diff;
          rtmn_diff = ev_rt_now - mn_now;
          diff = odiff - rtmn_diff;
          if (expect_true ((diff < 0. ? -diff : diff) < MIN_TIMEJUMP))
            return; /* all is well */
          ev_rt_now = ev_time ();
          mn_now = get_clock ();
          now_floor = mn_now;
        }
      /* no timer adjustment, as the monotonic clock doesn't jump */
      /* timers_reschedule (EV_A_ rtmn_diff - odiff) */
# if EV_PERIODIC_ENABLE
      periodics_reschedule (EV_A);
# endif
    }
  else
#endif
    {
      ev_rt_now = ev_time ();
      if (expect_false (mn_now > ev_rt_now || ev_rt_now > mn_now + max_block + MIN_TIMEJUMP))
        {
          /* adjust timers. this is easy, as the offset is the same for all of them */
          timers_reschedule (EV_A_ ev_rt_now - mn_now);
#if EV_PERIODIC_ENABLE
          periodics_reschedule (EV_A);
#endif
        }
      mn_now = ev_rt_now;
    }
}
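If the macro EV_USE_MONOTONIC is defined as 1 and have_monotonic is 1 (sys_clock_gettime supports CLOCK_MONOTONIC), mn_now represents the time since system boot, which cannot be adjusted manually.
In this case, mn_now is refreshed first. If the new value differs from now_floor (the monotonic time at which the realtime clock was last fetched) by less than 0.5s, the time was refreshed very recently, so ev_rt_now is interpolated as rtmn_diff + mn_now and the function returns immediately.
Otherwise ev_rt_now is refreshed, and the change in the difference ev_rt_now - mn_now is used to decide whether the time has been adjusted. If the difference has drifted by less than 1s, the time has not been adjusted and the function returns. If the drift is 1s or more, mn_now and ev_rt_now are fetched again and the drift is checked again. If the drift is still at least 1s after 3 checks, the time has been adjusted manually and the periodic events must be rescheduled. Timer events are not adjusted in this case (timers are set relative to mn_now, and with have_monotonic equal to 1, mn_now is the time since system boot and is not adjusted).
If the macro EV_USE_MONOTONIC is 0, or have_monotonic is 0 (sys_clock_gettime does not support CLOCK_MONOTONIC), mn_now is calendar time, just like ev_rt_now.
In this case, ev_rt_now is refreshed and compared with the previous calendar time; if the time has been adjusted, both the timer events and the periodic events must be adjusted.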
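The two detection rules can be written as small pure functions, which makes them easy to try with concrete numbers. This is only a sketch: the demo_* names are hypothetical, DEMO_MIN_TIMEJUMP assumes the 1s threshold described above, and the retry loop of the real function is left out.

```c
#include <assert.h>

/* Threshold in seconds, assuming the 1s value described in the text. */
#define DEMO_MIN_TIMEJUMP 1.0

/* Calendar time interpolated from the monotonic clock between realtime fetches. */
static double demo_interpolate_rt (double rtmn_diff, double mn_now)
{
  return rtmn_diff + mn_now;
}

/* Monotonic clock available: a jump shows up as a change in (realtime - monotonic). */
static int demo_jumped_monotonic (double old_diff, double rt_now, double mn_now)
{
  double drift = old_diff - (rt_now - mn_now);
  if (drift < 0.)
    drift = -drift;
  return drift >= DEMO_MIN_TIMEJUMP;
}

/* No monotonic clock: the calendar time went backwards, or advanced by more
   than the longest time the loop could have blocked plus the threshold. */
static int demo_jumped_realtime_only (double prev_now, double rt_now, double max_block)
{
  assert (max_block >= 0.);
  return prev_now > rt_now || rt_now > prev_now + max_block + DEMO_MIN_TIMEJUMP;
}
```

For example, with an old offset of 1000s, a monotonic reading of 100s and a realtime reading of 4700s, the offset has moved by an hour, so the first rule reports a jump.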
7: fd_event
Inside backend_poll, when events fire on some watchers, fd_event is called to record the triggered descriptor fd and its events in the pendings array.
void fd_event (struct ev_loop *loop, int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  if (expect_true (!anfd->reify))
    fd_event_nocheck (EV_A_ fd, revents);
}
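An event that has fired but has not yet been handled is said to be in the PENDING state. fd_event looks up the ANFD structure for fd. If that descriptor still has changes waiting to be reified (anfd->reify is nonzero), the event is skipped; otherwise it is passed on: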
if (expect_true (!anfd->reify))
fd_event_nocheck (EV_A_ fd, revents);
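The code of fd_event_nocheck is shown below. It looks up the ANFD structure for fd and walks its watcher list; for every watcher on which one of its events has fired, ev_feed_event is called: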
void fd_event_nocheck (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  ev_io *w;
  for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
    {
      int ev = w->events & revents;
      if (ev)
        ev_feed_event (EV_A_ (W)w, ev);
    }
}
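The matching rule is a plain bitwise AND between what each watcher asked for and what the backend reported. Here is a sketch that folds fd_event and fd_event_nocheck into one function; the demo_* names and the received field are hypothetical, and the field simply stands in for the call to ev_feed_event.

```c
#include <assert.h>

enum { DEMO_READ = 0x01, DEMO_WRITE = 0x02 };

struct demo_watcher { struct demo_watcher *next; int events; int received; };
struct demo_anfd { struct demo_watcher *head; unsigned char reify; };

/* Deliver revents to every watcher interested in at least one of them.
   Returns the number of watchers that were notified. */
static int demo_fd_event (struct demo_anfd *anfd, int revents)
{
  int notified = 0;
  assert (anfd);
  if (anfd->reify) /* changes not yet reified: skip, as fd_event does */
    return 0;
  for (struct demo_watcher *w = anfd->head; w; w = w->next)
    {
      int ev = w->events & revents;
      if (ev)
        {
          w->received |= ev; /* stands in for ev_feed_event */
          ++notified;
        }
    }
  return notified;
}
```

Each watcher only ever sees the subset of events it registered for, even when several watchers share one descriptor.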
The code of ev_feed_event is as follows:
void ev_feed_event (struct ev_loop *loop, void *w, int revents)
{
  W w_ = (W)w;
  int pri = ABSPRI (w_);
  if (expect_false (w_->pending))
    pendings [pri][w_->pending - 1].events |= revents;
  else
    {
      w_->pending = ++pendingcnt [pri];
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
      pendings [pri][w_->pending - 1].w = w_;
      pendings [pri][w_->pending - 1].events = revents;
    }
  pendingpri = NUMPRI - 1;
}
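(loop->pendingcnt) is a one-dimensional integer array; (loop->pendingcnt)[i] is the number of watchers of priority i that are currently in the PENDING state.
(loop->pendings) is a two-dimensional array whose elements are of type ANPENDING, defined as follows: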
/* stores the pending event set for a given watcher */
typedef struct
{
ev_watcher *w;
int events;
} ANPENDING;
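The ANPENDING structure records a watcher in the PENDING state together with the events that fired. The (loop->pendings) array is indexed by priority in its first dimension and holds ANPENDING entries in its second.
ev_feed_event examines w_->pending. A value of 0 means the watcher is not currently pending; a nonzero value means the watcher is already in the PENDING state, and the value itself is the watcher's position in pendings[pri] (counting from 1), that is, the value of (loop->pendingcnt)[pri] at the time it was queued.
If the value is nonzero, the watcher is already in the PENDING state, so all that is needed is: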
pendings [pri][w_->pending - 1].events |= revents;
If the value is 0, then:
w_->pending = ++pendingcnt [pri];
array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
pendings [pri][w_->pending - 1].w = w_;
pendings [pri][w_->pending - 1].events = revents;
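The two branches can be tried out with a fixed-size model of the pending queues. In this sketch the demo_* names, DEMO_NUMPRI and DEMO_MAXPENDING are hypothetical, and the watcher's priority is stored directly as a 0-based index rather than being converted by ABSPRI.

```c
#include <assert.h>

#define DEMO_NUMPRI 5
#define DEMO_MAXPENDING 8

/* pending == 0: not queued; otherwise the 1-based position in its queue. */
struct demo_watcher { int pending; int priority; };
struct demo_pending { struct demo_watcher *w; int events; };

struct demo_loop {
  struct demo_pending pendings[DEMO_NUMPRI][DEMO_MAXPENDING];
  int pendingcnt[DEMO_NUMPRI];
};

/* Queue w with revents, or merge revents into its existing entry. */
static void demo_feed_event (struct demo_loop *loop, struct demo_watcher *w, int revents)
{
  int pri = w->priority;
  assert (pri >= 0 && pri < DEMO_NUMPRI);
  if (w->pending)
    loop->pendings[pri][w->pending - 1].events |= revents;
  else
    {
      assert (loop->pendingcnt[pri] < DEMO_MAXPENDING);
      w->pending = ++loop->pendingcnt[pri];
      loop->pendings[pri][w->pending - 1].w = w;
      loop->pendings[pri][w->pending - 1].events = revents;
    }
}
```

Storing the position in the watcher itself is what makes the "already pending" case a constant-time update with no search through the queue.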
8: ev_invoke_pending
void ev_invoke_pending (struct ev_loop *loop )
{
  pendingpri = NUMPRI;
  while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
    {
      --pendingpri;
      while (pendingcnt [pendingpri])
        {
          ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];
          p->w->pending = 0;
          EV_CB_INVOKE (p->w, p->events);
        }
    }
}
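This function invokes the callbacks of all watchers currently in the PENDING state. It walks the priorities pendingpri from highest to lowest; (loop->pendingcnt)[pendingpri] is the number of PENDING watchers of priority pendingpri. The (loop->pendings)[pendingpri] array is walked from back to front, setting w->pending = 0 and then invoking each watcher's callback.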
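The resulting call order (highest priority first, last queued first within a priority) can be checked with a small model. The demo_* names are hypothetical, and instead of calling a callback the sketch records each watcher's id in the order it would have been invoked.

```c
#include <assert.h>

#define DEMO_NUMPRI 3
#define DEMO_MAXPENDING 4

struct demo_watcher { int pending; int id; };
struct demo_pending { struct demo_watcher *w; int events; };

struct demo_loop {
  struct demo_pending pendings[DEMO_NUMPRI][DEMO_MAXPENDING];
  int pendingcnt[DEMO_NUMPRI];
  int order[DEMO_NUMPRI * DEMO_MAXPENDING]; /* ids in invocation order */
  int ordercnt;
};

/* Put w into the pending queue of priority pri (0-based index). */
static void demo_queue (struct demo_loop *loop, struct demo_watcher *w, int pri, int events)
{
  assert (pri >= 0 && pri < DEMO_NUMPRI);
  assert (loop->pendingcnt[pri] < DEMO_MAXPENDING);
  w->pending = ++loop->pendingcnt[pri];
  loop->pendings[pri][w->pending - 1].w = w;
  loop->pendings[pri][w->pending - 1].events = events;
}

/* Drain every queue from the highest priority down, each from back to front. */
static void demo_invoke_pending (struct demo_loop *loop)
{
  int pri = DEMO_NUMPRI;
  while (pri)
    {
      --pri;
      while (loop->pendingcnt[pri])
        {
          struct demo_pending *p = loop->pendings[pri] + --loop->pendingcnt[pri];
          p->w->pending = 0;
          assert (loop->ordercnt < DEMO_NUMPRI * DEMO_MAXPENDING);
          loop->order[loop->ordercnt++] = p->w->id; /* stands in for the callback */
        }
    }
}
```

In the real function the priority counter is a loop member (pendingpri) rather than a local variable, so that events fed from inside a callback can reset it, as the comment in the source notes.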
II: Summary
III: Example
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <ev.h>

ev_io io_w;

/* Read callback: echo the received data back to the client. */
void io_action(struct ev_loop *main_loop, ev_io *io_w, int e)
{
    int rst;
    char buf[1024] = {'\0'};
    rst = read(io_w->fd, buf, sizeof(buf) - 1);
    if(rst <= 0)
    {
        /* EOF or error: stop the watcher, then close the descriptor */
        ev_io_stop(main_loop,io_w);
        close(io_w->fd);
        printf("client over\n");
        return;
    }
    buf[rst] = '\0';
    printf("Read in a string: %s \n",buf);
    write(io_w->fd, buf, rst);
}

/* Listen on port 8898 and block until one client connects. */
int socketfd(void)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        perror("socket error");
        return -1;
    }
    struct sockaddr_in serveraddr;
    struct sockaddr_in clientaddr;
    socklen_t addrlen = sizeof(struct sockaddr_in);
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(8898);
    if(bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr_in)) < 0)
    {
        perror("bind error");
        return -1;
    }
    if(listen(listenfd, 5) < 0)
    {
        perror("listen error");
        return -1;
    }
    int connectfd = accept(listenfd, (struct sockaddr *)&clientaddr, &addrlen);
    if(connectfd < 0)
    {
        perror("accept error");
        return -1;
    }
    return connectfd;
}

int main(void)
{
    int fd = socketfd();
    if(fd < 0) return 1;
    struct ev_loop *main_loop = ev_default_loop(0);
    ev_init(&io_w,io_action);
    ev_io_set(&io_w,fd,EV_READ);
    ev_io_start(main_loop,&io_w);
    ev_run(main_loop,0);
    return 0;
}