1. 程式人生 > >libev:詳解

libev:詳解

事件庫之Libev(一)

使用Libev

Libev的作者寫了一份很好的官方Manual,比較的齊全,即介紹了Libev的設計思想,也介紹了基本使用還包括內部各類事件詳細介紹。這裡略微贅述一下。Libev通過一個 struct ev_loop 結結構表示一個事件驅動的框架。在這個框架裡面通過ev_xxx結構,ev_init、ev_xxx_set、ev_xxx_start介面向這個事件驅動的框架裡面註冊事件監控器,當相應的事件監控器的事件出現時,便會觸發該事件監控器的處理邏輯,去處理該事件。處理完之後,這些監控器進入到下一輪的監控中。符合一個標準的事件驅動狀態的模型。

Libev 除了提供了基本的三大類事件(IO事件、定時器事件、訊號事件)外還提供了週期事件、子程序事件、檔案狀態改變事件等多個事件,這裡我們用三大基本事件寫一個例子。

  1. #include<ev.h>
  2. #include <stdio.h>
  3. #include <signal.h>
  4. #include <sys/unistd.h>
  5.  
  6. ev_io io_w;
  7. ev_timer timer_w;
  8. ev_signal signal_w;
  9.  
  10. void io_action(struct ev_loop *main_loop,ev_io *io_w,int e)
  11. {
  12.         int rst;
  13.         char buf[1024] = {'\0'};
  14.         puts("in io cb\n");
  15.         read(STDIN_FILENO,buf,sizeof(buf));
  16.         buf[1023] = '\0';
  17.         printf("Read in a string %s \n",buf);
  18.         ev_io_stop(main_loop,io_w);
  19. }
  20.  
  21. void timer_action(struct ev_loop *main_loop,ev_timer *timer_w,int e)
  22. {
  23.         puts("in tiemr cb \n");
  24.         ev_timer_stop(main_loop,timer_w);
  25. }
  26.  
  27. void signal_action(struct ev_loop *main_loop,ev_signal signal_w,int e)
  28. {
  29.         puts("in signal cb \n");
  30.         ev_signal_stop(main_loop,signal_w);
  31.         ev_break(main_loop,EVBREAK_ALL);
  32. }
  33.  
  34. int main(int argc ,char *argv[])
  35. {
  36.         struct ev_loop *main_loop = ev_default_loop(0);
  37.         ev_init(&io_w,io_action);
  38.         ev_io_set(&io_w,STDIN_FILENO,EV_READ); 
  39.         ev_init(&timer_w,timer_action);
  40.         ev_timer_set(&timer_w,2,0); 
  41.         ev_init(&signal_w,signal_action);
  42.         ev_signal_set(&signal_w,SIGINT);
  43.  
  44.         ev_io_start(main_loop,&io_w);
  45.         ev_timer_start(main_loop,&timer_w);
  46.         ev_signal_start(main_loop,&signal_w);
  47.  
  48.         ev_run(main_loop,0);
  49.  
  50. return 0;
  51. }

定義了3個監控器(watcher),以及觸發監控器時要執行動作的回撥函式。Libev定義了多種監控器,命名方式為 ev_xxx 這裡xxx代表監控器型別,其實現是一個結構體,

  1. typedef struct ev_io
  2. {
  3.   ....
  4. } ev_io

回撥函式的型別為 void cb_name(struct ev_loop *main_loop,ev_xxx *io_w,int event) 。

呼叫 ev_default_loop(0)生成一個預製的全域性驅動器。

初始化監控器的過程是將相應的回撥函式即觸發時的動作註冊到監控器上。

設定觸發條件則是該條件產生時才去執行註冊到監控器上的動作。對於IO事件,一般是設定特定fd上的的可讀或可寫事件,定時器則是多久後觸發。這裡定時器的觸發條件中還有第三引數,表示第一次觸發後,是否迴圈,若為0則不迴圈,否則按該值迴圈。訊號觸發器則是設定觸發的訊號。

在初始化並設定好觸發條件後,先呼叫ev_xxx_start 將監控器註冊到事件驅動器上。接著呼叫 ev_run 開始事件驅動器。

在事件的觸發動作裡面。我加入了一個 ev_xxx_stop 函式,是將監控器從事件驅動器裡面登出掉。使其不再起作用。而在訊號觸發的動作中還加入了一個 ev_break 該函式可以使程序跳出 main_loop 事件驅動器迴圈,也就是關閉事件驅動器。結束這一邏輯。

libev最簡單的示例就是這樣的一個結構。定義一個監控器、書寫觸發動作邏輯、初始化監控器、設定監控器觸發條件、將監控器加入大事件驅動器的迴圈中即可。一個比較清晰的事件驅動框架。

libev的事件驅動過程可以想象成如下的虛擬碼:

  1. do_some_init()
  2. is_run = True
  3. while is_run:
  4.     t = caculate_loop_time()
  5.     deal_loop(t)
  6.     deal_with_pending_event()
  7. do_some_clear()

首先做一些初始化操作,然後進入到迴圈中,該迴圈通過一個狀態位來控制是否執行。在迴圈中,計算出下一次輪詢的時間,這裡輪詢的實現就採用了系統提供的epoll、kqueue等機制。再輪詢結束後檢查有哪些監控器的被觸發了,依次執行觸發動作。

事件庫之Libev(二)

Libev原始碼結構

把原始碼進行分類:一類是產品類的,就比如Redis、Ngnix這一類本身是一個完整的可以運維的成熟產品;另一類就是Libev這樣的元件類的。對於元件類的專案,我一般就是分成這樣幾步:

  1. 有文件看文件,沒有文件問相關人員(包括Google),這個元件主要提供什麼服務
  2. 結合上述資訊使用元件的API寫個示例程式,跑起來
  3. 大致瀏覽下原始碼,分析一下程式碼的組織結構
  4. 根據使用的API,進到原始碼中看看主幹是怎麼樣實現的,從而瞭解整體思路
  5. 再搜刮原始碼,把一些輔助的功能看下,並在例子中嘗試
  6. 之後將整個理解用文字記錄下來。提煉兩大塊內容:實現思想和技巧tips

ev.c程式碼結構

首先下載Libev的原始碼包,下載回來後進行解壓,Libev的原始碼都放在同一個目錄中,除去autoconfig產生的檔案,程式碼檔案還是比較直觀的。主要的.c和.h檔案從命名上也差不多能猜出來幹嘛呢。根據我們的例子,主要抽出其中的"ev.c ev_epoll.c ev_select.c ev.h ev_wrap.c ev_vars.c"結合我們的例子進行梳理。

“ev_epoll.c"和"ev_select.c"是對系統提供的IO複用機制“epoll”、“select"的支援,還有"poll”、“kqueue” Solaris的"port"的支援,分別是"ev_poll.c”、“ev_kqueue.c”、“ev_port.c”。具體的框架是類似的,因此只要分析一個其他的就都瞭解了。

“ev.h” 是對一些API和變數、常量的定義,“ev.c"是Libev的主要邏輯,其中在型別的定義的時候用了一個巨集的包裝來宣告成員變數,在檔案"ev_vars.c” 中。為了對成員變數使用的語句進行簡化,就又寫了一個"ev_wrap.c”。因此我們可以這樣去看待這些檔案,主要邏輯都在"ev.c”,其中部分常量、變數的定義可以在"ev.h"中,有些結構的成員變數部分的定義在"ev_vars.c"中,同時對該結構成員變數的引用通過"ev_wrap.c"檔案做了個簡寫的巨集定義;當需要系統提供底層的事件介面時,按分類分別在"ev_epoll.c”、“ev_select.c"等檔案中。

開啟"ev.c"檔案,“ev.h"裡面的各種定義,在需要的時候去查詢即可,通過IDE或者Vim/Emacs結合cscope/ctag都可以很好的解決。通過瀏覽可以發現這些程式碼大概可以分成三部分:

程式碼結構

因此可以直接跳到程式碼部分。分隔點有ecb結束的註釋。這可以不用擔心略過的部分,等需要的時候回過去查閱即可。其中ecb的部分,只要知道其API作用即可,無需深究,如果未來需要的時候可以到這邊來做一個參考。

對於邏輯結構可以把他分成幾個部分: 
邏輯結構

這樣對整體的佈局有個大概的瞭解,就可以有選擇性的逐個突破了。

主要資料結構

瀏覽的過程中梳理下幾個重要的資料結構

1.時間型別

  1. typedef double ev_tstamp

2.坑爹的 EV_XX_

  1. struct ev_loop;
  2. # define EV_P struct ev_loop *loop /* a loop as sole parameter in a declaration */
  3. # define EV_P_ EV_P, /* a loop as first of multiple parameters */
  4. # define EV_A loop /* a loop as sole argument to a function call */
  5. # define EV_A_ EV_A, /* a loop as first of multiple arguments */
  6. # define EV_DEFAULT_UC ev_default_loop_uc_ () /* the default loop, if initialised, as sole arg */
  7. # define EV_DEFAULT_UC_ EV_DEFAULT_UC, /* the default loop as first of multiple arguments */
  8. # define EV_DEFAULT ev_default_loop (0) /* the default loop as sole arg */
  9. # define EV_DEFAULT_ EV_DEFAULT, /* the default loop as first of multiple arguments */

“EV_XXX” 等同於 EV_XXX,,這樣在後續的API使用中,會顯的更簡潔一些,比如針對第一個引數是struct ev_loop *loop 的回撥函式的書寫,就可以寫成 · void io_action(EV_P ev_io *io_w,int e)· 。這裡不知道作者還有沒有其他用以,這裡我不是很推薦,但是要知道,後面再看程式碼的時候才更容易理解。

3.各種watcher

首先看一個ev_watcher,這個我們可以用OO思想去理解他,他就相當於一個基類,後續的ev_io什麼的都是派生自該基類,這裡利用了編譯器的一個“潛規則”就是變數的定義順序與宣告順序一致。看過Libev的程式碼,我想在驚歎其巨集的高明之餘一定也吐槽過。

  1. typedef struct ev_watcher
  2. {
  3.     int active;
  4.     int pending;
  5.     int priority;
  6.     void *data;
  7.     void (*cb)(struct ev_loop *loop, struct ev_watcher *w, int revents);
  8. } ev_watcher

與基類配套的還有個裝監控器的List。

  1. typedef struct ev_watcher_list
  2. {
  3.     int active;
  4.     int pending;
  5.     int priority;
  6.     void *data;
  7.     void (*cb)(struct ev_loop *loop, struct ev_watcher_list *w, int revents);
  8.     struct ev_watcher_list *next;
  9. } ev_watcher_list

IO監控器

  1. typedef struct ev_io
  2. {
  3.     int active;
  4.     int pending;
  5.     int priority;
  6.     void *data;
  7.     void (*cb)(struct ev_loop *loop, struct ev_io *w, int revents);
  8.     struct ev_watcher_list *next;
  9.     int fd; /* 這裡的fd,events就是派生類的私有成員,分別表示監聽的檔案fd和觸發的事件(可讀還是可寫) */
  10.     int events;
  11. } ev_io

在這裡,通過從巨集中剝離出來後,可以看到將派生類的自有變數放在了共有部分的後面。這樣,當使用C的指標強制轉換後,一個指向 struct ev_io物件的基類 ev_watcher 的指標p就可以通過 p->active 訪問到派生類中同樣表示active的成員了。

定時器watcher

  1. typedef struct ev_watcher_time
  2. {
  3.     int active;
  4.     int pending;
  5.     int priority;
  6.     void *data;
  7.     void (*cb)(struct ev_loop *loop, struct ev_watcher_time *w, int revents);
  8.     ev_tstamp at; /* 這個at就是派生類中新的自有成員 ,表示的是at時間觸發 */
  9. } ev_watcher_time

這裡定時器事件watcher和IO的不一樣的地方在於,對於定時器會用專門的最小堆去管理。而IO和訊號等其他事件的監控器則是通過單鏈表掛起來的,因此他沒有next成員。

訊號watcher

  1. typedef struct ev_signal
  2. {
  3.     int active;
  4.     int pending;
  5.     int priority;
  6.     void *data;
  7.     void (*cb)(struct ev_loop *loop, struct ev_signal *w, int revents);
  8.     struct ev_watcher_list *next;
  9.     int signum; /* 這個signum就是派生類中新的自有成員 ,表示的是接收到的訊號,和定時器中的at類似 */
  10. } ev_signal

還有其他的事件watcher的資料結構也是和這個類似的,可以對著"ev.h"的程式碼找一下,這裡不再贅述了。最後看一個可以容納所有監控器物件的型別:

  1. union ev_any_watcher
  2. {
  3.   struct ev_watcher w;
  4.   struct ev_watcher_list wl; 
  5.   struct ev_io io;
  6.   struct ev_timer timer;
  7.   struct ev_periodic periodic;
  8.   struct ev_signal signal;
  9.   struct ev_child child;
  10.   struct ev_stat stat;
  11.   struct ev_idle idle;
  12.   struct ev_prepare prepare;
  13.   struct ev_check check;
  14.   struct ev_fork fork;
  15.   struct ev_cleanup cleanup;
  16.   struct ev_embed embed;
  17.   struct ev_async async;
  18. }

4.最重要的 ev_loop

在“ev.c”裡面可以看到這樣的定義:

  1. struct ev_loop
  2. {
  3.     ev_tstamp ev_rt_now;
  4.     #define ev_rt_now ((loop)->ev_rt_now) 
  5.     #define VAR(name,decl) decl;
  6.       #include "ev_vars.h" 
  7.     #undef VAR
  8. };
  9. #include "ev_wrap.h"

之前說過的 “ev_vars.h"和"ev_wrap.h"是為了定義一個數據結構及簡化訪問其成員的,就是說的這個 ev_loop 結構體。 

  1. #define VAR(name,decl) decl;
  2. #define VARx(type,name) VAR(name, type name)

展開就是

  1. #define VARx(type,name) type name

然後再看"ev_vars.h” ,裡面都是 型別-變數的 VARx的巨集,這樣再將其include 到結構體的定義中。這樣就可以看成該結構定義為:

  1. struct ev_loop
  2. {
  3.     ev_tstamp ev_rt_now;
  4.     ev_tstamp now_floor;
  5.     int rfeedmax;
  6.     ... .........;
  7. }

不知道作者的用意何在,目前還沒有看到這樣做的好處在哪裡。

然後 #define ev_rt_now ((loop)->ev_rt_now) 可以和後面的 “ev_warp.h"一起看。實際上就是 #define xxx ((loop)->xxx) 這樣在要用struct ev_loop 的一個例項物件loop的成員時,就可以直接寫成xxx了,這裡再聯想到之前的 EV_P EV_P_ EV_A EV_A_ ,就會發現,在Libev的內部函式中,這樣的配套就可以使程式碼簡潔不少。不過這樣也增加了第一次閱讀其的門檻。相信沒有看過Libev不說其晦澀的。

5.重要的全域性變數

default_loop_struct

在"ev.c"中有

  1. static struct ev_loop default_loop_struct

這個就是strct loop的一個例項物件,表示的是預製事件驅動器。如果在程式碼中使用的是預製事件驅動器,那麼後續的操作就都圍繞著這個資料結構展開了。

為了操作方便,還定義了指向該物件的一個全域性指標:

  1. struct ev_loop *ev_default_loop_ptr

程式碼的框架和主要的資料結構梳理出來了,還有ANFD、ANHEAP等資料結構在後面分析具體監控器是的時候在詳細介紹。後面就要跟程序序的邏輯從而瞭解其設計思想,這樣便可以深入的瞭解一款元件型的開源軟體了。

事件庫之Libev(三)

Libev設計思路

這裡我們管struct ev_loop稱作為事件迴圈驅動器,而將各種watcher稱為事件監控器。

1.分析例子中的IO事件

我們從main開始一步步走。首先執行 struct ev_loop *main_loop = ev_default_loop(0); 通過跟進程式碼可以跟到函式 ev_default_loop 裡面去,其主要邏輯,就是全域性物件指標ev_default_loop_ptr若為空,也就是不曾使用預製的驅動器時,就讓他指向全域性物件default_loop_struct,同時在本函式裡面統一用名字"loop"來表示該預製驅動器的指標。從而與函式引數為 EV_P 以及 EV_A的寫法配合。接著對該指標做 loop_init操作,即初始化預製的事件驅動器。這裡函式的呼叫就是用到了 EV_A_ 這樣的寫法進行簡化。初始化之後如果配置中Libev支援子程序,那麼通過訊號監控器實現了子程序監控器。這裡可以先不用去管他,知道這段程式碼作用即可。 這裡再Libev的函式定義的時候,會看到 “EV_THROW” 這個東西,這裡可以不用管它,他是對CPP中"try … throw"的支援,和EV_CPP(extern "C" {)這樣不同尋常的 extern “C” 一樣是一種編碼技巧。

1.1驅動器的初始化

下面看下驅動器的初始化過程中都做了哪些事情。首先最開始的一段程式碼判斷系統的clock_gettime是否支援CLOCK_REALTIME和CLOCK_MONOTONIC。這兩種時間的區別在於後者不會因為系統時間被修改而被修改,詳細解釋可以參考man page 。接著判斷環境變數對驅動器的影響,這個在官方的Manual中有提到,主要就是影響預設支援的IO複用機制。接著是一連串的初始值的賦值,開始不用瞭解其作用。在後面的分析過程中便可以知道。接著是根據系統支援的IO複用機制,對其進行初始化操作。這裡可以去"ev_epoll.c” 和"ev_select.c"中看一下。 最後是判斷如果系統需要訊號事件,那麼通過一個PIPE的IO事件來實現,這裡暫且不用管他,在理解了IO事件的實現後,自然就知道這裡他做了什麼操作。

對於"ev_epoll.c” 和"ev_select.c"中的 xxx_init 其本質是一致的,就像外掛一樣,遵循一個格式,然後可以靈活的擴充套件。對於epoll主要就是做了一個 epoll_create*的操作(epoll_create1可以支援EPOLL_CLOEXEC)。

  1. backend_mintime = 1e-3; /* epoll does sometimes return early, this is just to avoid the worst */
  2. backend_modify = epoll_modify;
  3. backend_poll = epoll_poll

這裡就可以看成是外掛的模板了,在後面會修改的時候呼叫backend_modify在poll的時候呼叫backend_poll.從而統一了操作。

  1. epoll_eventmax = 64; /* initial number of events receivable per poll */
  2. epoll_events = (struct epoll_event *)ev_malloc (sizeof (struct epoll_event) * epoll_eventmax)

這個就看做為是每個機制特有的部分。熟悉epoll的話,這個就不用說了。

對於select (Linux平臺上的)

  1. backend_mintime = 1e-6;
  2. backend_modify = select_modify;
  3. backend_poll = select_poll

這個和上面一樣,是相當於外掛介面

  1. vec_ri = ev_malloc (sizeof (fd_set)); FD_ZERO ((fd_set *)vec_ri);
  2. vec_ro = ev_malloc (sizeof (fd_set));
  3. vec_wi = ev_malloc (sizeof (fd_set)); FD_ZERO ((fd_set *)vec_wi);
  4. vec_wo = ev_malloc (sizeof (fd_set))

同樣,這個是select特有的,表示讀和寫的fd_set的vector,ri用來裝select返回後符合條件的部分。其他的如poll、kqueue、Solaris port都是類似的,可以自行閱讀。

1.2IO監控器的初始化

上面的過程執行完了ev_default_loop過程,然後到後面的ev_init(&io_w,io_action);,他不是一個函式,而是一個巨集定義:

  1. ((ev_watcher *)(void *)(ev))->active = ((ev_watcher *)(void *)(ev))->pending = 0;
  2. ev_set_priority ((ev), 0);
  3. ev_set_cb ((ev), cb_)

這裡雖然還有兩個函式的呼叫,但是很好理解,就是設定了之前介紹的基類中 “active"表示是否啟用該watcher,“pending”該監控器是否處於pending狀態,“priority"其優先順序以及觸發後執行的動作的回撥函式。

1.3 設定IO事件監控器的觸發條件

在初始化監控器後,還要設定其監控監控的條件。當該條件滿足時便觸發該監控器上註冊的觸發動作。ev_io_set(&io_w,STDIN_FILENO,EV_READ);從引數邊可以猜出他幹了什麼事情。就是設定該監控器監控標準輸入上的讀事件。該呼叫也是一個巨集定義:

  1. (ev)->fd = (fd_); (ev)->events = (events_) | EV__IOFDSET

就是設定派生類IO監控器特有的變數fd和events,表示監控那個檔案fd已經其上的可讀還是可寫事件。 
%TODO:補上EV_IOFDSET的作用

1.4註冊IO監控器到事件驅動器上

準備好了監控器後就要將其註冊到事件驅動器上,這樣就形成了一個完整的事件驅動模型。 ev_io_start(main_loop,&io_w); 。這個函式裡面會第一次見到一個一個巨集 “EV_FREQUENT_CHECK”,是對函式 “ev_verify"的呼叫,那麼ev_verify是幹什麼的呢?用文件的話“This can be used to catch bugs inside libev itself”,如果看其程式碼的話,就是去檢測Libev的內部資料結構,判斷各邊界值是否合理,不合理的時候assert掉。在生產環境下,我覺得根據性格來對待。如果覺得他消耗資源(要檢測很多東西跑很多迴圈)可以編譯的時候關掉該定義。如果需要assert,可以在編譯的時候加上選項。

然後看到 ev_start 呼叫,該函式實際上就是給驅動器的loop->activecnt增一併置loop->active為真(這裡統一用loop表示全域性物件的預製驅動器物件default_loop_struct),他們分別表示事件驅動器上正監控的監控器數目以及是否在為監控器服務。

  1. array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  2. wlist_add (&anfds[fd].head, (WL)w)

感興趣的可以去看下Libev裡麼動態調整陣列的實現。這裡我們主要看整體邏輯。他的工作過程是先判斷陣列anfds是否還有空間再加對檔案描述符fd的監控,,沒有的話則調整陣列的記憶體大小,使其大小足以容下。

這裡要介紹下之前沒有介紹的一個數據結構,這個沒有上下文比較難理解,因此放在這裡介紹。

  1. typedef struct
  2. {
  3.   WL head;
  4.   unsigned char events; /* the events watched for */
  5.   unsigned char reify; /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
  6.   unsigned char emask; /* the epoll backend stores the actual kernel mask in here */
  7.   unsigned char unused;
  8.   unsigned int egen; /* generation counter to counter epoll bugs */
  9. } ANFD; /* 這裡去掉了對epoll的判斷和windows的IOCP*/

這裡首先只用關注一個 “head” ,他是之前說過的wather的基類連結串列。這裡一個ANFD就表示對一個檔案描述符的監控,那麼對該檔案描述的可讀還是可寫監控,監控的動作是如何定義的,就是通過這個連結串列,把對該檔案描述法的監控器都掛上去,這樣就可以通過檔案描述符找到了。而前面的說的anfds就是這個物件的陣列,下標通過檔案描述符fd進行索引。在Redis-ae那篇文章中已經討論過這樣的可以達到O(1)的索引速度而且空間佔用也是合理的。

接著的“fd_change”與“fd_reify”是呼應的。前者將fd新增到一個fdchanges的陣列中,後者則依次遍歷這個陣列中的fd上的watcher與anfds裡面對飲的watcher進行對比,判斷監控條件是否改變了,如果改變了則呼叫backend_modify也就是epoll_ctl等調整系統對該fd的監控。這個fdchanges陣列的作用就在於此,他記錄了anfds陣列中的watcher監控條件可能被修改的檔案描述符,並在適當的時候將呼叫系統的epoll_ctl或則其他檔案複用機制修改系統監控的條件。這裡我們把這兩個主要的物理結構梳理下: 
anfds的結構

總結一下注冊過程就是通過之前設定了監控條件IO watcher獲得監控的檔案描述符fd,找到其在anfds中對應的ANFD結構,將該watcher掛到該結構的head鏈上。由於對應該fd的監控條件有改動了,因此在fdchanges陣列中記錄下該fd,在後續的步驟中呼叫系統的介面修改對該fd監控的條件。

1.5 啟動事件驅動器

一切準備就緒了就可以開始啟動事情驅動器了。就是 ev_run。 其邏輯很清晰。就是

  1. do{
  2.     xxxx;
  3.     backend_poll(); 
  4.     xxxx
  5. }while(condition_is_ok)

迴圈中開始一段和fork 、 prepare相關這先直接跳過,到分析與之相關的監控事件才去看他。直接到 /* calculate blocking time */ 這裡。熟悉事件模型的話,這裡還是比較常規的。就是從定時器堆中取得最近的時間(當然這裡分析的時候沒有定時器)與loop->timeout_blocktime比較得到阻塞時間。這裡如果設定了驅動器的io_blocktime,那麼在進入到poll之前會先sleep io_blocktime時間從而等待IO或者其他要監控的事件準備。這裡進入到backend_poll中的阻塞時間是包括了io_blocktime的時間。然後進入到backend_poll中。對於epoll就是進入到epoll_wait裡面。

epoll(或者select、kqueue等)返回後,將監控中的檔案描述符fd以及其pending(滿足監控)的條件通過 fd_event做一個監控條件是否改變的判斷後到fd_event_nocheck裡面對anfds[fd]陣列中的fd上的掛的監控器依次做檢測,如果pending條件符合,便通過ev_feed_event將該監控器加入到pendings陣列中pendings[pri]上的pendings[pri][old_lenght+1]的位置上。這裡要介紹一個新的資料結構,他表示pending中的wather也就是監控條件滿足了,但是還沒有觸發動作的狀態。

點選(此處)摺疊或開啟

  1. typedef struct
  2. {
  3.   W w;
  4.   int events; /* the pending event set for the given watcher */
  5. } ANPENDING

 

這裡 W w應該知道是之前說的基類指標。pendings就是這個型別的一個二維陣列陣列。其以watcher的優先順序為一級下標。再以該優先順序上pengding的監控器數目為二級下標,對應的監控器中的pending值就是該下標加一的結果。其定義為 ANPENDING *pendings [NUMPRI]。同anfds一樣,二維陣列的第二維 ANPENDING *是一個動態調整大小的陣列。這樣操作之後。這個一系列的操作可以認為是fd_feed的後續操作,xxx_reify目的最後都是將pending的watcher加入到這個pengdings二維陣列中。後續的幾個xxx_reify也是一樣,等分析到那個型別的監控器型別時在作展開。 
這裡用個圖梳理下結構。 
pendings結構梳理圖

最後在迴圈中執行巨集EV_INVOKE_PENDING,其實是呼叫loop->invoke_cb,如果沒有自定義修改的話(一般不會修改)就是呼叫ev_invoke_pending。該函式會依次遍歷二維陣列pendings,執行pending的每一個watcher上的觸發動作回撥函式。

至此一次IO觸發過程就完成了。

2總結出Libev的設計思路

watcher要算最關鍵的資料結構了,整個邏輯都是圍繞著watcher做操作。Libev內部維護一個基類ev_wathcer和若干個特定監控器的派生類ev_xxx。在使用的時候首先生成一個特定watcher的例項。並通過該派生物件私有的成員設定其觸發條件。然後用anfds或者最小堆管理這些watchers。然後Libev通過backend_poll以及時間堆管理運算出pending的watcher。然後將他們加入到一個以優先順序為一維下標的二維陣列。在合適的時間依次呼叫這些pengding的watcher上註冊的觸發動作回撥函式,這樣便可以按優先順序先後順序實現“only-for-ordering”的優先順序模型。 
思路流程圖
 

事件庫之Libev(四)

另外兩個重要的監控器

前面通過IO監控器將Libev的整個工作流程過了一遍。中間濾過了很多與其他事件監控器相關的部分,但是整體思路以及很明晰了,只要針對其他型別的watcher看下其初始化和註冊過程以及在ev_run中的安排即可。

1.分析定時器監控器

定時器在程式中可以做固定週期tick操作,也可以做一次性的定時操作。Libev中與定時器類似的還有個週期事件watcher。其本質都是一樣的,只是在時間的計算方法上略有不同,並有他自己的一個事件管理的堆。對於定時器事件,我們按照之前說的順序從ev_init開始看起。

1.1定時器監控器的初始化

定時器初始化使用 ev_init(&timer_w,timer_action);,這個過程和之前的IO類似,主要就是設定基類的active、pending、priority以及觸發動作回撥函式cb。

1.2設定定時器監控器的觸發條件

通過 ev_timer_set(&timer_w,2,0);可以設定定時器在2秒鐘後被觸發。如果第三個引數不是0而是一個大於0的正整數n時,那麼在第一次觸發(2秒後),每隔n秒會再次觸發定時器事件。

其為一個巨集定義 do { ((ev_watcher_time *)(ev))->at = (after_); (ev)->repeat = (repeat_); } while (0) 也就是設定派生類定時器watcher的“at”為觸發事件,以及重複條件“repeat”。

1.3將定時器註冊到事件驅動器上

ev_timer_start(main_loop,&timer_w);會將定時器監控器註冊到事件驅動器上。其首先 ev_at (w) += mn_now; 得到未來的時間,這樣放到時間管理的堆“timers”中作為權重。然後通過之前說過的“ev_start”修改驅動器loop的狀態。這裡我們又看到了動態大小的陣列了。Libev的堆的記憶體管理也是通過這樣的關係的。具體這裡堆的實現,感興趣的可以仔細看下實現。這裡的操作就是將這個時間權重放到堆中合適的位置。這裡堆單元的結構為:

點選(此處)摺疊或開啟

  1. typedef struct {
  2.     ev_tstamp at;
  3.     WT w;
  4. } ANHE

 

 

其實質就是一個時刻at上掛一個放定時器watcher的list。當超時時會依次執行這些定時器watcher上的觸發回撥函式。

1.4定時器監控器的觸發

最後看下在一個事件驅動器迴圈中是如何處理定時器監控器的。這裡我們依然拋開其他的部分,只找定時器相關的看。在“/ calculate blocking time /”塊裡面,我們看到計算blocking time的時候會先:

點選(此處)摺疊或開啟

  1. if (timercnt) {
  2.     ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now;
  3.     if (waittime > to) waittime = to;
  4. }

 

如果有定時器,那麼就從定時器堆(一個最小堆)timers中取得堆頂上最小的一個時間。這樣就保證了在這個時間前可以從backend_poll中出來。出來後執行timers_reify處理將pengding的定時器。

在timers_reify中依次取最小堆的堆頂,如果其上的ANHE.at小於當前時間,表示該定時器watcher超時了,那麼將其壓入一個數組中,由於在實際執行pendings二維陣列上對應優先順序上的watcher是從尾往頭方向的,因此這裡先用一個數組依時間先後次存下到一箇中間陣列loop->rfeeds中。然後將其逆序呼叫ev_invoke_pending插入到pendings二維陣列中。這樣在執行pending事件的觸發動作的時候就可以保證,時間靠前的定時器優先執行。函式 feed_reverse和 feed_reverse_done就是將超時的定時器加入到loop->rfeeds暫存陣列以及將暫存陣列中的pending的watcher插入到pengdings陣列的操作。把pending的watcher加入到pendings陣列,後續的操作就和之前的一樣了。回依次執行相應的回撥函式。

這個過程中還判斷定時器的 w->repeat 的值,如果不為0,那麼會重置該定時器的時間,並將其壓入堆中正確的位置,這樣在指定的時間過後又會被執行。如果其為0,那麼呼叫ev_timer_stop關閉該定時器。 其首先通過clear_pending置pendings陣列中記錄的該watcher上的回撥函式為一個不執行任何動作的啞動作。

總結一下定時器就是在backend_poll之前通過定時器堆頂的超時時間,保證blocking的時間不超過最近的定時器時間,在backend_poll返回後,從定時器堆中取得超時的watcher放入到pendings二維陣列中,從而在後續處理中可以執行其上註冊的觸發動作。然後從定時器管理堆上刪除該定時器。最後呼叫和ev_start呼應的ev_stop修改驅動器loop的狀態,即loop->activecnt減少一。並將該watcher的active置零。

對於週期性的事件監控器是同樣的處理過程。只是將timers_reify換成了periodics_reify。其內部會對週期性事件監控器派生類的做類似定時器裡面是否repeat的判斷操作。判斷是否重新調整時間,或者是否重複等邏輯,這些看下程式碼比較容易理解,這裡不再贅述。·

2.分析訊號監控器

分析完了定時器的部分,再看下另一個比較常用的訊號事件的處理。Libev裡面的訊號事件和Tornado.IOLoop是一樣的,通過一個pipe的IO事件來處理。直白的說就是註冊一個雙向的pipe檔案物件,然後監控上面的讀事件,待相應的訊號到來時,就往這個pipe中寫入一個值然他的讀端的讀事件觸發,這樣就可以執行相應註冊的觸發動作回撥函數了。

我們還是從初始化-》設定觸發條件-》註冊到驅動器-》觸發過程這樣的順序介紹。

2.1訊號監控器的初始化

ev_init(&signal_w,signal_action);這個函式和上面的一樣不用說了

2.2設定訊號監控器的觸發條件

ev_signal_set(&signal_w,SIGINT);該函式設定了Libev收到SIGINT訊號是觸發註冊的觸發動作回撥函式。其操作和上面的一樣,就是設定了訊號監控器私有的(ev)->signum為標記。

2.3將訊號監控器註冊到驅動器上

這裡首先介紹一個數據結構:

點選(此處)摺疊或開啟

  1. typedef struct
  2. {
  3.   EV_ATOMIC_T pending;
  4.   EV_P;
  5.   WL head;
  6. } ANSIG;
  7. static ANSIG signals [EV_NSIG - 1]

 

EV_ATOMIC_T pending;可以認為是一個原子物件,對他的讀寫是原子的。一個表示事件驅動器的loop,以及一個watcher的連結串列。

在ev_signal_start中,通過signals陣列儲存訊號監控單元。該陣列和anfds陣列類似,只是他以訊號值為索引。這樣可以立馬找到訊號所在的位置。從 Linux 2.6.27以後,Kernel提供了signalfd來為訊號產生一個檔案描述符從而可以用檔案複用機制epoll、select等來管理訊號。Libev就是用這樣的方式來管理訊號的。 這裡的程式碼用巨集控制了。其邏輯大體是這樣的

點選(此處)摺疊或開啟

  1. #if EV_USE_SIGNALFD
  2.     res = invoke_signalfd
  3. # if EV_USE_SIGNALFD
  4. if (res is not valied)
  5. # endif
  6. {
  7.     use evpipe to instead
  8. }

 

這個是框架。其具體的實現可以參考使用signalfd和evpipe_init實現。其實質就是通過一個類似於管道的檔案描述符fd,設定對該fd的讀事件監聽,當收到訊號時通過signal註冊的回撥函式往該fd裡面寫入,使其讀事件觸發,這樣通過backend_poll返回後就可以處理ev_init為該訊號上註冊的觸發回撥函數了。

在函式evpipe_init裡面也用了一個可以學習的技巧,和上面的#if XXX if() #endif {} 一樣,處理了不支援eventfd的情況。eventfd是Kernel 2.6.22以後才支援的系統呼叫,用來建立一個事件物件實現,程序(執行緒)間的等待/通知機制。他維護了一個可以讀寫的檔案描述符,但是隻能寫入8byte的內容。但是對於我們的使用以及夠了,因為這裡主要是獲得其可讀的狀態。對於不支援eventfd的情況,則使用上面說過的,用系統的pipe呼叫產生的兩個檔案描述符分別做讀寫物件,來完成。

2.4訊號事件監控器的觸發

在上面設定訊號的pipe的IO事件是,根據使用的機制不同,其實現和觸發有點不同。對於signalfd。

點選(此處)摺疊或開啟

  1. ev_io_init (&sigfd_w, sigfdcb, sigfd, EV_READ); /* for signalfd */
  2. ev_set_priority (&sigfd_w, EV_MAXPRI);
  3. ev_io_start (EV_A_ &sigfd_w)

 

也就是註冊了sigfdcb函式。該函式:

點選(此處)摺疊或開啟

  1. ssize_t res = read (sigfd, si, sizeof (si));
  2. for (sip = si; (char *)sip < (char *)si + res; ++sip)
  3.     ev_feed_signal_event (EV_A_ sip->ssi_signo)

 

首先將pipe內容讀光,讓後續的可以pengding在該fd上。然後對該signalfd上的所有訊號弟阿勇ev_feed_signal_event吧每個訊號上的ANSIG->head上掛的watcher都用ev_feed_event加入到pendings二維陣列中。這個過程和IO的完全一樣。

而對於eventfd和pipe則是:

點選(此處)摺疊或開啟

  1. ev_init (&pipe_w, pipecb);
  2.   ev_set_priority (&pipe_w, EV_MAXPRI); 
  3.   ev_io_set (&pipe_w, evpipe [0] < 0 ? evpipe [1] : evpipe [0], EV_READ);
  4.   ev_io_start (EV_A_ &pipe_w)

 

pipe_w是驅動器自身的loop->pipe_w。併為其設定了回撥函式pipecb:

點選(此處)摺疊或開啟

  1. #if EV_USE_EVENTFD
  2.     if (evpipe [0] < 0)
  3.     {
  4.     uint64_t counter;
  5.     read (evpipe [1], &counter, sizeof (uint64_t));
  6.     }
  7.     else
  8. #endif
  9.     {
  10.         char dummy[4]; 
  11.         read (evpipe [0], &dummy, sizeof (dummy));
  12.  
  13.     }
  14. ...
  15. xxx
  16. ...
  17. for (i = EV_NSIG - 1; i--; )
  18.     if (expect_false (signals [i].pending))
  19.         ev_feed_signal_event (EV_A_ i + 1)

 

這裡將上面的技巧#if XXX if() #endif {}拓展為了#if XXX if() {} else #endif {} 。這裡和上面的操作其實是一樣的。後續操作和signalfd裡面一樣,就是讀光pipe裡面的內容,然後依次將watcher加入到pendings陣列中。


 

事件庫之Libev(五)

其他監控器

最主要的幾個監控器搞定了。其他的我覺得比較可以看的還有ev_child和ev_stat。其實和之前的三個基本原理的是一樣。

Libev中的Tips

如果將Libev當成元件去用的話。官方文件是一份很好的選擇。

如果使用Libev但又覺得它沒有提供必要的功能而要去改其程式碼。可能Libuv為我們做了一個很好的示例。Libuv之前是用Libev作為其底層事件庫。後來作者重寫了自己的一套網路庫Libuv。嚴格意義上說,Libev僅僅是一個事件模型框架,並不能算上是一個完整的網路庫,正因為如此他才提供瞭如此多的事件型別。而對於網路庫可能最重要的就是定時器、IO、以及訊號事件。當然網路還包括了socket、收發控制等內容。因此,我的感覺是可以將Libev當成一個很好的學習物件,不論是其設計思想、還是程式碼中個各種小tips、還有其對跨平臺支援的方法都是很好的示例。雖然用巨集包裹的比較嚴密,只要稍加分析,理清其思路還是比較容易的。

將Libev和之前的Redis-ae進行對比。可以發現Libev在設計思想上更完整,提供的服務也更全,但是做的檢測多了,邏輯複雜了,消耗的資源也必定比簡單的封裝更多。從這個兩個模型可以看出事件模型的框架都是:

取得一個合適的時間,用這個時間去poll。然後標記poll之後pending的檔案物件。poll出來後判斷定時器然後統一處理pending物件

這裡繪製一個整體的結構圖,不是很規範UML或者其他什麼學術的圖,只是一個幫助理解的過程:

整體結構圖