1. 程式人生 > >Libevent原始碼分析-----訊號event的處理

Libevent原始碼分析-----訊號event的處理

訊號event的工作原理:

        前面講解了Libevent如何監聽一個IO事件,現在來講一下Libevent如何監聽訊號。Libevent對於訊號的處理是採用統一事件源的方式。簡單地說,就是把訊號也轉換成IO事件,整合到Libevent中。

        統一事件源的工作原理如下:假如使用者要監聽SIGINT訊號,那麼在實現的內部就對SIGINT這個訊號設定捕抓函式。此外,在實現的內部還要建立一條管道(pipe),並把這個管道加入到多路IO複用函式中。當SIGINT這個訊號發生後,捕抓函式將會被呼叫。而這個捕抓函式的工作就是往管道寫入一個字元(這個字元往往等於所捕抓到訊號的訊號值)。此時,這個管道就變成是可讀的了,多路IO複用函式能檢測到這個管道變成可讀的了。換言之,多路IO複用函式檢測到SIGINT訊號的發生,也就完成了對訊號的監聽工作。這個過程如下圖所示:

        

        瞭解完統一事件源的工作原理,現在來看一下Libevent具體的實現細節。按照上述的介紹,內部實現的工作有:

  1. 建立一個管道(Libevent實際上使用的是socketpair)
  2. 為這個socketpair的一個讀端建立一個event,並將之加入到多路IO複用函式的監聽之中
  3. 設定訊號捕抓函式
  4. 有訊號發生,就往socketpair寫入一個位元組

        統一事件源能夠工作的一個原因是:多路IO複用函式都是可中斷的。即處理完訊號後,會從多路IO複用函式中退出,並將errno賦值為EINTR。有些OS的某些系統呼叫,比如Linux的read,即使被訊號終端了,還是會自啟動的。即不會從read函式中退出來。

用於訊號event的結構體和變數:

        event_base為訊號監聽提供了的成員如下:

//event-internal.h檔案

struct event_base {


const struct eventop *evsigsel;

struct evsig_info sig;


...

struct event_signal_map sigmap;

...

};



//evsignal-internal.h檔案

struct evsig_info {

//用於監聽socketpair讀端的event. ev_signal_pair[1]為讀端

struct event ev_signal;

//socketpair

evutil_socket_t ev_signal_pair[2];

//用來標誌是否已經將ev_signal這個event加入到event_base中了

int ev_signal_added;

//使用者一共要監聽多少個訊號

int ev_n_signals_added;


//陣列。使用者可能已經設定過某個訊號的訊號捕抓函式。但

//Libevent還是要為這個訊號設定另外一個訊號捕抓函式,

//此時,就要儲存使用者之前設定的訊號捕抓函式。當用戶不要

//監聽這個訊號時,就能夠恢復使用者之前的捕抓函式。

//因為是有多個訊號,所以得用一個數組儲存。

#ifdef _EVENT_HAVE_SIGACTION

struct sigaction **sh_old;

#else//儲存的是捕抓函式的函式指標,又因為是陣列。所以是二級指標

ev_sighandler_t **sh_old;

#endif

/* Size of sh_old. */

int sh_old_max; //陣列的長度

};

        在上面程式碼中,已經可以看到用於socketpair的ev_signal_pair變數,還有struct event結構體變數ev_signal。那麼Libevent是在何時建立socketpair以及如何將socketpair和ev_signal相關聯的呢?

初始化:

        在前面的博文《跨平臺Reactor介面的實現》中,介紹了Libevent是如何選擇一個多路IO複用函式的。在選定一個多路IO複用函式後,就會呼叫下面一行程式碼。

base->evbase = base->evsel->init(base);

        這是初始化程式碼函式。下面給出poll的init函式。

//poll.c檔案

static void *

poll_init(struct event_base *base)

{

struct pollop *pollop;


if (!(pollop = mm_calloc(1, sizeof(struct pollop))))

return (NULL);


evsig_init(base);


return (pollop);

}

        可以看到,其呼叫了evsig_init函式。而正是這個evsig_init函式完成了建立socketpair並將socketpair的一個讀端與ev_signal相關聯。

//signal.c檔案

int

evsig_init(struct event_base *base)

{

//建立一個socketpair

if (evutil_socketpair(

AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair) == -1) {

#ifdef WIN32

/* Make this nonfatal on win32, where sometimes people

have localhost firewalled. */

event_sock_warn(-1, "%s: socketpair", __func__);

#else

event_sock_err(1, -1, "%s: socketpair", __func__);

#endif

return -1;

}


//子程序不能訪問該socketpair

evutil_make_socket_closeonexec(base->sig.ev_signal_pair[0]);

evutil_make_socket_closeonexec(base->sig.ev_signal_pair[1]);

base->sig.sh_old = NULL;

base->sig.sh_old_max = 0;



evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]);

evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);


//將ev_signal_pair[1]與ev_signal這個event相關聯。ev_signal_pair[1]為讀端

//該函式的作用等同於event_new。實際上event_new內部也是呼叫event_assign函式完成工作的

event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],

EV_READ | EV_PERSIST, evsig_cb, base);


//標明是內部使用的

base->sig.ev_signal.ev_flags |= EVLIST_INTERNAL;

//Libevent中,event是有優先順序的。前一篇博文已經說到這一點

event_priority_set(&base->sig.ev_signal, 0); //最高優先順序


base->evsigsel = &evsigops;


return 0;

}

        socketpair的兩個端都呼叫evutil_make_socket_closeonexec,因為不能讓子程序可以訪問的這個socketpair。因為子程序的訪問可能會出現擾亂。比如,子程序往socketpair傳送資訊,使得父程序的多路IO複用函式誤以為訊號發生了;父程序確實發生了訊號,也往socketpair傳送了一個位元組,但卻被子程序接收了這個位元組。父程序沒有監聽到可讀。

        在Windows中,並沒有直接的可以使用的socketpair API。此時,Libevent就自己實現了一個socketpair。具體可以參考《通用型別和函式》。

        在函式的最後可以看到event_base的一個成員evsignal被賦值。evsignal是一個IO複用結構體,而evsigops是專門用於訊號處理的IO複用結構體變數。定義如下:

//signal.c檔案

static const struct eventop evsigops = {

"signal",

NULL,

evsig_add,

evsig_del,

NULL,

NULL,

0, 0, 0

};

        該結構體只有evsig_add和evsig_del這兩個函式指標。實際在工作時有這兩個函式就足夠了。

將訊號event加入到event_base:

        前面的程式碼已經完成了“建立socketpair並將socketpair的一個讀端於ev_signal相關聯”。接下來看其他的工作。假如要對一個綁定了某個訊號的event呼叫event_add函式,那麼在event_add的內部會呼叫event_add_internal函式。而event_add_internal函式又會呼叫evmap_signal_add函式。如果看了之前的博文,應該對這個流程不陌生。下面看看evmap_signal_add函式:

//evmap.c檔案

int

evmap_signal_add(struct event_base *base, int sig, struct event *ev)

{

//注意這裡呼叫的是base的evsigsel變數。而不是evsel。

const struct eventop *evsel = base->evsigsel;

struct event_signal_map *map = &base->sigmap;


...


if (TAILQ_EMPTY(&ctx->events)) {

//實際呼叫的是evsig_add函式

if (evsel->add(base, ev->ev_fd, 0, EV_SIGNAL, NULL)

== -1)

return (-1);

}


return (1);

}

        上面函式的內部呼叫了IO複用結構體的add函式指標,即呼叫了evsig_add。現在我們深入evsig_add函式。

/signal.c檔案

static int

evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)

{

struct evsig_info *sig = &base->sig;

(void)p;


//NSIG是訊號的個數。定義在系統標頭檔案中

EVUTIL_ASSERT(evsignal >= 0 && evsignal < NSIG);


/* catch signals if they happen quickly */

//加鎖保護。但實際其鎖變數為NULL。所以並沒有保護。應該會在以後的版本有所改正

//在2.1.4-alpha版本中,就已經改進了這個問題。為鎖變數分配了鎖

EVSIGBASE_LOCK();

//如果有多個event_base,那麼捕抓訊號這個工作只能由其中一個完成。

if (evsig_base != base && evsig_base_n_signals_added) {

event_warnx("Added a signal to event base %p with signals "

"already added to event_base %p. Only one can have "

"signals at a time with the %s backend. The base with "

"the most recently added signal or the most recent "

"event_base_loop() call gets preference; do "

"not rely on this behavior in future Libevent versions.",

base, evsig_base, base->evsel->name);

}

evsig_base = base;

evsig_base_n_signals_added = ++sig->ev_n_signals_added;

evsig_base_fd = base->sig.ev_signal_pair[0]; //寫端。0是寫端,這確實與之前所接觸到的有所不同

EVSIGBASE_UNLOCK();


//設定Libevent的訊號捕抓函式

if (_evsig_set_handler(base, (int)evsignal, evsig_handler) == -1) {

goto err;

}


//event_base第一次監聽訊號事件。要新增ev_signal到event_base中

if (!sig->ev_signal_added) {

//注意,本函式的呼叫路徑為event_add->event_add_internal->evmap_signal_map->evsig_add

//所以這裡是遞迴呼叫event_add函式。而event_add函式是會加鎖的。所以需要鎖為遞迴鎖

if (event_add(&sig->ev_signal, NULL))//新增一個內部的event

goto err;

sig->ev_signal_added = 1;

}


return (0);


err:

EVSIGBASE_LOCK();

--evsig_base_n_signals_added;

--sig->ev_n_signals_added;

EVSIGBASE_UNLOCK();

return (-1);

}

        從後面的那個if語句可以得知,當sig->ev_signal_added變數為0時(即使用者第一次監聽一個訊號),就會將ev_signal這個event加入到event_base中。從前面的“統一事件源”可以得知,這個ev_signal的作用就是通知event_base,有訊號發生了。只需一個event即可完成工作,即使使用者要監聽多個不同的訊號,因為這個event已經和socketpair的讀端相關聯了。如果要監聽多個訊號,那麼就在訊號處理函式中往這個socketpair寫入不同的值即可。event_base能監聽到可讀,並可以從讀到的內容可以判斷是哪個訊號發生了。

       從程式碼中也可得知,Libevent並不會為每一個訊號監聽建立一個event。它只會建立一個全域性的專門用於監聽訊號的event。這個也是“統一事件源”的工作原理。

設定訊號捕抓函式:

       evsig_add函式還呼叫了_evsig_set_handler函式完成設定Libevent內部的訊號捕抓函式。

//signal.c檔案

typedef void (*ev_sighandler_t)(int);


//evsignal是訊號值,handler是訊號捕抓函式

int

_evsig_set_handler(struct event_base *base,

int evsignal, void (__cdecl *handler)(int))

{

//如果有sigaction就優先使用之

#ifdef _EVENT_HAVE_SIGACTION

struct sigaction sa;

#else

ev_sighandler_t sh;

#endif

struct evsig_info *sig = &base->sig;

void *p;



//陣列的一個元素就存放一個訊號。訊號值等於其下標

if (evsignal >= sig->sh_old_max) { //不夠記憶體。重新分配

int new_max = evsignal + 1;

event_debug(("%s: evsignal (%d) >= sh_old_max (%d), resizing",

__func__, evsignal, sig->sh_old_max));

p = mm_realloc(sig->sh_old, new_max * sizeof(*sig->sh_old));

if (p == NULL) {

event_warn("realloc");

return (-1);

}


memset((char *)p + sig->sh_old_max * sizeof(*sig->sh_old),

0, (new_max - sig->sh_old_max) * sizeof(*sig->sh_old));


sig->sh_old_max = new_max;

sig->sh_old = p;

}


//注意sh_old是一個二級指標。元素是一個一級指標。為這個一級指標分配記憶體

/* allocate space for previous handler out of dynamic array */

sig->sh_old[evsignal] = mm_malloc(sizeof *sig->sh_old[evsignal]);

if (sig->sh_old[evsignal] == NULL) {

event_warn("malloc");

return (-1);

}


/* save previous handler and setup new handler */

#ifdef _EVENT_HAVE_SIGACTION

memset(&sa, 0, sizeof(sa));

sa.sa_handler = handler;

sa.sa_flags |= SA_RESTART;

sigfillset(&sa.sa_mask);


//設定訊號處理函式

if (sigaction(evsignal, &sa, sig->sh_old[evsignal]) == -1) {

event_warn("sigaction");

mm_free(sig->sh_old[evsignal]);

sig->sh_old[evsignal] = NULL;

return (-1);

}

#else

//設定訊號處理函式

if ((sh = signal(evsignal, handler)) == SIG_ERR) {

event_warn("signal");

mm_free(sig->sh_old[evsignal]);

sig->sh_old[evsignal] = NULL;

return (-1);

}

//儲存之前的訊號捕抓函式。當用戶event_del這個訊號監聽後,就可以恢復了

*sig->sh_old[evsignal] = sh;

#endif


return (0);

}

        如果看過《UNIX環境高階程式設計》訊號那章的話,上面這段程式碼很容易看懂。這裡就不講了。

        這裡我們做一個猜測:當我們對某個訊號進行event_new和event_add後,就不應該再次設定該訊號的訊號捕抓函式。否則event_base將無法監聽到訊號的發生。下面程式碼驗證這猜測。

#include<unistd.h>

#include<stdio.h>

#include<signal.h>

#include<event.h>



void sig_cb(int fd, short events, void *arg)

{

printf("in the sig_cb\n");

}


void signal_handle(int sig)

{

printf("catch the sig %d\n", sig);

}


int main()

{


struct event_base *base = event_base_new();


struct event *ev = evsignal_new(base, SIGUSR1, sig_cb, NULL);

event_add(ev, NULL);


signal(SIGUSR1, signal_handle);


printf("pid = %d\n", getpid());


printf("begin\n");

event_base_dispatch(base);

printf("end\n");


return 0;

}

        執行上面程式碼, 通過在外部給這個程序發生訊號的方式。可以看到,event_base確實無法監聽到訊號了。所有訊號都被signal_handle捕抓了。

捕抓訊號:

        前面的程式碼中有兩個函式並沒有講,分別是訊號捕抓函式evsig_handler和呼叫event_assign時的訊號回撥函式evsig_cb。

//signal.c檔案

static void __cdecl

evsig_handler(int sig)

{

...

ev_uint8_t msg;


if (evsig_base == NULL) {

event_warnx(

"%s: received signal %d, but have no base configured",

__func__, sig);

return;

}


#ifndef _EVENT_HAVE_SIGACTION

//這主要是為了應對舊時代的訊號不可靠

//現在的OS並不會出現這個問題

signal(sig, evsig_handler);

#endif


/* Wake up our notification mechanism */

msg = sig;

send(evsig_base_fd, (char*)&msg, 1, 0); //向socketpair寫入一個位元組


...

}

        從evsig_handler函式的實現可以看到,實現得相當簡單。只是將訊號對應的值寫入到socketpair中。evsig_base_fd是socketpair的寫端,這是一個全域性變數,在evsig_add函式中被賦值的。

        從“統一事件源”的工作原理來看,現在已經完成了對訊號的捕抓,已經將該訊號的當作IO事件寫入到socketpair中了。現在event_base應該已經監聽到socketpair可讀了,並且會為呼叫回撥函式evsig_cb了。下面看看evsig_cb函式。

//signal.c檔案

static void

evsig_cb(evutil_socket_t fd, short what, void *arg)

{

static char signals[1024];

ev_ssize_t n;

int i;


//NSIG是訊號的個數

int ncaught[NSIG];

struct event_base *base;


base = arg;


memset(&ncaught, 0, sizeof(ncaught));


while (1) {

//讀取socketpair中的資料。從中可以知道有哪些訊號發生了

//已經socketpair的讀端已經設定為非阻塞的。所以不會被阻塞在

//recv函式中。這個迴圈要把socketpair的所有資料都讀取出來

n = recv(fd, signals, sizeof(signals), 0);

if (n == -1) {

int err = evutil_socket_geterror(fd);

if (! EVUTIL_ERR_RW_RETRIABLE(err))

event_sock_err(1, fd, "%s: recv", __func__);

break;

} else if (n == 0) {

/* XXX warn? */

break;

}


//遍歷資料陣列,把每一個位元組當作一個訊號

for (i = 0; i < n; ++i) {

ev_uint8_t sig = signals[i];

if (sig < NSIG)

ncaught[sig]++; //該訊號發生的次數

}

}


EVBASE_ACQUIRE_LOCK(base, th_base_lock);

for (i = 0; i < NSIG; ++i) {

if (ncaught[i]) //有訊號發生就為之呼叫evmap_signal_active

evmap_signal_active(base, i, ncaught[i]);

}

EVBASE_RELEASE_LOCK(base, th_base_lock);

}

        該回調函式的作用是讀取socketpair的所有資料,並將資料當作訊號,再根據訊號值呼叫evmap_signal_active。

        有一點要注意,evsig_cb這個回撥函式並不是使用者為監聽一個訊號呼叫event_new時設定的使用者回撥函式,而是Libevent內部為了處理訊號而設定的內部回撥函式。累!!

啟用訊號event:

        雖然如此,但是現在的情況是:當有訊號發生時,就會呼叫evmap_signal_active函式。

//event-internal.h檔案

#define ev_signal_next _ev.ev_signal.ev_signal_next


#define ev_ncalls _ev.ev_signal.ev_ncalls

#define ev_pncalls _ev.ev_signal.ev_pncalls


//evmap.c檔案

void //後兩個引數分別是訊號值和發生的次數。

evmap_signal_active(struct event_base *base, evutil_socket_t sig, int ncalls)

{

struct event_signal_map *map = &base->sigmap;

struct evmap_signal *ctx;

struct event *ev;


//通過這個fd找到對應的TAILQ_HEAD

GET_SIGNAL_SLOT(ctx, map, sig, evmap_signal);


//遍歷該fd的佇列

TAILQ_FOREACH(ev, &ctx->events, ev_signal_next)

event_active_nolock(ev, EV_SIGNAL, ncalls);

}



//event.c檔案

void

event_active_nolock(struct event *ev, int res, short ncalls)

{

struct event_base *base;



base = ev->ev_base;

ev->ev_res = res;


//這將停止處理低優先順序的event。一路回退到event_base_loop中。

if (ev->ev_pri < base->event_running_priority)

base->event_continue = 1;


if (ev->ev_events & EV_SIGNAL) {

#ifndef _EVENT_DISABLE_THREAD_SUPPORT

if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {

++base->current_event_waiters;

//由於此時是主執行緒執行,所以並不會進行這個判斷裡面

EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);

}

#endif

ev->ev_ncalls = ncalls;

ev->ev_pncalls = NULL;

}


//插入到啟用佇列中.插入到隊尾

event_queue_insert(base, ev, EVLIST_ACTIVE);


}

        通過evmap_signal_active、event_active_nolock和event_queue_insert這三個函式的呼叫後,就可以把一個event插入到啟用隊列了。

        由於這些函式的執行本身就是在Libevent處理event的回撥函式之中的(Libevent正在處理內部的訊號處理event)。所以並不需要從event_base_loop裡的while迴圈裡面再次執行一次evsel->dispatch(),才能執行到這次訊號event。即無需等到下一次處理啟用佇列,就可以執行該訊號event了。分析如下:

        首先要明確,現在執行上面三個函式相當於在執行event的回撥函式。所以其是執行在event_process_active函式之中的。為什麼是在這裡,可以參考《Libevent工作流程探究》一文。

//event.c檔案

static int

event_process_active(struct event_base *base)

{

struct event_list *activeq = NULL;

int i, c = 0;


//從高優先順序到低優先順序遍歷優先順序陣列

for (i = 0; i < base->nactivequeues; ++i) {

//對於特定的優先順序,遍歷該優先順序的所有啟用event

if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {

base->event_running_priority = i;

activeq = &base->activequeues[i];

c = event_process_active_single_queue(base, activeq);

}

}


return c;

}


static int

event_process_active_single_queue(struct event_base *base,

struct event_list *activeq)

{

struct event *ev;


//遍歷該優先順序的所有event,並處理之

for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {


...//開始處理這個event。會呼叫event的回撥函式

}


}

        從上面的程式碼可以看到,Libevent在處理內部的那個訊號處理event的回撥函式時,其實是在event_process_active_single_queue的一個迴圈裡面。因為Libevent內部的訊號處理event的優先順序最高優先順序,並且在前面的將使用者訊號event插入到佇列(即event_queue_insert),在插入到佇列的尾部。所以無論使用者的這個訊號event的優先順序是多少,都是在Libevent的內部訊號處理event的後面。所以在遍歷上面兩個函式的裡外兩個迴圈時,肯定會執行到使用者的訊號event。

執行已啟用訊號event:

        現在看看Libevent是怎麼處理已啟用的訊號event的。

//event.c檔案

static inline void

event_signal_closure(struct event_base *base, struct event *ev)

{

short ncalls;

int should_break;


/* Allows deletes to work */

ncalls = ev->ev_ncalls;

if (ncalls != 0)

ev->ev_pncalls = &ncalls;


//while迴圈裡面會呼叫使用者設定的回撥函式。該回調函式可能會執行很久

//所以要解鎖先.

EVBASE_RELEASE_LOCK(base, th_base_lock);

//如果該訊號發生了多次,那麼就需要多次執行回撥函式

while (ncalls) {

ncalls--;

ev->ev_ncalls = ncalls;

if (ncalls == 0)

ev->ev_pncalls = NULL;

(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);


EVBASE_ACQUIRE_LOCK(base, th_base_lock);

//其他執行緒呼叫event_base_loopbreak函式中斷之

should_break = base->event_break;

EVBASE_RELEASE_LOCK(base, th_base_lock);


if (should_break) {

if (ncalls != 0)

ev->ev_pncalls = NULL;

return;

}

}

}

        可以看到,如果對應的訊號發生了多次,那麼該訊號event的回撥函式將被執行多次。