1. 程式人生 > >Nginx的定時事件的實現(timer)

Nginx的定時事件的實現(timer)

在前面的文章裡面就說到了在事件迴圈中除了要處理所有的從epoll中獲取的事件之外,還要處理一些timer事件,這篇文章就講講Nginx的timer是如何實現的。

在講Nginx的實現之前,我們可以先回顧一下linux的定時器的實現。在linux中通過每次系統定時器時鐘的中斷的中斷處理程式來設定相應的軟中斷位,該軟中斷的中斷處理程式目的就是為了掃描系統中所有掛起的定時器,如果他們已經超時的話,那麼就呼叫他們相應的函式,這樣說白了就是利用中斷來實現定時器。

而在Nginx中,timer是自己實現的,而且實現的方法完全不同,而是通過紅黑樹來維護所有的timer節點,在worker程序的每一次迴圈中都會呼叫ngx_process_events_and_timers

函式,在該函式中就會呼叫處理定時器的函式ngx_event_expire_timers,每次該函式都不斷的從紅黑樹中取出時間值最小的,檢視他們是否已經超時,然後執行他們的函式,直到取出的節點的時間沒有超時為止。其實在Nginx的執行環境中,這種實現方式可能比linux本身的實現方式更為高效。

首先我們來看Nginx的定時器的初始化,在ngx_event_process_init函式中( Ngx_event.c):

    /*初始化計時器,此處將會建立起一顆紅黑色,來維護計時器。*/    
    if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
        return NGX_ERROR;
    }

嗯,就是這個函式,那麼接下來我們來看這個函式吧,

複製程式碼

//timer的初始化
ngx_int_t
ngx_event_timer_init(ngx_log_t *log)
{
//初始化紅黑樹
    ngx_rbtree_init(&ngx_event_timer_rbtree, &ngx_event_timer_sentinel,
                    ngx_rbtree_insert_timer_value);

#if (NGX_THREADS)

    if (ngx_event_timer_mutex) {
        ngx_event_timer_mutex->log = log;
        return NGX_OK;
    }

    ngx_event_timer_mutex = ngx_mutex_init(log, 0);
    if (ngx_event_timer_mutex == NULL) {
        return NGX_ERROR;
    }

#endif
    return NGX_OK;
}

複製程式碼

其實該函式非常的簡單,說白了就是直接呼叫ngx_rbtree_init函式來初始化一顆紅黑樹而已(紅黑樹是Nginx自己實現的)。

接下來來看如何定義一個timer事件:

複製程式碼

//將一個事件加入到紅黑樹當中,它的超時未timer時間
static ngx_inline void
ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer)   //timer說白了就是一個int的值,表示超時的事件,用於表示紅黑樹節點的key
{
    ngx_msec_t      key;
    ngx_msec_int_t  diff;

    key = ngx_current_msec + timer;  //表示該event的超時時間,為當前時間的值加上超時變數

    if (ev->timer_set) {

        /*
         * Use a previous timer value if difference between it and a new
         * value is less than NGX_TIMER_LAZY_DELAY milliseconds: this allows
         * to minimize the rbtree operations for fast connections.
         */

        diff = (ngx_msec_int_t) (key - ev->timer.key);

        if (ngx_abs(diff) < NGX_TIMER_LAZY_DELAY) {
            ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                           "event timer: %d, old: %M, new: %M",
                            ngx_event_ident(ev->data), ev->timer.key, key);
            return;
        }

        ngx_del_timer(ev);
    }

    ev->timer.key = key;

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "event timer add: %d: %M:%M",
                    ngx_event_ident(ev->data), timer, ev->timer.key);

    ngx_mutex_lock(ngx_event_timer_mutex);

    ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer);    //事件的timer域插入到紅黑樹當中

    ngx_mutex_unlock(ngx_event_timer_mutex);

    ev->timer_set = 1;
}

複製程式碼

該函式用於將事件加入到紅黑樹中,首先設定超時時間,也就是當前的時間加上傳進來的超時時間。然後再將timer域加入到紅黑樹中就可以了,這裡timer域的定義說白了是一棵紅黑樹節點。然後還有一個函式ngx_event_del_timer,它用於將某個事件從紅黑樹當中移除。

接下來我們來看看Nginx到底是如何處理定時事件的。在ngx_process_events_and_timers函式中首先會有如下的程式碼

複製程式碼

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();  //找到當前紅黑樹當中的最小的事件,傳遞給epoll的wait,保證epoll可以該時間內可以超時,可以使得超時的事件得到處理
        flags = NGX_UPDATE_TIME;

#if (NGX_THREADS)

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }

複製程式碼

該段程式碼主要是呼叫ngx_event_find_timer函式獲取當前紅黑樹中超時時間最小的一個節點,該函式的定義如下:

複製程式碼

//用於返回當前紅黑樹當中的超時時間,說白了就是返回紅黑樹中最左邊的元素的超時時間
ngx_msec_t
ngx_event_find_timer(void)
{
    ngx_msec_int_t      timer;
    ngx_rbtree_node_t  *node, *root, *sentinel;

    if (ngx_event_timer_rbtree.root == &ngx_event_timer_sentinel) {
        return NGX_TIMER_INFINITE;
    }

    ngx_mutex_lock(ngx_event_timer_mutex);

    root = ngx_event_timer_rbtree.root;
    sentinel = ngx_event_timer_rbtree.sentinel;

    node = ngx_rbtree_min(root, sentinel);  //找到紅黑樹中key最小的節點

    ngx_mutex_unlock(ngx_event_timer_mutex);

    timer = (ngx_msec_int_t) (node->key - ngx_current_msec);

    return (ngx_msec_t) (timer > 0 ? timer : 0);
}

複製程式碼

該函式其實還是很簡單的,說白了就是找到紅黑樹中key的值最小的節點,然後返回它還剩下的超時時間就可以了。在ngx_process_events_and_timers函式中之所以要獲取這個值,是為了為epoll的wait提供一個超時時間,以防止epoll的wait阻塞時間太長,影響了timer的處理。接著或有如下的程式碼:

複製程式碼

    /*delta是上文對epoll wait事件的耗時統計,存在毫秒級的耗時 
        就對所有事件的timer進行檢查,如果time out就從timer rbtree中, 
        刪除到期的timer,同時呼叫相應事件的handler函式完成處理。 
      */  
    if (delta) {
        ngx_event_expire_timers();
    }

複製程式碼

在ngx_process_events_and_timers函式中呼叫ngx_event_expire_timers函式來處理所有的定時事件。嗯,這裡可以看到一個比較奇怪的現象,幹嘛要判斷delta的值呢,嗯,其實這個值是統計處理其餘事件的用時,如果用時超過了毫秒,那麼才會真正的呼叫ngx_event_expire_timers函式來處理所有的定時,否則不會,因為距離上次迴圈間隔太小,完全沒有必要。嗯,效能的提升也就是從這一點一滴中獲取的吧。接下來來看ngx_event_expire_timers函式吧:

複製程式碼

//處理紅黑樹中所有超時的事件
void
ngx_event_expire_timers(void)
{
    ngx_event_t        *ev;
    ngx_rbtree_node_t  *node, *root, *sentinel;

    sentinel = ngx_event_timer_rbtree.sentinel;

//死迴圈,找到所有的超時的timer,然後處理他們
    for ( ;; ) {

        ngx_mutex_lock(ngx_event_timer_mutex);

        root = ngx_event_timer_rbtree.root;

        if (root == sentinel) {
            return;
        }

        node = ngx_rbtree_min(root, sentinel);   //獲取key最小的節點

        /* node->key <= ngx_current_time */

        if ((ngx_msec_int_t) (node->key - ngx_current_msec) <= 0) {   //判斷該節點是否超時,如果超時的話,就執行處理函式,否則就可以跳出迴圈了
            //通過偏移來獲取當前timer所在的event
            ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));

            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                           "event timer del: %d: %M",
                           ngx_event_ident(ev->data), ev->timer.key);
//將當前timer移除
            ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);

            ngx_mutex_unlock(ngx_event_timer_mutex);

            ev->timer_set = 0;

            ev->timedout = 1;

            ev->handler(ev);   //呼叫event的handler來處理這個事件

            continue;
        }

        break;
    }

    ngx_mutex_unlock(ngx_event_timer_mutex);
}

複製程式碼

該函式其實很簡單的,一看就看明白了,說白了就是一個死迴圈,不斷的從紅黑樹中獲取key最小的元素,如果超時的話,就通過偏移量來獲取其所在的event,然後執行handler,直到找到一個沒有超時的timer為止,跳出迴圈。

嗯,到這裡timer就講完了。