1. 程式人生 > >nginx事件模組之ngx_epoll_module原始碼分析

nginx事件模組之ngx_epoll_module原始碼分析

ngx_epoll_module是nginx眾多事件模組的其中一個,它利用linux的epoll模型實現nginx事件框架所定義的事件模組介面。執行在linux系統上的nginx預設使用該模組作為事件框架的底層實現。

ngx_epoll_module主要是實現了ngx_event.h中規定的事件模組的介面,即實現了ngx_event_module_t中定義的那組回撥函式,如下程式碼片段所示:

static ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name, /* epoll */
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify,                /* trigger a notify */
#else
        NULL,                            /* trigger a notify */
#endif
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

模組一共提供了兩個指令,epoll_events和worker_aio_requests,分別對應於一次epoll_wait允許處理的最大事件數和非同步請求數,它們的預設值分別是512和32。

ngx_epoll_create_conf和ngx_epoll_init_conf用於建立和初始化模組配置,這裡略過,重點關注ngx_epoll_init,ngx_epoll_add_event,ngx_epoll_del_event,ngx_epoll_process_events以及ngx_epoll_add_connection和ngx_epoll_del_connection。

ngx_epoll_init


在所有介面中,ngx_epoll_init是最先被呼叫的介面,因為它的工作是為模組的執行建立和配置環境。ngx_epoll_init介面在worker程序啟動過程中,開始提供服務前被呼叫。它只會被呼叫一次

static ngx_int_t 
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    /* 建立epoll檔案 */
    if (ep == -1) {
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

/* 事件通知環境初始化 */
#if (NGX_HAVE_EVENTFD)
        if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
            ngx_epoll_module_ctx.actions.notify = NULL;
        }
#endif

/* 檔案非同步IO初始化 */
#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, epcf);
#endif

/* 測試是否支援EPOLLRDHUP */
#if (NGX_HAVE_EPOLLRDHUP)
        ngx_epoll_test_rdhup(cycle);
#endif
    }

    /* 建立epoll事件陣列,陣列大小與配置的events一致,預設是512 */
    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;
    
    /* 設定ngx_io為ngx_os_io。
     * 其它模組進行網路io的時候呼叫的是
     * ngx_io中的介面,因此在worker程序開始工作前
     * 就必須先設定好,實際上其它事件模組也是這麼做的
     * /
    ngx_io = ngx_os_io;
    
    /* 這一步很關鍵。nginx事件框架和外部介面呼叫的是ngx_event_actions
     * 中定義的介面,雖然在ngx_event_core_module的
     * ngx_event_core_init_conf函式中已經選定了要使用的事件模組,
     * 但是並沒有將ngx_event_actions與具體的事件模組進行對映,不能
     * 正常工作,所以這裡將ngx_event_actions對映到epoll模組
     * 的介面上。通過這樣的方式,呼叫ngx_event_actions的介面實際上就是
     * 呼叫epoll模組的介面。
     * /
    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    /* epoll邊緣觸發 */
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    /* epoll水平觸發 */
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
    /* 這兩個標誌在別處會用到,所以需要設定 */
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}

ngx_epoll_add_event

ngx_epoll_add_event用於向事件框架中新增事件,核心是使用epoll_ctl實現

static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    /* 除了個別場景,大部分情況下事件的data都指向所對應的連線 */
    c = ev->data;

    events = (uint32_t) event;
    
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }

    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
    if (flags & NGX_EXCLUSIVE_EVENT) {
        events &= ~EPOLLRDHUP;
    }
#endif

    ee.events = events | (uint32_t) flags;
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}

ngx_epoll_del_event

ngx_epoll_del_event用於刪除事件,同樣也是使用epoll_ctl實現

ngx_epoll_process_events

該函式是處理事件的核心所在,在使用epoll作為IO多路複用模型的情況下,ngx_process_events_and_timers函式中呼叫的ngx_process_events實際上就是這個函式。

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev;
    ngx_queue_t       *queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    /* 如果設定了NGX_UPDATE_TIME或者設定了時間精度並且觸發了事件精度事件,
     * 就更新快取時間 */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    /* epoll_wait呼叫失敗,如果系統錯誤碼是EINTR,說明是被優先順序更高的系統調
     * 用打斷(例如接收到訊號),這種情況下不視為錯誤,其它情況下視為
     *epoll_wait失敗 */
    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    /* events為0,說明沒有事件發生。如果timer是-1,epoll_wait在沒有
     * 事件發生時不會返回,這說明在某個地方出現了問題 */
    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        /* 如果fd是-1,或者instance與連線的instance標誌不一致,
         * 說明在這個時刻,該讀事件已經是過期事件,不應該再處理。
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);

            /*
             * if the error events were returned, add EPOLLIN and EPOLLOUT
             * to handle the events at least in one active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }

            rev->available = 1;
#endif

            rev->ready = 1;
            
            /* 如果設定了NGX_POST_EVENTS,說明該事件要延後處理。
             * 如果是accept事件,就把事件放到accept_post佇列中,否則
             * 放到普通的post佇列中
            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                /* 不需要延後處理,立即呼叫handler */
                rev->handler(rev);
            }
        }

        wev = c->write;

        if ((revents & EPOLLOUT) && wev->active) {
            
        /* 如果fd是-1,或者instance與連線的instance標誌不一致,
         * 說明在這個時刻,該寫事件已經是過期事件,不應該再處理。
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
#if (NGX_THREADS)
            wev->complete = 1;
#endif
            /* 如果設定了NGX_POST_EVENTS,說明該事件要延後處理,
             * 將其放到post事件佇列中 */
            if (flags & NGX_POST_EVENTS) {
                ngx_post_event(wev, &ngx_posted_events);

            } else {
                /* 不需要延後,立即呼叫handler處理 */
                wev->handler(wev);
            }
        }
    }

    return NGX_OK;
}

ngx_epoll_add_connection

新增連線,相當於把連線相關的事件及其觸發方式(讀寫,對端關閉,邊緣觸發)一次性註冊,並且設定讀寫事件為啟用狀態,程式碼如下

static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
    struct epoll_event  ee;

    ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
    ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }

    c->read->active = 1;
    c->write->active = 1;

    return NGX_OK;
}

ngx_epoll_del_connection

刪除連線。把連線相關的事件從框架中刪除,並且把連線的讀寫事件設定為非啟用

static ngx_int_t
ngx_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags)
{
    int                 op;
    struct epoll_event  ee;

    /*
     * when the file descriptor is closed the epoll automatically deletes
     * it from its queue so we do not need to delete explicitly the event
     * before the closing the file descriptor
     */

    if (flags & NGX_CLOSE_EVENT) {
        c->read->active = 0;
        c->write->active = 0;
        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll del connection: fd:%d", c->fd);

    op = EPOLL_CTL_DEL;
    ee.events = 0;
    ee.data.ptr = NULL;

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    c->read->active = 0;
    c->write->active = 0;

    return NGX_OK;
}

相關推薦

nginx事件模組ngx_epoll_module原始碼分析

ngx_epoll_module是nginx眾多事件模組的其中一個,它利用linux的epoll模型實現nginx事件框架所定義的事件模組介面。執行在linux系統上的nginx預設使用該模組作為事件框架的底層實現。ngx_epoll_module主要是實現了ngx_even

【kubernetes/k8s原始碼分析】kubelet原始碼分析cdvisor原始碼分析

  資料流 UnsecuredDependencies -> run   1. cadvisor.New初始化 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadv

【kubernetes/k8s原始碼分析】 controller-managerreplicaset原始碼分析

ReplicaSet簡介     Kubernetes 中建議使用 ReplicaSet來取代 ReplicationController。ReplicaSet 跟 ReplicationController 沒有本質的不同, ReplicaSet 支援集合式的

【kubernetes/k8s原始碼分析】 client-go包Informer原始碼分析

Informer 簡介        Informer 是 Client-go 中的一個核心工具包。如果 Kubernetes 的某個元件,需要 List/Get Kubernetes 中的 Object(包括pod,service等等),可以直接使用

【go原始碼分析】go原始碼slice原始碼分析

Go 語言切片是對陣列的抽象。 Go 陣列的長度不可改變,與陣列相比切片的長度是不固定的,可以追加元素,在追加時可能使切片的容量增大。 len() 和 cap() 函式     切片是可索引的,並且可以由 len() 方法獲取長度。    

【go原始碼分析】go原始碼list原始碼分析

本文針對go 1.11版本,路徑src/container/list/list.go 資料結構 Element結構體 Value 前驅 後繼 // Element is an element of a linked list. type Element st

RecyclerViewSnapHelper原始碼分析

很久沒有寫Android控制元件了,正好最近專案有個自定義控制元件的需求,整理了下做個總結,主要是實現類似於抖音翻頁的效果,但是有有點不同,需要在底部漏出後面的view,這樣說可能不好理解,看下Demo,按頁滑動,後面的View有放大縮放的動畫,滑動速度過小時會有回到原位的效果,下滑也是按頁滑動的效果。

Nginx-rtmp直播業務流程分析--比較詳細

1. 綜述 1.1 直播原理 使用 obs 向 nginx 推送一個直播流,該直播流經 nginx-rtmp 的 ngx_rtmp_live_module 模組轉發給 application live 應用, 然後使用 vlc 連線 live,播放該直播流。 1.2 nginx.conf

springMVC原始碼學習addFlashAttribute原始碼分析

本文主要從falshMap初始化,存,取,消毀來進行原始碼分析,springmvc版本4.3.18。關於使用及驗證請參考另一篇https://www.cnblogs.com/pu20065226/p/10032048.html 1.初始化和呼叫,首先是入springMVC 入口webmvc包中org.spr

java集合----ArrayList原始碼分析(基於jdk1.8)

一、ArrayList 1、ArrayList是什麼: ArrayList就是動態陣列,用MSDN中的說法,就是Array的複雜版本,它提供了動態的增加和減少元素,實現了ICollection和IList介面,靈活的設定陣列的大小等好處,實現了Randomaccess介面,支援快速隨

java集合----HashMap原始碼分析(基於JDK1.7與1.8)

一、什麼是HashMap 百度百科這樣解釋: 簡而言之,HashMap儲存的是鍵值對(key和value),通過key對映到value,具有很快的訪問速度。HashMap是非執行緒安全的,也就是說在多執行緒併發環境下會出現問題(死迴圈) 二、內部實現 (1)結構 HashM

ROS Navigation Stackdwa_local_planner原始碼分析

DWA和base_local_planner的關係 在base_local_planner包中有兩個檔案叫trajectory_planner.cpp 以及對應的ros實現,其和DWA是同一層的。 由於nav_core提供了統一的介面,因此我們可以先看看統一的介面有哪些,那我們便知

go 原始碼學習---Tail 原始碼分析

已經有兩個月沒有寫部落格了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大資料相關的內容,導致最近工作較忙,部落格停止了更新,正好想撿起之前go的東西,所以找了一個原始碼學習 這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就學

Java執行緒池ThreadPoolExecutor原始碼分析

一、引言 Java併發工具包自帶了很多常用的執行緒池,程式可以將定義的Runnable、Callable任務提交到執行緒池當中執行,由執行緒池負責非同步執行其中的任務。 Java執行緒池框架結構圖: 其中,Executors是一個執行緒池靜態工廠類,可以呼叫其

【轉】Java併發程式設計筆記CopyOnWriteArrayList原始碼分析

併發包中併發List只有CopyOnWriteArrayList這一個,CopyOnWriteArrayList是一個執行緒安全的ArrayList,對其進行修改操作和元素迭代操作都是在底層建立一個拷貝陣列(快照)上進行的,也就是寫時拷貝策略。 我們首先看一下CopyOnW

詳細解析Android的View事件分發機制 附帶原始碼分析

前言 在Android中,事件分發機制是一塊很重要的知識點,掌握這個機制能幫你在平時的開發中解決掉很多的View事件衝突問題,這個問題也是面試中問的比較多的一個問題了,今天就來總結下這個知識點。 事件分發機制 事件分發原因 Android中頁面上的View是以

實戰java高併發程式設計CountDownLatch原始碼分析

首先看第一個! CountDownLatch 使用場景 CountDownLatch類是常見的併發同步控制類,適用於某一執行緒的執行在其他多個執行緒執行完成之後,比如火箭發射前需要各項指標檢查,只有當各項指標檢查完才能發射,再比如解析多個excel文件,只有當

nginx事件模組 -- 第一篇

微信公眾號:鄭爾多斯關注可瞭解更多的Nginx知識。任何問題或建議,請公眾號留言;關注公眾號,有趣有內涵的文章第一時間送達! 事件機制 下面是我們對nginx事件相關的配置,如下: 1events {2    worker_co

原始碼分析String原始碼分析

    前面已經分析過String原始碼為什麼是不可變的,同時通過我們常用的String的相關的類StringBuffer和StringBuilder,我們可以發現String類中欄位名被定義為了final型別,這樣的話將只能被賦值一次。接下來,繼續看String原始碼實現的

實戰java高併發程式設計ReentrantReadWriteLoc原始碼分析

前面分析了併發工具類CountDownLatch和CyclicBarrier,本文分享分析比較重要的ReentrantReadWriteLock。 使用場景 以前的同步方式需要對讀、寫操作進行同步,讀讀之間,讀寫之間,寫寫之間等;工程師們發現讀讀之間並不會影響資