1. 程式人生 > >《深入理解Nginx 模組開發與架構解析》筆記之epoll事件模組

《深入理解Nginx 模組開發與架構解析》筆記之epoll事件模組

epoll

Linux epoll

Linux epoll工作模式

  1. 水平觸發LT
  2. 邊緣觸發ET

      前者是預設的,可以處理阻塞和非阻塞套接字,後者只能處理非阻塞套接字。

Linux epoll原理



      如上圖所示,Linux中的epoll簡單來說就是這樣,用一個紅黑樹來存放所有epoll關心的事件,用一個雙向連結串列來存放當前被啟用的事件。

如何使用epoll

  • epoll_create系統呼叫
int epoll_create(int size);

      epoll_create返回一個控制代碼,之後epoll的使用都靠這個控制代碼。size是告訴大致處理的事件數目,在新的Linux核心中這個引數沒有意義。

  • epoll_ctl系統呼叫
int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event);

      向epoll物件新增、修改或刪除事件。具體由第二個引數而定。epfd是epoll_create返回的控制代碼,op引數:

op值 意義
EPOLL_CTL_ADD 新增新的事件到epoll中
EPOLL_CTL_MOD 修改epoll中的事件
EPOLL_CTL_DEL 刪除epoll中的事件

      第三個引數fd是待監測的連線套接字,第四個引數結構:

struct epoll_event{  
    __uint32_t      events;
    epoll_data_t    data;
};
events取值 意義
EPOLLIN 連線上有資料可讀
EPOLLOUT 連線上可以寫入資料
EPOLLRDHUP TCP連線的遠端關閉或半關閉
EPOLLPRI 連線上有緊急資料需要讀
EPOLLERR 連線發生錯誤
EPOLLHUP 連線被掛起
EPOLLET 觸發模式為邊緣觸發(ET)
EPOLLONESHOT 事件只處理一次,下次需要重新加入epoll

      data成員是一個epoll_data聯合體:

typedef union epoll_data{
    void        *ptr;
    int         fd;
    uint32_t    u32;
    uint64_t    u64;
} epoll_data_t;

      在ngx_epoll_module模組中只使用了ptr成員,指向ngx_connection_t。

  • epoll_wait系統呼叫
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeou);

      收集epoll監控的事件中已經發生的事件,如果沒有任何一個發生,則最多等待timeout毫秒後返回。
epfd是epoll描述控制代碼。
events是分配好的epoll_event結構體陣列,epoll會把發生的事件複製到events中。
maxevents是本次可以返回的最大事件數目。
timeout是等到的毫秒數。

Nginx_epoll_module

ngx_epoll_commands配置項陣列

static ngx_command_t  ngx_epoll_commands[] = { 
    /* 這個配置項表示呼叫一次epoll_wait最多可以返回的事件數 */
    { ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,  
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    /* 指明在開啟非同步I/O且使用io_setup系統呼叫初始化非同步I/O上下文環境時,初試分配I/O事件個數 */
    { ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,  
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};

ngx_epoll_conf_t配置項結構體

typedef struct {
    ngx_uint_t  events;
    ngx_uint_t  aio_requests;
} ngx_epoll_conf_t;

events是呼叫epoll_wait是傳入的maxevents;epoll_wait的第二個引數events陣列的大小也是有它來決定的。

ngx_epoll_module_ctx實現事件模組介面

static ngx_str_t      epoll_name = ngx_string("epoll");

ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};
ngx_epoll_init
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    /* 獲取create_conf中生成的ngx_epoll_conf_t結構體,它已經被賦予解析完
    配置檔案後的值 */
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        /* 呼叫epoll_create建立epoll物件,這個size引數無關緊要 */
        ep = epoll_create(cycle->connection_n / 2); 

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }   

#if (NGX_HAVE_FILE_AIO)
        /* 非同步I/O */
        ngx_epoll_aio_init(cycle, epcf);

#endif
    }   

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }   
        /* 分配event_list陣列,大小為配置項epoll_events的引數events */
        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    //陣列個數
    nevents = epcf->events;

    //指明讀寫I/O的函式
    ngx_io = ngx_os_io;

    //設定ngx_event_actions介面
    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    /* NGX_USR_CLEAR_EVENT巨集設定ET模式epoll */
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}
ngx_epoll_add_event

      從上面epoll實現事件介面可以看出,enable和add介面都是使用ngx_epoll_add_event函式的。

static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c; 
    struct epoll_event   ee; 

    /* 事件的data成員存放著ngx_connection_t連線 */
    c = ev->data;

    /* 根據event引數確定是讀事件還是寫事件 */
    events = (uint32_t) event;

    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }   

    /* 根據active標誌位確定事件是否活躍,以此決定是修改還是新增事件 */
    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }   

    ee.events = events | (uint32_t) flags;

    /* ptr成員儲存的是ngx_connection_t連線 */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    /* 呼叫epoll_ctl函式向epoll中新增事件或者在epoll中修改事件 */
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }   

    /* 將事件置位活躍 */
    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}
ngx_epoll_del_event

      ngx_epoll_del_event和ngx_epoll_add_event是一樣的。

static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    /*
     * when the file descriptor is closed, the epoll automatically deletes
     * it from its queue, so we do not need to delete explicitly the event
     * before the closing the file descriptor
     */

    /* 設定事件關閉 */
    if (flags & NGX_CLOSE_EVENT) {
        ev->active = 0;
        return NGX_OK;
    }
    //獲得ngx_connection_t連線
    c = ev->data;

    /* 判斷event引數的意圖 */
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
    }

    /* 判斷事件是否是活躍的 */
    if (e->active) {
        op = EPOLL_CTL_MOD;
        ee.events = prev | (uint32_t) flags;
        ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    } else {
        op = EPOLL_CTL_DEL;
        ee.events = 0;
        ee.data.ptr = NULL;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll del event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    //呼叫epol_ctl函式
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    //將事件設定為非活躍
    ev->active = 0;

    return NGX_OK;
}

ngx_epoll_process_events

      這個函式實現了epoll對事件的收集、分發。

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev, **queue;
    ngx_connection_t  *c; 

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    /* 呼叫epoll_wait來獲取當前已準備的事件 */
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    /* 是否更新時間 */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }   

    /* epoll_wait出錯 */
    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }   

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }   

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }   

    /* epoll_wait沒有獲取到事件 */
    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }   

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
                             return NGX_ERROR;
    }

    /* 上鎖,處理事件 */
    ngx_mutex_lock(ngx_posted_events_mutex);

    /* 遍歷所有準備就緒事件 */
    for (i = 0; i < events; i++) {
        /* 獲取當前事件的ngx_connection_t連線 */
        c = event_list[i].data.ptr;

        /* 這裡有個有趣的地方,ngx_connection_t的最後一位總是為0,所以Nginx
        就拿它來做instance標誌位,判斷事件是否過時 */
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

    /* 獲取讀事件 */
        rev = c->read;

        // 事件是否過時
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        /* 取出事件型別,陣列中的每一個元素都是epoll_event結構體 */
        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        if ((revents & (EPOLLERR|EPOLLHUP))
             && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        /* 是否是讀事件、是否活躍 */
        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }
#endif

            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                rev->ready = 1;
            }

            //flags中含有NGX_POST_EVENTS表示這批事件需要延後處理
            if (flags & NGX_POST_EVENTS) {
            //加入到延遲操作的佇列中
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);

                ngx_locked_post_event(rev, queue);

            } else {
                //無需延後,直接處理
                rev->handler(rev);
            }
        }

    /* 取出寫事件 */
        wev = c->write;

        if ((revents & EPOLLOUT) && wev->active) {

            // 事件是否過期
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            //是否延後
            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                    //加入到延遲操作的佇列中
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                    //直接處理
                wev->handler(wev);
            }
        }
    }

    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}

      這裡值得一提的是instance這個“用來判斷事件是否過期的標誌位”是如果工作的。
      Nginx利用了指標的最後一位一定是0(指標一定是與作業系統的位數對齊的)這一特性。所以,在event_list[i].data.ptr獲得ngx_connection_t指標的同時,將最後一位設定為這個事件instance標誌。在去除來時,進行修改即可。
      那麼事件為什麼會過期呢?比如:有3個事件等待處理,1事件中處理時,需要把3事件給關閉,如果只是把3事件的fd套接字設定為-1(原先為50),並呼叫ngx_free_connection是否有用的?答案是,依然可能失效。因為可能在處理2事件的時候呼叫ngx_get_connection又分配到了fd=50的套接字。那麼,在處理3事件時,這個事件就不是過去的那個事件了,是過期的!但是3事件依舊以為“自己還是過去那個自己”。
      於是,面對這種情況,Nginx在ngx_get_connection函式中,將從連線池中獲取的新連線instance標誌位取反。那麼,當下一次使用這個連線時就發現instance發生了變化,認定為過期事件。

ngx_epoll_module epoll模組實現

ngx_module_t  ngx_epoll_module = { 
    NGX_MODULE_V1,
    &ngx_epoll_module_ctx,               /* module context */
    ngx_epoll_commands,                  /* module directives */
    NGX_EVENT_MODULE,                    /* module type */
    NULL,                                /* init master */
    NULL,                                /* init module */
    NULL,                                /* init process */
    NULL,                                /* init thread */
    NULL,                                /* exit thread */
    NULL,                                /* exit process */
    NULL,                                /* exit master */
    NGX_MODULE_V1_PADDING
};