I/O多路複用 select 原始碼解析
- I/O多路複用介紹
在介紹I/O多路複用之前,我們先看看一個最基本的socket client/server端是如何接收處理資料的:
client: socket -> bind -> connect -> read/write -> close
server: socket -> bind -> listen -> accept -> read/write -> close
對於server端,這種方式邏輯簡單實現容易,但是不足之處也比較明顯,那就是隻支援一個client的連線和資料收發。那如何支援多個client呢?很容易想到使用多執行緒,即每當accept一個新連線的時候,就建立一個新的執行緒,在新的執行緒裡處理該連線對應socket的資料收發。但是多執行緒也有自己的問題,比如當client非常多時,系統需要建立和維護大量的執行緒,這對系統資源消耗比較大,同時對共享資源的訪問需要各種加鎖解鎖等操作,影響系統效率。
那有沒有一種方法,只用一個執行緒就能支援多個client的資料收發互動呢?答案是肯定的,那就是I/O多路複用。I/O多路複用通過一種機制,使單執行緒可以監視多個檔案描述符,一旦有一個或多個fd有事件發生就通知應用程式,然後進行相應fd的讀寫等操作。與多執行緒相比,I/O 多路複用的最大優勢是系統開銷小,不需要建立和管理大量的執行緒,提高系統效率。當前常見的多路複用機制有select, poll, epoll等,各個機制都有自己特點。本篇介紹select的實現機制,通過對原始碼的解析,瞭解其實現機制,進而理解其優缺點,如為何select預設最大監聽fd數量為1024個,為何每次呼叫select前需要重新註冊fd監聽事件和設定超時引數?帶著這些疑問,我們開始下面的講解。
- 原始碼
a. 下載地址
https://www.kernel.org/
該網頁上可以下載最新版本的linux kernel程式碼,本文以5.2.13為例講解
b. 函式呼叫關係
壓縮包解壓後,找到fs/select.c檔案,該檔案就是select函式和poll函式的程式碼實現,本篇只介紹select函式相關的實現
select函式入口:
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
其主要函式呼叫關係:select -> kern_select -> core_sys_select -> do_select
- fd_set
a. 型別解析
在開始select解析之前,首先介紹一個非常重要的資料結構fd_set,其定義在檔案 include/linux/types.h:
typedef __kernel_fd_set fd_set;
我們轉到源結構__kernel_fd_set,其定義在檔案 include/uapi/linux/posix_types.h:
undef __FD_SETSIZE
define __FD_SETSIZE 1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
...
endif /* _LINUX_POSIX_TYPES_H */
可以看到,fd_set本質是一個unsigned long型的陣列,其陣列長度與__FD_SETSIZE有關。這個巨集定義有什麼作用呢,這個值1024與select預設最大監聽fd數量1024個有沒有關係呢?
我們知道,linux系統中一切皆是檔案,如鍵盤,螢幕,串列埠,socket等等,每開啟一個裝置系統都會分配一個fd來指示該檔案,其值從0開始。如程序啟動的時候系統預設開啟三個檔案(0:stdin, 1:stdout, 2:stderr),因此我們可以直接從鍵盤讀取輸入資料或直接輸出資料到螢幕,而不需要手動呼叫open()函式。當我們繼續開啟其他檔案時,系統會為該檔案分配新的fd,新的fd是該程序的檔案描述符表中值最小的可用檔案描述符,即每次分配的fd值自增++。Linux 中一個程序預設最多能開啟1024(NR_OPEN_DEFAULT)個檔案,因此預設fd取值範圍為0-1023。
select與fd又有什麼關係呢?我們可以想到select函式定會有一個輸入引數(後面可以看到有三個相關引數),用來儲存需要監聽的fd,那麼這個引數是什麼型別呢,如何能儲存需要監聽的多個fd呢?答案就是用1024個bit位來儲存。由於fd範圍是0-1023,我們可以以fd為索引,當需要監聽該fd時,將對應索引的bit位置1,當不需要監聽該fd時,將對應索引的bit位置0,這樣就可以儲存所有設定的監聽描述符了。如果1024個bit位全部置1,表示最多可同時監聽1024個fd,這就是為什麼select預設最多隻能監聽1024個fd的原因了。那1024個bit以什麼方式存在呢?為了後續處理效率,系統選用unsigned long型陣列為載體來儲存,即unsigned long fds[1024 / (8 * sizeof(long))]。如果我們將1024這個值用巨集定義__FD_SETSIZE來指示,即為unsigned long fds[__FD_SETSIZE / (8 * sizeof(long))],是不是有些眼熟,這不就是最開始__kernel_fd_set/fd_set的定義嘛!由此我們知道,select就是使用unsigned long陣列來儲存監聽的fd,陣列中的每個unsigned long以及unsigned long 中的每個bit,都表示一個fd是否被監聽。假設一個long 4個位元組,則fd_set圖示如下:
上述過程我們發現有兩個巨集定義,__FD_SETSIZE 和 NR_OPEN_DEFAULT,那麼這兩個值與select有什麼關係呢?NR_OPEN_DEFAULT 表示一個程序最多可以開啟的檔案個數,即約束了fd的取值為 0 - NR_OPEN_DEFAULT-1,而 __FD_SETSIZE 表示 select 用於儲存fd所用的bit位個數。如果NR_OPEN_DEFAULT > __FD_SETSIZE, 則select可能無法監聽所有開啟的檔案,即fd範圍在 __FD_SETSIZE - NR_OPEN_DEFAULT之間的檔案無法監聽。如果NR_OPEN_DEFAULT < __FD_SETSIZE, 則select可以監聽所有的檔案,只是會有多餘無用的bit位。兩個巨集定義預設值都為1024,如果修改需要重新編譯系統,並且最好以sizeof(long)為單位進行修改,後面可以看到如果取值不合適,會影響select的執行效率。
b.FD_ZERO、FD_SET、FD_CLR、FD_ISSET
瞭解了fd_set的結構和實現之後,再來看看跟它相關的幾個常見的函式,主要就是對特定的bit位進行置1,置0操作,以及查詢某個bit位當前是置1還是置0等:
void FD_ZERO(fd_set *fdset); // fd_set中的所有bit位置0
void FD_SET(int fd, fd_set *fdset); // fd_set中以檔案描述符fd為索引的bit位置1
void FD_CLR(int fd, fd_set *fdset); // fd_set中以檔案描述符fd為索引的bit位置0
int FD_ISSET(int fd, fd_set *fdset); // 查詢fd_set中以fd值為索引的bit位是否置位,用於判斷fd是否發生了事件
- select 函式原型
瞭解了fd_set的實現之後,我們就可以來看看select函式的原型聲明瞭。其函式宣告如下:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
a.nfds
後面我們會分析到,select內部是通過迴圈判斷fd_set中所有置1的bit位所對應的fd是否有事件發生,當最高置1的bit位判斷完成後,迴圈就可以提前結束了,因為再往高位所有的bit位都置0了,也就是那些fd並沒有被監聽,因此可以提前退出迴圈提高效率。如下圖所示:
nfds引數即表示查詢完所有置1的bit位所需要的迴圈次數,由於索引值就是fd的值,因此該引數取值通常為所有的監聽描述符中最大值+1。當然直接取1024也沒有問題,只是select內部可能會多做些無用的迴圈判斷,稍稍影響些效率。
b.readfd/writefds/exceptfds
三個監聽檔案描述符集合,可分別設定讀/寫/異常事件的監聽描述符集合,如果所有fd均不需要監聽某個時間則該集合可以設為NULL
c.timeout
超時時間,通過s和us兩個欄位設定,它表示如果所有的fd都沒有任何事件發生的時候,函式返回之前需要等待的時間。引數有三種取值對應不同的處理機制:
1. NULL, 表示永遠等待下去,直到有一個或多個fd發生了事件函式才會返回,返回值為所有fd發生的所有事件的個數之和
2. 具體等待時間,即s和us至少一個不為0,表示當所有的fd都沒有事件發生的時候,函式返回前需要等待的時間。在等待期間如果任何一個fd發生任何一個事件,則函式返回,返回值為所有fd發生的所有事件的個數之和;如果時間到了仍沒有fd發生任何事件,則函式返回0
3. 不等待,s和us均為0,即非阻塞方式,表示檢查完所有監聽的fd,不管有沒有事件發生函式均立即返回。如果有事件發生,則返回值為所有fd發生的所有事件的個數之和;如果沒有任何fd發生任何事件,則函式返回0
d.返回值
上面已經提到過了,返回值表示所有fd發生的所有事件個數之和。由於返回值僅僅是一個int值,因此select只能告訴我們總共有多少個事件發生了,但不能告訴我們是哪個fd發生了事件(可能是一個,多個,甚至全部),如果同一個fd發生了多個事件,也不能告訴我們具體發生的是哪些事件。所以當select返回後,使用者只能通過FD_ISSET函式迴圈判斷每個fd是否有事件發生,如果有事件發生再判斷該fd發生的是可讀、可寫還是異常事件,然後再進行相應的讀寫操作。所以select具有O(n)的輪詢複雜度,監聽的fd越多迴圈時間就越長。後面當分析epoll的實現的時候,我們會發現epoll只返回發生事件的fd資訊,使用者可以直接進行讀寫操作,大大提高了效率
-
core_sys_select 解析
int core_sys_select(int n, fd_set *inp, fd_set *outp, fd_set exp, struct timespec64 end_time)
{
...
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; // SELECT_STACK_ALLOC = 256
size = FDS_BYTES(n); // n個fd,n個bit,使用long型陣列儲存,需要陣列長度 size = (n+8sizeof(long)-1)/8(sizeof(long))
bits = stack_fds; // 核心預分配的long型陣列空間,共256個位元組,陣列長度 256/4 = 64fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2size;
fds.res_in = bits + 3size;
fds.res_out = bits + 4size;
fds.res_ex = bits + 5size; // 6個long型陣列指標,分別用於儲存輸入和輸出的in/out/ex描述符集合get_fd_set(n, inp, fds.in);
get_fd_set(n, outp, fds.out);
get_fd_set(n, exp, fds.ex); // 使用者監聽的三組fd分別拷貝到核心態,每組拷貝size個long型長度zero_fd_set(n, fds.res_in); // 清空用於儲存結果的三組long型陣列,每組清空size個long型長度
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);ret = do_select(n, &fds, end_time); // 迴圈查詢所有fd的驅動poll函式,檢查是否有可讀可寫事件,有則返回,無則進入睡眠等待超時或等待事件發生
if (ret <= 0)
return ret; // 返回0表示沒有任何fd有事件發生,<0表示發生錯誤set_fd_set(n, inp, fds.res_in);
set_fd_set(n, outp, fds.res_out);
set_fd_set(n, exp, fds.res_ex); // 拷貝儲存結果的三組long型陣列資料到使用者態return ret; // 返回所有fd發生的所有事件個數之和
}
a.FDS_BYTES(n)
我們先來看巨集定義FDS_BYTES(n),經過多層巨集定義呼叫,最終為:
define FDS_BYTES(n) = (n+8sizeof(long)-1)/8(sizeof(long))
即n個bit位由long型陣列來儲存所需要的陣列長度。我們以long為4個位元組來說明,那麼1-32個bit位只需要長度為1的long型陣列,如果有33個bit位,則就需要長度為2的long型陣列了。因此如前所述,當我們在給select傳入第一個引數nfds的時候,該引數取值最好為所有監聽fd中最大值+1,讓系統分配能存下nfds個bit位的最小long型陣列長度,如果直接傳入1024,這裡就會多分配無用的陣列長度,後續也會多做無用的迴圈判斷
b.stack_fds
stack_fds就是系統預先分配的long型陣列空間,用於儲存所有需要監聽的fd,以及用於儲存查詢結果,返回讀,寫,異常事件的查詢結果。即long型陣列空間所需長度為6FDS_BYTES(n),如果nfds取值最大1024,即程序所有開啟的1024個fd全部進行可讀可寫和異常監聽,那麼此時FDS_BYTES(n) = (1024+8sizeof(long)-1)/8(sizeof(long)) = 32,即儲存監聽可讀事件的fd需要長度為32的long型陣列,加上監聽可寫和異常事件的fd,以及返回結果的儲存,總共需要長度為 326 = 192的long型陣列,而long型陣列stack_fds預設長度為 256/4 = 64,因此當這個預設長度儲存不下時,就需要對陣列長度進行擴充套件,擴充套件為6*FDS_BYTES(n)。原始碼如下:
...
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6)
{
/* Not enough space in on-stack array; must use kmalloc */
...
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
}
c. 函式流程圖
當long型陣列stack_fds空間足夠了之後,通過6個間距為FDS_BYTES(n)的long型指標,標記6段長度為FDS_BYTES(n)的long型陣列空間,用來儲存所有輸入和輸出的fd。函式處理流程如下圖:
通過get_fd_set函式將需要監聽可讀可寫和異常事件的fd拷貝到三個輸入long型陣列空間fds->in/out/ex,通過zero_fd_set先將三個輸出long型陣列空間fds->res_in/out/ex清空,用於儲存查詢結果
通過do_select函式迴圈查詢所有fd發生的事件,如果返回值<0表示內部發生異常,如果=0表示沒有事件發生,如果>0表示共發生了多少個事件
將查詢結果賦值到三個輸出long型陣列空間fds->res_in/out/ex
通過set_fd_set將查詢結果從輸出long型陣列空間fds->res_in/out/ex拷貝回用戶輸入的fd_set,然後返回發生的事件個數,並作為select函式的最終返回值
由第4步可知,select返回監聽結果是複用輸入的fd_set的,因此當呼叫完一次select函式後,需要重新置位需要監聽的fd。否則假設我們要監聽5個fd,結果第一次select返回發現其中只有2個fd有事件發生,這時另外3個fd標誌位是被清零了,如果下次呼叫select之前不重新置位,那麼這3個fd就不會再進入查詢迴圈中,即便有事件發生,select函式再也不會返回這些fd的結果了。因此在呼叫完select之後,一定要將需要監聽的fd重新置位,具體可參考後面的示例程式
-
do_select 解析
通過core_sys_select函式的解析,我們發現真正完成查詢事件發生結果的是do_select函式。該函式首先獲取三個輸入long型陣列,獲取需要查詢的fd,查詢完成後,再將結果寫入到三個輸出long型陣列中。下面我們通過5種場景,分別分析各個場景的處理機制: -
有fd收到資料
-
沒有fd收到資料,超時時間設為0,即不等待
-
沒有fd收到資料,超時時間設為NULL,即永遠等待直到有fd收到資料
-
沒有fd收到資料,超時時間設為具體值,定時結束之前有fd收到資料
-
沒有fd收到資料,超時時間設為具體值,定時結束之前沒有fd收到資料
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
...
int timed_out = 0;
u64 slack = 0;
// 註冊_qproc函式,用於查詢事件發生結果時將當前程序加入讀寫等待佇列
poll_initwait(&table);
// 我們有如下5種場景,下面分析一下分別會有什麼樣的處理
// 1. 有fd收到資料
// 2. 沒有fd收到資料,超時時間設為0,即不等待
// 3. 沒有fd收到資料,超時時間設為NULL,即永遠等待直到有fd收到資料
// 4. 沒有fd收到資料,超時時間設為具體值,定時結束之前有fd收到資料
// 5. 沒有fd收到資料,超時時間設為具體值,定時結束之前沒有fd收到資料
// 對於場景2,超時時間設為0,設定timed_out = 1,該標誌位用於場景2退出for (;;)迴圈
if (end_time && !end_time->tv_sec && !end_time->tv_nsec)
timed_out = 1;
// 對於場景4和5,超時時間設為具體值,則將超時時間結構體轉為U64整數,便於後續定時器處理
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
retval = 0; // 統計所有fd發生的所有事件個數之和,也是select函式的返回值
for (;;) // 不同場景有不同的機制來退出該死迴圈
{
// 6個long型指標inp/outp/exp/rinp/routp/rexp,分別指向stack_fds中用於輸入和輸出的in/out/ex檔案描述符集合
inp = fds->in;
outp = fds->out;
exp = fds->ex;
rinp = fds->res_in;
routp = fds->res_out;
rexp = fds->res_ex;
// 迴圈遍歷所有的n個bit位
for (i = 0; i < n; ++rinp, ++routp, ++rexp)
{
unsigned long in, out, ex, all_bits, bit = 1;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
// n個bit位以long型為單位進行多組(即32個為一組)迴圈遍歷
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) { // 該組的32(BITS_PER_LONG)個fd都不需要監聽in/out/ex事件,則調過該組迴圈
i += BITS_PER_LONG;
continue;
}
// 迴圈遍歷一個組的32個bit位
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1)
{
// 當遍歷的是最後一組時,該組fd個數可能不到32個了,則遍歷完最後一個fd就可以提前退出該組迴圈
if (i >= n)
break;
// 該bit位對應的fd沒有註冊監聽in/out/ex事件,則跳過該bit位迴圈
if (!(bit & all_bits))
continue;
f = fdget(i); // 通過fd值獲取file結構
// vfs_poll呼叫驅動程式 file->f_op->poll(file, pt),查詢該fd發生的事件,驅動函式poll功能:
// 1. 返回該fd發生的in/out/ex事件,用mask標識發生的事件
// 2. 將當前程序加入到該fd的讀寫等待佇列,當程序喚醒後再從等待佇列中移除該程序相關的節點
mask = vfs_poll(f.file, wait);
// 當fd發生了in/out/ex事件,且使用者設定監聽該事件,則儲存查詢結果:
// 1. 儲存發生事件的fd值
// 2. retval++,即所有fd發生的所有事件個數之和,即如果同一個fd發生2個事件,統計結果是+2而不是+1,這就是select返回值的意義
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
}
// 迴圈遍歷完該組的32個fd後,將發生事件的fd分別寫到用於儲存in/out/ex結果的三組long型陣列中
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
}
// 場景1,有事件發生,retval不為0,退出for (;;)迴圈,函式返回事件發生個數之和
// 場景2,沒有事件發生,retval為0,但因為timed_out已被設為1,因此退出for (;;)迴圈,函式返回事件發生個數之和,即為0
if (retval || timed_out || signal_pending(current))
break;
// 場景3,4,5,沒有事件發生,則需等待
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))
{
// 設定當前程序為睡眠狀態
set_current_state(TASK_INTERRUPTIBLE);
// 場景3,沒有事件發生,永遠等待直到有任何fd收到資料
if (!expires)
// 該程序移除執行佇列,同時主動釋放CPU控制權讓排程器排程執行另一個程序
// 該程序將不再被排程,直到有fd收到資料時其驅動程式將該程序喚醒
schedule();
return -EINTR;
// 場景4和5,沒有事件發生,開啟定時器等待特定時間
// 設定定時時間
hrtimer_set_expires_range_ns(&t.timer, *expires, delta);
// 初始化定時器
hrtimer_init_sleeper(&t, current);
t->timer.function = hrtimer_wakeup; // 設定超時回撥函式hrtimer_wakeup
t->task = current; // 場景4,設定t->task指向當前程序,用於poll_schedule_timeout判斷程序喚醒是因為有fd收到資料
// 如果直到定時器超時都沒有收到資料,則觸發超時回撥函式
static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
t->task = NULL; // 場景5,設定t->task為空,用於poll_schedule_timeout判斷程序喚醒是因為超時
wake_up_process(task); // 喚醒當前程序
// 啟動定時器
hrtimer_start_expires(&t.timer, mode);
// 該程序移除執行佇列,同時主動釋放CPU控制權讓排程器排程執行另一個程序
// 該程序將不再被排程,直到有fd收到資料或者定時器超時將該程序喚醒
schedule();
set_current_state(TASK_RUNNING); // 程序被驅動程式或超時回撥函式喚醒,退出睡眠狀態
// 根據t->task值判斷程序喚醒是因為超時還是因為有fd收到資料
// 場景4,超時前有fd收到資料,不觸發回撥函式,t->task沒有被置空,poll_schedule_timeout返回-EINTR
// 場景5,一直沒有fd收到資料,觸發回撥函式,t->task被置空,poll_schedule_timeout返回0
return !t.task ? 0 : -EINTR;
// 根據poll_schedule_timeout返回值判斷程序喚醒是否因為超時,如果是則設定timed_out = 1
timed_out = 1;
}
/*
至此,我們完成了第一次for (;;)迴圈
1. 對於場景1,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和
2. 對於場景2,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和,此時和為0
3. 對於場景3,4,5,在第一次for (;;)迴圈結束後並不能退出死迴圈,而是接著進入第二次迴圈,我們來分析在第二次迴圈的處理:
3.1. 場景3,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
3.2. 場景4,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
3.3. 場景5,超時後timed_out被置1,進入第二次for (;;)迴圈後,回到場景2,退出死迴圈返回事件發生個數之和,此時和為0
因此,對於所有場景,經過一次或者兩次for (;;)迴圈後,均能退出死迴圈,返回事件發生個數之和
*/
}
poll_freewait(&table); // 程序喚醒後,移除所有fd的讀寫等待佇列中該程序相關的表項
return retval; // 返回所有fd發生的所有事件的個數之和
}
主要的處理邏輯在註釋中已做說明,do_select主要處理邏輯就是:迴圈遍歷所有的n個bit位對應的fd,通過呼叫其驅動函式poll獲取該fd是否有事件發生:
對於場景1,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和
對於場景2,第一次for (;;)迴圈就能退出死迴圈,返回事件發生個數之和,此時和為0
對於場景3,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
對於場景4,fd收到了資料後進入第二次for (;;)迴圈,回到場景1,退出死迴圈返回事件發生個數之和
對於場景5,超時後timed_out被置1,進入第二次for (;;)迴圈後,回到場景2,退出死迴圈返回事件發生個數之和,此時和為0
- 裝置驅動函式poll示例
在上一節中,我們知道do_select函式通過呼叫驅動函式poll來獲取fd發生的事件,那麼驅動函式poll是如何知道該fd上有沒有事件發生呢,以及如何將程序新增到自己的讀寫等待佇列的呢?我們來分析一個例子,該裝置用一個環形佇列作為輸入輸出快取,當環形佇列還有空間時,則設定可寫事件,當佇列有資料時,則設定可讀事件:
static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
unsigned int mask = 0;
...
poll_wait(filp, &dev->inq, wait); // 將當前程序加入fd讀寫等待佇列
poll_wait(filp, &dev->outq, wait);
if (dev->rp != dev->wp)
mask |= POLLIN | POLLRDNORM; // 發生可讀事件
if (spacefree(dev))
mask |= POLLOUT | POLLWRNORM; // 發生可寫事件
return mask; // 返回fd發生的事件
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p); // 呼叫註冊的_qproc函式
}
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait); // 註冊_qproc函式
pwq->polling_task = current; // 賦值當前程序,用於將當前程序加入fd讀寫等待佇列
...
}
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
...
add_wait_queue(wait_address, &entry->wait); // 將包含當前程序的節點新增到fd讀寫等待佇列
}
驅動函式呼叫poll_wait將當前程序加入fd讀寫等待佇列,poll_wait呼叫poll_initwait函式中註冊的__pollwait函式,而在__pollwait函式中,通過呼叫add_wait_queue函式才真正的將當前程序新增到fd讀寫等待佇列
- 程序喚醒
我們思考一個問題,對於場景3和4,當程序處於睡眠狀態的時候,如果這個時候任何一個fd上發生了事件,程序都將會喚醒,進而函式返回。這是如何實現的呢,為什麼任何fd發生了事件都可以喚醒程序呢?
上面分析驅動函式poll的功能的時候我們知道,在呼叫每個fd的poll函式的時候,會將當前程序加入到該fd的讀寫等待佇列,當完成一次for (;;)迴圈之後,該程序就被加入到所有fd的讀寫等待隊列了。當任意一個fd收到收據,都會喚醒自己讀寫等待佇列裡的所有程序,這就實現了當任何一個fd可讀可寫時就能立即喚醒程序的原理
我們以一個串列埠裝置為例,當硬體裝置收到資料時,觸發interrupt中斷處理,經過一系列的處理,最終呼叫到receive_buf函式來喚醒程序:
serial8250_interrupt -> serial8250_handle_port -> receive_chars -> tty_flip_buffer_push -> flush_to_ldisc -> disc->receive_buf
disc->receive_buf()
if (waitqueue_active(&tty->read_wait))
wake_up_interruptible(&tty->read_wait);
在receive_buf函式中,首先判斷是否有程序阻塞在自己的讀寫等待佇列中,如果有則呼叫wake_up_interruptible函式喚醒這些程序。當程序喚醒後,select函式返回之前,通過呼叫poll_freewait()函式移除所有fd的讀寫等待佇列中該程序相關的表項
- select 示例程式
include <sys/select.h>
include <stdio.h>
include <sys/types.h>
include <sys/stat.h>
include <fcntl.h>
include <unistd.h>
int main(int argc, char *argv[])
{
fd_set rfds;
struct timeval tv;
int fd;
mkfifo("test_fifo", 0666);
fd = open("test_fifo", O_RDWR);
while(1)
{
FD_SET(0, &rfds); // 每次select呼叫之前都需要重新設定監聽描述符和超時引數
FD_SET(fd, &rfds);
tv.tv_sec = 3;
tv.tv_usec = 0;
int ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
if (-1 == ret) // 異常
{
perror("select()");
}
else if (0 == ret) // 超時
{
printf("timeout\n");
}
else if (ret > 0) // 有fd收到資料
{
char buf[128] = {0};
if (FD_ISSET(0, &rfds)) // 通過FD_ISSET判斷是哪個fd發生了事件
{
read(0, buf, sizeof(buf));
printf("stdin = %s\n", buf);
}
else if (FD_ISSET(fd, &rfds))
{
read(fd, buf, sizeof(buf));
printf("fifo = %s\n", buf);
}
}
}
return 0;
}
- select 特點
優點
a. select目前幾乎在所有的平臺上支援,有良好跨平臺支援特性,在某些Unix系統上不支援poll()和epoll()
b. select對於超時提供了更好的精度,微秒級,而poll是毫秒
缺點
a. 每次呼叫 select,都需要把 fd 從使用者態拷貝到核心態,然後在核心輪詢遍歷所有 fd,再把結果拷貝回用戶態。返回後用戶還需要輪詢fd_set來知道哪些fd是有事件發生的,fd多的時候開銷很大
b. 單個程序能夠監視的檔案描述符的數量存在最大限制,在 Linux 上一般為 1024,可以通過修改巨集定義重新編譯核心的方式提升這一限制,但是這樣也會造成效率的降低
c. select返回結果時複用了使用者註冊監聽事件的fd_set結構,因此每次呼叫select前需要重新註冊監聽事件。超時引數在返回時也是未定義的,每次呼叫select之前也需要重新設定超時引數