1. 程式人生 > 實用技巧 >I/O多路複用 select 原始碼解析

I/O多路複用 select 原始碼解析

  1. I/O多路複用介紹
    在介紹I/O多路複用之前,我們先看看一個最基本的socket client/server端是如何接收處理資料的:

client: socket -> bind -> connect -> read/write -> close

server: socket -> bind -> listen -> accept -> read/write -> close

對於server端,這種方式邏輯簡單實現容易,但是不足之處也比較明顯,那就是隻支援一個client的連線和資料收發。那如何支援多個client呢?很容易想到使用多執行緒,即每當accept一個新連線的時候,就建立一個新的執行緒,在新的執行緒裡處理該連線對應socket的資料收發。但是多執行緒也有自己的問題,比如當client非常多時,系統需要建立和維護大量的執行緒,這對系統資源消耗比較大,同時對共享資源的訪問需要各種加鎖解鎖等操作,影響系統效率。

那有沒有一種方法,只用一個執行緒就能支援多個client的資料收發互動呢?答案是肯定的,那就是I/O多路複用。I/O多路複用通過一種機制,使單執行緒可以監視多個檔案描述符,一旦有一個或多個fd有事件發生就通知應用程式,然後進行相應fd的讀寫等操作。與多執行緒相比,I/O 多路複用的最大優勢是系統開銷小,不需要建立和管理大量的執行緒,提高系統效率。當前常見的多路複用機制有select, poll, epoll等,各個機制都有自己特點。本篇介紹select的實現機制,通過對原始碼的解析,瞭解其實現機制,進而理解其優缺點,如為何select預設最大監聽fd數量為1024個,為何每次呼叫select前需要重新註冊fd監聽事件和設定超時引數?帶著這些疑問,我們開始下面的講解。

  1. 原始碼
    a. 下載地址
    https://www.kernel.org/

該網頁上可以下載最新版本的linux kernel程式碼,本文以5.2.13為例講解

b. 函式呼叫關係
壓縮包解壓後,找到fs/select.c檔案,該檔案就是select函式和poll函式的程式碼實現,本篇只介紹select函式相關的實現

select函式入口:

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
其主要函式呼叫關係:select -> kern_select -> core_sys_select -> do_select

  1. fd_set
    a. 型別解析
    在開始select解析之前,首先介紹一個非常重要的資料結構fd_set,其定義在檔案 include/linux/types.h:

typedef __kernel_fd_set fd_set;
我們轉到源結構__kernel_fd_set,其定義在檔案 include/uapi/linux/posix_types.h:

undef __FD_SETSIZE

define __FD_SETSIZE 1024

typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

...

endif /* _LINUX_POSIX_TYPES_H */

可以看到,fd_set本質是一個unsigned long型的陣列,其陣列長度與__FD_SETSIZE有關。這個巨集定義有什麼作用呢,這個值1024與select預設最大監聽fd數量1024個有沒有關係呢?

我們知道,linux系統中一切皆是檔案,如鍵盤,螢幕,串列埠,socket等等,每開啟一個裝置系統都會分配一個fd來指示該檔案,其值從0開始。如程序啟動的時候系統預設開啟三個檔案(0:stdin, 1:stdout, 2:stderr),因此我們可以直接從鍵盤讀取輸入資料或直接輸出資料到螢幕,而不需要手動呼叫open()函式。當我們繼續開啟其他檔案時,系統會為該檔案分配新的fd,新的fd是該程序的檔案描述符表中值最小的可用檔案描述符,即每次分配的fd值自增++。Linux 中一個程序預設最多能開啟1024(NR_OPEN_DEFAULT)個檔案,因此預設fd取值範圍為0-1023。

select與fd又有什麼關係呢?我們可以想到select函式定會有一個輸入引數(後面可以看到有三個相關引數),用來儲存需要監聽的fd,那麼這個引數是什麼型別呢,如何能儲存需要監聽的多個fd呢?答案就是用1024個bit位來儲存。由於fd範圍是0-1023,我們可以以fd為索引,當需要監聽該fd時,將對應索引的bit位置1,當不需要監聽該fd時,將對應索引的bit位置0,這樣就可以儲存所有設定的監聽描述符了。如果1024個bit位全部置1,表示最多可同時監聽1024個fd,這就是為什麼select預設最多隻能監聽1024個fd的原因了。那1024個bit以什麼方式存在呢?為了後續處理效率,系統選用unsigned long型陣列為載體來儲存,即unsigned long fds[1024 / (8 * sizeof(long))]。如果我們將1024這個值用巨集定義__FD_SETSIZE來指示,即為unsigned long fds[__FD_SETSIZE / (8 * sizeof(long))],是不是有些眼熟,這不就是最開始__kernel_fd_set/fd_set的定義嘛!由此我們知道,select就是使用unsigned long陣列來儲存監聽的fd,陣列中的每個unsigned long以及unsigned long 中的每個bit,都表示一個fd是否被監聽。假設一個long 4個位元組,則fd_set圖示如下:

上述過程我們發現有兩個巨集定義,__FD_SETSIZE 和 NR_OPEN_DEFAULT,那麼這兩個值與select有什麼關係呢?NR_OPEN_DEFAULT 表示一個程序最多可以開啟的檔案個數,即約束了fd的取值為 0 - NR_OPEN_DEFAULT-1,而 __FD_SETSIZE 表示 select 用於儲存fd所用的bit位個數。如果NR_OPEN_DEFAULT > __FD_SETSIZE, 則select可能無法監聽所有開啟的檔案,即fd範圍在 __FD_SETSIZE - NR_OPEN_DEFAULT之間的檔案無法監聽。如果NR_OPEN_DEFAULT < __FD_SETSIZE, 則select可以監聽所有的檔案,只是會有多餘無用的bit位。兩個巨集定義預設值都為1024,如果修改需要重新編譯系統,並且最好以sizeof(long)為單位進行修改,後面可以看到如果取值不合適,會影響select的執行效率。

b.FD_ZERO、FD_SET、FD_CLR、FD_ISSET
瞭解了fd_set的結構和實現之後,再來看看跟它相關的幾個常見的函式,主要就是對特定的bit位進行置1,置0操作,以及查詢某個bit位當前是置1還是置0等:

void FD_ZERO(fd_set *fdset); // fd_set中的所有bit位置0
void FD_SET(int fd, fd_set *fdset); // fd_set中以檔案描述符fd為索引的bit位置1
void FD_CLR(int fd, fd_set *fdset); // fd_set中以檔案描述符fd為索引的bit位置0
int FD_ISSET(int fd, fd_set *fdset); // 查詢fd_set中以fd值為索引的bit位是否置位,用於判斷fd是否發生了事件

  1. select 函式原型
    瞭解了fd_set的實現之後,我們就可以來看看select函式的原型聲明瞭。其函式宣告如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
a.nfds
後面我們會分析到,select內部是通過迴圈判斷fd_set中所有置1的bit位所對應的fd是否有事件發生,當最高置1的bit位判斷完成後,迴圈就可以提前結束了,因為再往高位所有的bit位都置0了,也就是那些fd並沒有被監聽,因此可以提前退出迴圈提高效率。如下圖所示:

nfds引數即表示查詢完所有置1的bit位所需要的迴圈次數,由於索引值就是fd的值,因此該引數取值通常為所有的監聽描述符中最大值+1。當然直接取1024也沒有問題,只是select內部可能會多做些無用的迴圈判斷,稍稍影響些效率。

b.readfd/writefds/exceptfds
三個監聽檔案描述符集合,可分別設定讀/寫/異常事件的監聽描述符集合,如果所有fd均不需要監聽某個時間則該集合可以設為NULL

c.timeout
超時時間,通過s和us兩個欄位設定,它表示如果所有的fd都沒有任何事件發生的時候,函式返回之前需要等待的時間。引數有三種取值對應不同的處理機制:

1. NULL, 表示永遠等待下去,直到有一個或多個fd發生了事件函式才會返回,返回值為所有fd發生的所有事件的個數之和

2. 具體等待時間,即s和us至少一個不為0,表示當所有的fd都沒有事件發生的時候,函式返回前需要等待的時間。在等待期間如果任何一個fd發生任何一個事件,則函式返回,返回值為所有fd發生的所有事件的個數之和;如果時間到了仍沒有fd發生任何事件,則函式返回0

3. 不等待,s和us均為0,即非阻塞方式,表示檢查完所有監聽的fd,不管有沒有事件發生函式均立即返回。如果有事件發生,則返回值為所有fd發生的所有事件的個數之和;如果沒有任何fd發生任何事件,則函式返回0

d.返回值
上面已經提到過了,返回值表示所有fd發生的所有事件個數之和。由於返回值僅僅是一個int值,因此select只能告訴我們總共有多少個事件發生了,但不能告訴我們是哪個fd發生了事件(可能是一個,多個,甚至全部),如果同一個fd發生了多個事件,也不能告訴我們具體發生的是哪些事件。所以當select返回後,使用者只能通過FD_ISSET函式迴圈判斷每個fd是否有事件發生,如果有事件發生再判斷該fd發生的是可讀、可寫還是異常事件,然後再進行相應的讀寫操作。所以select具有O(n)的輪詢複雜度,監聽的fd越多迴圈時間就越長。後面當分析epoll的實現的時候,我們會發現epoll只返回發生事件的fd資訊,使用者可以直接進行讀寫操作,大大提高了效率

  1. core_sys_select 解析
    int core_sys_select(int n, fd_set *inp, fd_set *outp, fd_set exp, struct timespec64 end_time)
    {
    ...
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; // SELECT_STACK_ALLOC = 256
    size = FDS_BYTES(n); // n個fd,n個bit,使用long型陣列儲存,需要陣列長度 size = (n+8
    sizeof(long)-1)/8
    (sizeof(long))
    bits = stack_fds; // 核心預分配的long型陣列空間,共256個位元組,陣列長度 256/4 = 64

    fds.in = bits;
    fds.out = bits + size;
    fds.ex = bits + 2size;
    fds.res_in = bits + 3
    size;
    fds.res_out = bits + 4size;
    fds.res_ex = bits + 5
    size; // 6個long型陣列指標,分別用於儲存輸入和輸出的in/out/ex描述符集合

    get_fd_set(n, inp, fds.in);
    get_fd_set(n, outp, fds.out);
    get_fd_set(n, exp, fds.ex); // 使用者監聽的三組fd分別拷貝到核心態,每組拷貝size個long型長度

    zero_fd_set(n, fds.res_in); // 清空用於儲存結果的三組long型陣列,每組清空size個long型長度
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    ret = do_select(n, &fds, end_time); // 迴圈查詢所有fd的驅動poll函式,檢查是否有可讀可寫事件,有則返回,無則進入睡眠等待超時或等待事件發生

    if (ret <= 0)
    return ret; // 返回0表示沒有任何fd有事件發生,<0表示發生錯誤

    set_fd_set(n, inp, fds.res_in);
    set_fd_set(n, outp, fds.res_out);
    set_fd_set(n, exp, fds.res_ex); // 拷貝儲存結果的三組long型陣列資料到使用者態

    return ret; // 返回所有fd發生的所有事件個數之和
    }
    a.FDS_BYTES(n)
    我們先來看巨集定義FDS_BYTES(n),經過多層巨集定義呼叫,最終為:

define FDS_BYTES(n) = (n+8sizeof(long)-1)/8(sizeof(long))
即n個bit位由long型陣列來儲存所需要的陣列長度。我們以long為4個位元組來說明,那麼1-32個bit位只需要長度為1的long型陣列,如果有33個bit位,則就需要長度為2的long型陣列了。因此如前所述,當我們在給select傳入第一個引數nfds的時候,該引數取值最好為所有監聽fd中最大值+1,讓系統分配能存下nfds個bit位的最小long型陣列長度,如果直接傳入1024,這裡就會多分配無用的陣列長度,後續也會多做無用的迴圈判斷

b.stack_fds
stack_fds就是系統預先分配的long型陣列空間,用於儲存所有需要監聽的fd,以及用於儲存查詢結果,返回讀,寫,異常事件的查詢結果。即long型陣列空間所需長度為6FDS_BYTES(n),如果nfds取值最大1024,即程序所有開啟的1024個fd全部進行可讀可寫和異常監聽,那麼此時FDS_BYTES(n) = (1024+8sizeof(long)-1)/8(sizeof(long)) = 32,即儲存監聽可讀事件的fd需要長度為32的long型陣列,加上監聽可寫和異常事件的fd,以及返回結果的儲存,總共需要長度為 326 = 192的long型陣列,而long型陣列stack_fds預設長度為 256/4 = 64,因此當這個預設長度儲存不下時,就需要對陣列長度進行擴充套件,擴充套件為6*FDS_BYTES(n)。原始碼如下:

...
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6)
{
/* Not enough space in on-stack array; must use kmalloc */
...
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
}
c. 函式流程圖
當long型陣列stack_fds空間足夠了之後,通過6個間距為FDS_BYTES(n)的long型指標,標記6段長度為FDS_BYTES(n)的long型陣列空間,用來儲存所有輸入和輸出的fd。函式處理流程如下圖:

通過get_fd_set函式將需要監聽可讀可寫和異常事件的fd拷貝到三個輸入long型陣列空間fds->in/out/ex,通過zero_fd_set先將三個輸出long型陣列空間fds->res_in/out/ex清空,用於儲存查詢結果
通過do_select函式迴圈查詢所有fd發生的事件,如果返回值<0表示內部發生異常,如果=0表示沒有事件發生,如果>0表示共發生了多少個事件

將查詢結果賦值到三個輸出long型陣列空間fds->res_in/out/ex

通過set_fd_set將查詢結果從輸出long型陣列空間fds->res_in/out/ex拷貝回用戶輸入的fd_set,然後返回發生的事件個數,並作為select函式的最終返回值

由第4步可知,select返回監聽結果是複用輸入的fd_set的,因此當呼叫完一次select函式後,需要重新置位需要監聽的fd。否則假設我們要監聽5個fd,結果第一次select返回發現其中只有2個fd有事件發生,這時另外3個fd標誌位是被清零了,如果下次呼叫select之前不重新置位,那麼這3個fd就不會再進入查詢迴圈中,即便有事件發生,select函式再也不會返回這些fd的結果了。因此在呼叫完select之後,一定要將需要監聽的fd重新置位,具體可參考後面的示例程式

  1. do_select 解析
    通過core_sys_select函式的解析,我們發現真正完成查詢事件發生結果的是do_select函式。該函式首先獲取三個輸入long型陣列,獲取需要查詢的fd,查詢完成後,再將結果寫入到三個輸出long型陣列中。下面我們通過5種場景,分別分析各個場景的處理機制:

  2. 有fd收到資料

  3. 沒有fd收到資料,超時時間設為0,即不等待

  4. 沒有fd收到資料,超時時間設為NULL,即永遠等待直到有fd收到資料

  5. 沒有fd收到資料,超時時間設為具體值,定時結束之前有fd收到資料

  6. 沒有fd收到資料,超時時間設為具體值,定時結束之前沒有fd收到資料

static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
...
int timed_out = 0;
u64 slack = 0;

// 註冊_qproc函式,用於查詢事件發生結果時將當前程序加入讀寫等待佇列
poll_initwait(&table);

// 我們有如下5種場景,下面分析一下分別會有什麼樣的處理
// 1. 有fd收到資料
// 2. 沒有fd收到資料,超時時間設為0,即不等待
// 3. 沒有fd收到資料,超時時間設為NULL,即永遠等待直到有fd收到資料
// 4. 沒有fd收到資料,超時時間設為具體值,定時結束之前有fd收到資料
// 5. 沒有fd收到資料,超時時間設為具體值,定時結束之前沒有fd收到資料

// 對於場景2,超時時間設為0,設定timed_out = 1,該標誌位用於場景2退出for (;;)迴圈
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) 
    timed_out = 1;

// 對於場景4和5,超時時間設為具體值,則將超時時間結構體轉為U64整數,便於後續定時器處理
if (end_time && !timed_out) 
    slack = select_estimate_accuracy(end_time);

retval = 0; // 統計所有fd發生的所有事件個數之和,也是select函式的返回值
for (;;)    // 不同場景有不同的機制來退出該死迴圈
{
    // 6個long型指標inp/outp/exp/rinp/routp/rexp,分別指向stack_fds中用於輸入和輸出的in/out/ex檔案描述符集合
    inp   = fds->in;
    outp  = fds->out;
    exp   = fds->ex;
    rinp  = fds->res_in;
    routp = fds->res_out;
    rexp  = fds->res_ex;

    // 迴圈遍歷所有的n個bit位
    for (i = 0; i < n; ++rinp, ++routp, ++rexp)
    {
        unsigned long in, out, ex, all_bits, bit = 1;
        unsigned long res_in = 0, res_out = 0, res_ex = 0;

        // n個bit位以long型為單位進行多組(即32個為一組)迴圈遍歷
        in = *inp++; out = *outp++; ex = *exp++;
        all_bits = in | out | ex;
        if (all_bits == 0) { // 該組的32(BITS_PER_LONG)個fd都不需要監聽in/out/ex事件,則調過該組迴圈
            i += BITS_PER_LONG;
            continue;
        }

        // 迴圈遍歷一個組的32個bit位
        for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1)
        {
            // 當遍歷的是最後一組時,該組fd個數可能不到32個了,則遍歷完最後一個fd就可以提前退出該組迴圈
            if (i >= n)
                break;

            // 該bit位對應的fd沒有註冊監聽in/out/ex事件,則跳過該bit位迴圈
            if (!(bit & all_bits))
                continue;

            f = fdget(i); // 通過fd值獲取file結構

            // vfs_poll呼叫驅動程式 file->f_op->poll(file, pt),查詢該fd發生的事件,驅動函式poll功能:
            // 1. 返回該fd發生的in/out/ex事件,用mask標識發生的事件
            // 2. 將當前程序加入到該fd的讀寫等待佇列,當程序喚醒後再從等待佇列中移除該程序相關的節點
            mask = vfs_poll(f.file, wait);

            // 當fd發生了in/out/ex事件,且使用者設定監聽該事件,則儲存查詢結果:
            // 1. 儲存發生事件的fd值
            // 2. retval++,即所有fd發生的所有事件個數之和,即如果同一個fd發生2個事件,統計結果是+2而不是+1,這就是select返回值的意義
            if ((mask & POLLIN_SET) && (in & bit)) {
                res_in |= bit;
                retval++;
                wait->_qproc = NULL;
            }
            if ((mask & POLLOUT_SET) && (out & bit)) {
                res_out |= bit;
                retval++;
                wait->_qproc = NULL;
            }
            if ((mask & POLLEX_SET) && (ex & bit)) {
                res_ex |= bit;
                retval++;
                wait->_qproc = NULL;
            }
        }

        // 迴圈遍歷完該組的32個fd後,將發生事件的fd分別寫到用於儲存in/out/ex結果的三組long型陣列中
        if (res_in)
            *rinp = res_in;
        if (res_out)
            *routp = res_out;
        if (res_ex)
            *rexp = res_ex;
    }

    // 場景1,有事件發生,retval不為0,退出for (;;)迴圈,函式返回事件發生個數之和
    // 場景2,沒有事件發生,retval為0,但因為timed_out已被設為1,因此退出for (;;)迴圈,函式返回事件發生個數之和,即為0
    if (retval || timed_out || signal_pending(current)) 
        break;

    // 場景3,4,5,沒有事件發生,則需等待
    if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))
    {
            // 設定當前程序為睡眠狀態
            set_current_state(TASK_INTERRUPTIBLE);

            // 場景3,沒有事件發生,永遠等待直到有任何fd收到資料
            if (!expires)
                // 該程序移除執行佇列,同時主動釋放CPU控制權讓排程器排程執行另一個程序
                // 該程序將不再被排程,直到有fd收到資料時其驅動程式將該程序喚醒
                schedule(); 
                return -EINTR;

            // 場景4和5,沒有事件發生,開啟定時器等待特定時間
            // 設定定時時間
            hrtimer_set_expires_range_ns(&t.timer, *expires, delta);

            // 初始化定時器
            hrtimer_init_sleeper(&t, current);
                t->timer.function = hrtimer_wakeup; // 設定超時回撥函式hrtimer_wakeup
                t->task = current; // 場景4,設定t->task指向當前程序,用於poll_schedule_timeout判斷程序喚醒是因為有fd收到資料
                    // 如果直到定時器超時都沒有收到資料,則觸發超時回撥函式
                    static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
                        t->task = NULL; // 場景5,設定t->task為空,用於poll_schedule_timeout判斷程序喚醒是因為超時
                        wake_up_process(task); // 喚醒當前程序

            // 啟動定時器
            hrtimer_start_expires(&t.timer, mode);

            // 該程序移除執行佇列,同時主動釋放CPU控制權讓排程器排程執行另一個程序
            // 該程序將不再被排程,直到有fd收到資料或者定時器超時將該程序喚醒
            schedule();

            set_current_state(TASK_RUNNING); // 程序被驅動程式或超時回撥函式喚醒,退出睡眠狀態

            // 根據t->task值判斷程序喚醒是因為超時還是因為有fd收到資料
            // 場景4,超時前有fd收到資料,不觸發回撥函式,t->task沒有被置空,poll_schedule_timeout返回-EINTR
            // 場景5,一直沒有fd收到資料,觸發回撥函式,t->task被置空,poll_schedule_timeout返回0
            return !t.task ? 0 : -EINTR; 

        // 根據poll_schedule_timeout返回值判斷程序喚醒是否因為超時,如果是則設定timed_out = 1
        timed_out = 1;
    }

    /*
    至此,我們完成了第一次for (;;)迴圈
    1. 對於場景1,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和
    2. 對於場景2,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和,此時和為0
    3. 對於場景3,4,5,在第一次for (;;)迴圈結束後並不能退出死迴圈,而是接著進入第二次迴圈,我們來分析在第二次迴圈的處理:
        3.1. 場景3,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
        3.2. 場景4,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
        3.3. 場景5,超時後timed_out被置1,進入第二次for (;;)迴圈後,回到場景2,退出死迴圈返回事件發生個數之和,此時和為0
    因此,對於所有場景,經過一次或者兩次for (;;)迴圈後,均能退出死迴圈,返回事件發生個數之和
    */
}

poll_freewait(&table); // 程序喚醒後,移除所有fd的讀寫等待佇列中該程序相關的表項

return retval; // 返回所有fd發生的所有事件的個數之和

}
主要的處理邏輯在註釋中已做說明,do_select主要處理邏輯就是:迴圈遍歷所有的n個bit位對應的fd,通過呼叫其驅動函式poll獲取該fd是否有事件發生:

對於場景1,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和

對於場景2,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和,此時和為0

對於場景3,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和

對於場景4,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和

對於場景5,超時後timed_out被置1,進入第二次for (;;)迴圈後,回到場景2,退出死迴圈返回事件發生個數之和,此時和為0

  1. 裝置驅動函式poll示例
    在上一節中,我們知道do_select函式通過呼叫驅動函式poll來獲取fd發生的事件,那麼驅動函式poll是如何知道該fd上有沒有事件發生呢,以及如何將程序新增到自己的讀寫等待佇列的呢?我們來分析一個例子,該裝置用一個環形佇列作為輸入輸出快取,當環形佇列還有空間時,則設定可寫事件,當佇列有資料時,則設定可讀事件:

static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
unsigned int mask = 0;
...

poll_wait(filp, &dev->inq,  wait); // 將當前程序加入fd讀寫等待佇列
poll_wait(filp, &dev->outq, wait);
if (dev->rp != dev->wp)
    mask |= POLLIN | POLLRDNORM;   // 發生可讀事件
if (spacefree(dev))
    mask |= POLLOUT | POLLWRNORM;  // 發生可寫事件

return mask; // 返回fd發生的事件

}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p); // 呼叫註冊的_qproc函式
}

void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait); // 註冊_qproc函式
pwq->polling_task = current; // 賦值當前程序,用於將當前程序加入fd讀寫等待佇列
...
}

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
...
add_wait_queue(wait_address, &entry->wait); // 將包含當前程序的節點新增到fd讀寫等待佇列
}
驅動函式呼叫poll_wait將當前程序加入fd讀寫等待佇列,poll_wait呼叫poll_initwait函式中註冊的__pollwait函式,而在__pollwait函式中,通過呼叫add_wait_queue函式才真正的將當前程序新增到fd讀寫等待佇列

  1. 程序喚醒
    我們思考一個問題,對於場景3和4,當程序處於睡眠狀態的時候,如果這個時候任何一個fd上發生了事件,程序都將會喚醒,進而函式返回。這是如何實現的呢,為什麼任何fd發生了事件都可以喚醒程序呢?

上面分析驅動函式poll的功能的時候我們知道,在呼叫每個fd的poll函式的時候,會將當前程序加入到該fd的讀寫等待佇列,當完成一次for (;;)迴圈之後,該程序就被加入到所有fd的讀寫等待隊列了。當任意一個fd收到收據,都會喚醒自己讀寫等待佇列裡的所有程序,這就實現了當任何一個fd可讀可寫時就能立即喚醒程序的原理

我們以一個串列埠裝置為例,當硬體裝置收到資料時,觸發interrupt中斷處理,經過一系列的處理,最終呼叫到receive_buf函式來喚醒程序:

serial8250_interrupt -> serial8250_handle_port -> receive_chars -> tty_flip_buffer_push -> flush_to_ldisc -> disc->receive_buf

disc->receive_buf()
if (waitqueue_active(&tty->read_wait))
wake_up_interruptible(&tty->read_wait);
在receive_buf函式中,首先判斷是否有程序阻塞在自己的讀寫等待佇列中,如果有則呼叫wake_up_interruptible函式喚醒這些程序。當程序喚醒後,select函式返回之前,通過呼叫poll_freewait()函式移除所有fd的讀寫等待佇列中該程序相關的表項

  1. select 示例程式

include <sys/select.h>

include <stdio.h>

include <sys/types.h>

include <sys/stat.h>

include <fcntl.h>

include <unistd.h>

int main(int argc, char *argv[])
{
fd_set rfds;
struct timeval tv;
int fd;

mkfifo("test_fifo", 0666);
fd = open("test_fifo", O_RDWR);
while(1)
{
    FD_SET(0, &rfds);   // 每次select呼叫之前都需要重新設定監聽描述符和超時引數
    FD_SET(fd, &rfds);  
    tv.tv_sec = 3;
    tv.tv_usec = 0;

    int ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
    if (-1 == ret) // 異常
    {
        perror("select()");
    }
    else if (0 == ret) // 超時
    {
        printf("timeout\n");
    }
    else if (ret > 0) // 有fd收到資料
    {
        char buf[128] = {0};
        if (FD_ISSET(0, &rfds))  // 通過FD_ISSET判斷是哪個fd發生了事件
        {
            read(0, buf, sizeof(buf));
            printf("stdin = %s\n", buf);
        }
        else if (FD_ISSET(fd, &rfds))
        { 
            read(fd, buf, sizeof(buf));
            printf("fifo = %s\n", buf);
        }
    }
}
return 0;

}

  1. select 特點
    優點
    a. select目前幾乎在所有的平臺上支援,有良好跨平臺支援特性,在某些Unix系統上不支援poll()和epoll()

b. select對於超時提供了更好的精度,微秒級,而poll是毫秒

缺點
a. 每次呼叫 select,都需要把 fd 從使用者態拷貝到核心態,然後在核心輪詢遍歷所有 fd,再把結果拷貝回用戶態。返回後用戶還需要輪詢fd_set來知道哪些fd是有事件發生的,fd多的時候開銷很大

b. 單個程序能夠監視的檔案描述符的數量存在最大限制,在 Linux 上一般為 1024,可以通過修改巨集定義重新編譯核心的方式提升這一限制,但是這樣也會造成效率的降低

c. select返回結果時複用了使用者註冊監聽事件的fd_set結構,因此每次呼叫select前需要重新註冊監聽事件。超時引數在返回時也是未定義的,每次呼叫select之前也需要重新設定超時引數