1. 程式人生 > 其它 >2.do_select函式分析

2.do_select函式分析

技術標籤:LiNUXlinux

do_select函式的執行過程

  • 1 先把全部fd掃一遍
  • 2 如果發現有可用的fd,跳到5
  • 3 如果沒有,當前程序去睡眠xx秒
  • 4 xx秒後自己醒了,或者狀態變化的fd喚醒了自己,跳到1
  • 5 結束迴圈體,返回

核心過程

  1. poll_initwait():設定poll_wqueues->poll_table的成員變數poll_queue_proc為__pollwait函式;同時記錄當前程序task_struct記在pwq結構體的polling_task。
  2. f_op->poll():會呼叫poll_wait(),進而執行上一步設定的方法__pollwait();
    __pollwait():設定wait->func喚醒回撥函式為pollwake函式,並將poll_table_entry->wait加入等待佇列
  3. poll_schedule_timeout():該程序進入帶有超時的睡眠狀態

之後,當其他程序就緒事件發生時便會喚醒相應等待佇列上的程序。比如監控的是可寫事件,則會在write()方法中呼叫wake_up方法喚醒相對應的等待佇列上的程序,當喚醒後執行前面設定的喚醒回撥函式pollwake函式。

  1. pollwake():詳見pollwake函式喚醒過程分析一篇
  2. poll_freewait():當程序喚醒後,將就緒事件結果儲存在fds的res_in、res_out、res_ex,然後把該等待佇列從該佇列頭中移除。
  3. 回到core_sys_select(),將就緒事件結果拷貝到使用者空間。

原理

在一個迴圈中對每個需要監聽的裝置呼叫它們自己的 poll 支援函式(核心最終會相應呼叫 poll_wait(), 把當前程序新增到相應裝置的等待佇列上,然後將該應用程式程序設定為睡眠狀態)以使得當前程序被加入各個裝置的等待佇列。

若當前沒有任何被監聽的裝置就緒,則核心進行排程(呼叫 schedule)讓出 cpu 進入阻塞狀態,超時schedule 返回時將再次迴圈檢測是否有操作可以進行,如此反覆;若有任意一個裝置就緒,呼叫wake up 喚醒使用者當前程序,select/poll 便立即返回,使用者程序獲得了可讀或可寫的fd。

do_select函式分析

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	
	·····(Part omitted)

	rcu_read_lock();
	retval = max_select_fd(n, fds); // 
	rcu_read_unlock();

    ·····(Part omitted)

	poll_initwait(&table);
	wait = &table.pt;

    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        wait = NULL;
        timed_out = 1;     // 如果系統呼叫帶進來的超時時間為0,那麼設定timed_out = 1,表示不阻塞,直接返回。
    }

    if (end_time && !timed_out)
       slack = estimate_accuracy(end_time); // 超時時間轉換

	retval = 0; // retval用於儲存已經準備好的描述符數,初始為0
	for (;;) {
		·····(Part omitted)

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {  // in、out、exp集合都無監控事件,即退出這一輪的事件檢測
				i += BITS_PER_LONG;  // 一輪檢測BITS_PER_LONG個bit位
				continue;
			}

			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				····(Part omitted)
				// 通過fd(即i),主要是獲取當前使用者程序中的fd對應的檔案物件即socket物件,increases the reference count
				f = fdget(i); 
				if (f.file) {
					const struct file_operations *f_op; // 獲取socket檔案物件的驅動程式檔案操作表指標
					f_op = f.file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op->poll) { // socket檔案物件,其f_op->poll對應的函式是sock_poll
						wait_key_set(wait, in, out,
							     bit, busy_flag);
		                // 在迴圈中對每個需要監聽的裝置呼叫它們自己的poll方法使得當前程序被加入各個裝置的等待佇列。
						// 事件mask:通過sock_poll函式(socket檔案的poll方法)檢查檔案是否能夠進行IO操作,返回當前裝置fd的狀態(如是否可讀可寫)
						mask = (*f_op->poll)(f.file, wait); 
					}
					fdput(f); /*Release the reference to file, that is, reduce the reference count f_count */
					/*通過判斷socket的可讀寫狀態來把socket放置到合適的返回集合中。
					如果socket可讀,那麼就把socket放置到可讀集合中,如果socket可寫,那麼就放置到可寫集合中*/
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit; // 如果是這個描述符可讀, 置位
						retval++;      // 返回描述符個數加1
						wait->_qproc = NULL;
					}
					····(Part omitted)
					/* got something, stop busy polling */
					if (retval) {
						can_busy_loop = false;
						busy_flag = 0;

					/*
					 * only remember a returned
					 * POLL_BUSY_LOOP if we asked for it
					 */
					} else if (busy_flag & mask)
						can_busy_loop = true;

				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			cond_resched();
		}
		wait->_qproc = NULL;  // 避免應用程序被喚醒之後再次呼叫pollwait()的時候重複地呼叫函式__pollwait()
		/*retval儲存了檢測到的可操作的檔案描述符的個數。如果有檔案可操作或超時或收到signal,則跳出for(;;)迴圈系統呼叫結束,直接返回*/
		/* timed_out為1時,表明程序睡眠延時到期, 系統呼叫結束*/
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;  // 返回錯誤碼給使用者程序,核心會根據錯誤碼判斷是否重新執行系統呼叫
			break;
		}

		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) {
			if (!busy_end) {
				busy_end = busy_loop_end_time();
				continue;
			}
			if (!busy_loop_timeout(busy_end))
				continue;
		}
		busy_flag = 0;

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time);
			to = &expire;
		}
		
       // 第一次迴圈中,當前使用者程序從這裡進入休眠,上面傳下來的超時時間只是為了用在睡眠超時這裡而已
       // 程序睡眠超時,poll_schedule_timeout()返回0;被喚醒時返回-EINTR
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;  /* 超時後,將其設定成1,方便後面退出迴圈返回到上層 */
	}
   
	poll_freewait(&table);  // 清理各個驅動程式的等待佇列頭,同時釋放掉所有空出來的poll_table_page頁(包含的poll_table_entry)

	return retval;
}

static int max_select_fd(unsigned long n, fd_set_bits *fds)

max_select_fd返回已在fd_set中開啟且小於使用者指定的最大值的fd

poll_initwait(&table)

該呼叫的作用:將當前使用者程序放入自己的waiting queue table中,並將waiting queue新增到test table 中等待
poll_initwait函式初始化一個poll_wqueues變數table:
poll_initwait —> init_poll_funcptr(&pwq->pt, __pollwait); —> pt->qproc = qproc;
即table->pt->qproc = __pollwait,__pollwait將在驅動的poll函式裡用到

void poll_initwait(struct poll_wqueues *pwq)
{
	init_poll_funcptr(&pwq->pt, __pollwait); // 註冊_pollwait回撥函式
	pwq->polling_task = current;  // 指定當前使用者程序
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}

sock_poll函式

是socket檔案物件的poll方法(驅動程式操作),用於查詢socket檔案物件是否可讀可寫(繼續呼叫tcp_poll才能得到)

static unsigned int sock_poll(struct file *file, poll_table *wait)
{
	unsigned int busy_flag = 0;
	struct socket *sock;

	/*
	 *      We can't return errors to poll, so it's either yes or no.
	 */
	sock = file->private_data;

	if (sk_can_busy_loop(sock->sk)) {
		/* this socket can poll_ll so tell the system call */
		busy_flag = POLL_BUSY_LOOP;

		/* once, only if requested by syscall */
		if (wait && (wait->_key & POLL_BUSY_LOOP))
			sk_busy_loop(sock->sk, 1);
	}

	return busy_flag | sock->ops->poll(file, sock, wait); // 對應 TCP 型別的socket,這個poll介面對應的是 tcp_poll() 函式
}

tcp_poll函式 - Wait for a TCP event

tcp_poll() 函式通過呼叫 sock_poll_wait() 函式把程序新增到socket的等待佇列中,
然後檢測socket是否可讀寫,並通過mask返回可讀寫的狀態。
所以在 do_select() 函式中的 mask = file->f_op->poll(file, wait); 這行程式碼其實呼叫的是tcp檔案物件的poll方法(驅動程式操作): tcp_poll() 函式。

unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask;
	struct sock *sk = sock->sk;
	const struct tcp_sock *tp = tcp_sk(sk);

	sock_rps_record_flow(sk);
	// sk_sleep(sk)獲取socket的wait_queue_head_t等待佇列頭
    // 每個socket自己都帶有一個等待佇列socket_wq,所以“裝置的等待佇列”不止一個
	sock_poll_wait(file, sk_sleep(sk), wait);
	····(Part omitted)
	
	return mask;
}

sock_poll_wait

中間函式,主要作用是呼叫poll_wait函式,並將memory barrier置於poll_wait呼叫之後,確保寫入完成?

static inline void sock_poll_wait(struct file *filp,
		wait_queue_head_t *wait_address, poll_table *p)
{
	if (!poll_does_not_wait(p) && wait_address) {
		poll_wait(filp, wait_address, p);   // 把檔案新增到sk->sleep佇列中進行等待
		/* We need to be sure we are in sync with the
		 * socket flags modification.
		 *
		 * This memory barrier is paired in the wq_has_sleeper.
		 */
		smp_mb(); // 完全記憶體屏障
	}
}

poll_wait函式

poll_wait 函式所做的工作是把當前使用者程序新增到 wait 引數指定的等待佇列中。
poll_wait 函式並不阻塞,真正的阻塞動作是上層的 select/poll 函式中完成的。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    // poll_table在睡眠之前保證其為有效地址,而在喚醒之後保證傳入的poll_table地址是NULL  
    // 因為在喚醒之後,再次呼叫fop->poll()的作用只是為了再次檢查裝置的事件狀態而已  
    // 如果驅動程式中沒有提供等待佇列頭wait_address,那麼將不會往下執行p->qproc
	if (p && p->_qproc && wait_address) 
		p->_qproc(filp, wait_address, p); // 回撥函式:p->qproc就是之前poll_initwait初始化poll_wqueues時註冊的__pollwait
}

__pollwait函式

由__pollwait函式實際執行將使用者程序插入socket的等待佇列的動作
核心呼叫系統呼叫引用的所有檔案的poll方法,並將相同的poll_table傳遞給每個檔案,而poll_table的核心就是__pollwait

__pollwait()函式呼叫時需要3個引數,第一個是特定fd對應的file結構體指標,第二個就是特定fd對應的硬體驅動程式中的等待佇列頭指標,第3個是呼叫select()的應用程序中poll_wqueues結構體的poll_table項(該程序監測的所有fd呼叫fop->poll函式都用這一個poll_table結構體)。

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); // 使用container_of求出poll_wqueues的地址  
	struct poll_table_entry *entry = poll_get_entry(pwq); // 分配一個poll_table_entry
	if (!entry)
		return;
	// 初始化一個poll_table_entry
	entry->filp = get_file(filp); // 將filp的引用計數加1,filp這裡是socket檔案物件
	entry->wait_address = wait_address;  // 設定來自裝置驅動程式的等待佇列頭
	entry->key = p->_key; // 設定對該fd事件關心的mask
	init_waitqueue_func_entry(&entry->wait, pollwake);  // 初始化佇列等待項,pollwake是喚醒該佇列等待項呼叫的函式
	entry->wait.private = pwq; // 將poll_wqueues作為該等待佇列項的私有資料
	add_wait_queue(wait_address, &entry->wait); // 將該等待佇列項新增到等待佇列頭中去
}
static inline void
init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)
{
	q->flags	= 0;
	q->private	= NULL;
	q->func		= func; // //設定喚醒回撥函式
}
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long flags;

	wait->flags &= ~WQ_FLAG_EXCLUSIVE;
	spin_lock_irqsave(&q->lock, flags);
	__add_wait_queue(q, wait);
	spin_unlock_irqrestore(&q->lock, flags);
}

static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
	list_add(&new->task_list, &head->task_list);
}
void poll_freewait(struct poll_wqueues *pwq)
{
    struct poll_table_page * p = pwq->table;
    int i;
    for (i = 0; i < pwq->inline_index; i++)
        free_poll_entry(pwq->inline_entries + i);
    while (p) {
        struct poll_table_entry * entry;
        struct poll_table_page *old;

        entry = p->entry;
        do {
            entry--;
            free_poll_entry(entry);
        } while (entry > p->entries);
        old = p;
        p = p->next;
        free_page((unsigned long) old);
    }
}

static void free_poll_entry(struct poll_table_entry *entry)
{
    //從等待佇列中移除wait
    remove_wait_queue(entry->wait_address, &entry->wait);
    fput(entry->filp);
}

poll系統呼叫的內部資料結構

1. poll_wqueues

每一個select系統呼叫只有一個 poll_wqueues,記錄相關I/O裝置的等待佇列 ,對外介面是 poll_table pt 的地址
當select()退出迴圈體返回時,它要把當前程序從全部等待佇列中移除——這些裝置再也不用著去喚醒當前隊列了。

struct poll_wqueues {
	poll_table pt;  //包含一個函式指標,通常指向__pollwait或null
	struct poll_table_page *table;  // 如果inline_entries空間不夠用,後續會動態申請實體記憶體頁以連結串列的形式掛載poll_wqueues.table上統一管理
	struct task_struct *polling_task;  // 指向當前使用者程序
	int triggered;  // 當前使用者程序被喚醒後置成1,以免該程序接著進睡眠
	int error;  // 錯誤碼
	int inline_index;  // 陣列inline_entries的引用下標
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; // 內嵌的poll_table_entry陣列inline_entries[] 的大小有限
};

對每一個fd呼叫fop->poll() -> poll_wait() -> __pollwait()都會先從poll_wqueues. inline_entries[]中分配一個poll_table_entry結構體,直到該陣列用完才會分配物理頁掛在連結串列指標poll_wqueues.table上, 然後才會分配一個poll_table_entry結構體

2. poll_table

poll_table 結構就是為了把程序新增到socket的等待佇列中而創造的
poll_table 會被傳遞給所有的檔案物件,最終檔案物件的驅動程式操作 poll方法 會呼叫poll_table指向的__pollwait函式

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
typedef struct poll_table_struct {
	poll_queue_proc _qproc;  // 函式指標,指向__pollwait函式
	unsigned long _key; // // 等待特定fd對應硬體裝置的事件掩碼,如POLLIN、POLLOUT、POLLERR; wait_key_set函式設定_key
} poll_table;
3. poll_table_page 和 poll_table_entry
// poll_table_page是一個包含poll_table_entry結構記憶體頁連結串列,每一個page佔1個PAGE_SIZE大小的區域
struct poll_table_page {  // 申請的物理頁都會將起始地址強制轉換成該結構體指標 
	struct poll_table_page * next;  // 指向下一個申請的物理頁
	struct poll_table_entry * entry;  // 指向entries[]中首個待分配(空的) poll_table_entry地址
	struct poll_table_entry entries[0];  // 該page頁後面剩餘的空間都是待分配的
};
// 輪詢表條目,分配給特定的select呼叫
struct poll_table_entry {
	struct file *filp;  // 指向特定fd對應的file檔案物件
	unsigned long key;  // 等待特定fd對應硬體裝置的事件掩碼,如POLLIN、POLLOUT、POLLERR;
	wait_queue_t wait;    // 代表呼叫select()的應用程序,等待在fd對應裝置的特定事件(讀或者寫)的等待佇列頭上的等待佇列項;
	wait_queue_head_t *wait_address;  // 裝置驅動程式中特定事件的等待佇列頭(等待佇列有多個wait_queue_t組成,通過雙鏈表連線)
};

特定的硬體裝置驅動程式的事件等待佇列頭是有限個數的,通常有讀事件和寫事件的等待佇列頭;
應用程式可以有多個fd在進行同時監測其各自的事件發生,但該應用程序中每一個fd有多少個poll_table_entry存在,那就取決於fd對應的驅動程式中有幾個事件等待佇列頭了
通常驅動程式的poll函式中需要對每一個事件的等待佇列頭呼叫poll_wait()函式。比如,如果有讀寫兩個等待佇列頭,那麼就在這個應用程序中存在兩個poll_table_entry結構體,在這兩個事件的等待佇列頭中分別將兩個等待佇列項加入
如果有多個應用程序使用selcet()方式同時在訪問同一個硬體裝置,此時硬體驅動程式中加入等待佇列頭中的等待佇列項對每一個應用程式來說都是相同數量的(一個事件等待佇列頭一個,數量取決於事件等待佇列頭的個數)

4. wait_queue_t和wait_queue_head_t

等待隊列表示一組睡眠的程序,當某一條件為真時,由核心喚醒它們
因為程序經常需要等待某event發生,等待佇列實現了在event上的條件等待:
希望等待特定event的程序將自己放入合適的等待佇列,並放棄控制權,進入睡眠。

等待佇列由迴圈連結串列實現,由等待佇列頭(wait_queue_head_t)和等待佇列項(wait_queue_t)組成,其元素(等待佇列項)包含指向程序描述符的指標

當一個程序需要在某個wait_queue_head_t上睡眠時,將自己的程序控制塊資訊封裝到wait_queue_t中,然後掛載到wait_queue_t的連結串列中,如下圖:

typedef struct __wait_queue wait_queue_t;
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);
int default_wake_function(wait_queue_t *wait, unsigned mode, int flags, void *key);

// 等待佇列項
// wait_queue_t 是sleeping processe的等待佇列項,用於引用程序自身
struct __wait_queue {
	unsigned int		flags;  //prepare_to_wait()裡有對flags的操作,檢視以得出其含義
#define WQ_FLAG_EXCLUSIVE	0x01 //一個常數,在prepare_to_wait()用於修改flags的值
	void			*private;   //通常指向當前任務控制塊
	wait_queue_func_t	func;   //喚醒阻塞任務的函式 ,決定了喚醒的方式
	struct list_head	task_list; // 阻塞任務連結串列
};

每個等待佇列都有一個等待佇列頭(wait queue head),使用等待佇列時首先需要定義一個wait_queue_head,這可以通過DECLARE_WAIT_QUEUE_HEAD巨集來完成,這是靜態定義的方法。該巨集會定義一個wait_queue_head,並且初始化結構中的鎖以及等待佇列

// 等待佇列頭
// 該結構體變數通常由interruptible_sleep_on分配在stack上,
struct __wait_queue_head {
	spinlock_t		lock;   //自旋鎖變數,用於等待佇列頭 
	struct list_head	task_list; // a linked list of sleeping processes,list中每個item的型別是wait_queue_t
};
typedef struct __wait_queue_head wait_queue_head_t;

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-PLQSjEvc-1608088685625)(https://i.stack.imgur.com/LNXqH.gif)]

參考資料

  1. wait_queue_head and wait_queue
  2. LDD