1. 程式人生 > 其它 >理解select,poll,epoll實現分析

理解select,poll,epoll實現分析

mark 引用:http://janfan.cn/chinese/2015/01/05/select-poll-impl-inside-the-kernel.html文章

select()/poll() 的核心實現

同時對多個檔案裝置進行I/O事件監聽的時候(I/O multiplexing),我們經常會用到系統呼叫函式select()poll(),甚至是為大規模成百上千個檔案裝置進行併發讀寫而設計的epoll()

I/O multiplexing: When an application needs to handle multiple I/O descriptors at the same time, and I/O on any one descriptor can result in blocking. E.g. file and socket descriptors, multiple socket descriptors

一旦某些檔案裝置準備好了,可以讀寫了,或者是我們自己設定的timeout時間到了,這些函式就會返回,根據返回結果主程式繼續執行。

用了這些函式有什麼好處? 我們自己本來就可以實現這種I/O Multiplexing啊,比如說:

  • 建立多個程序或執行緒來監聽
  • Non-blocking讀寫監聽的輪詢(polling)
  • 非同步I/O(Asynchronous I/O)與Unix Signal事件觸發

想要和我們自己的實現手段做比較,那麼首先我們就得知道這些函式在背後是怎麼實現的。 本文以Linux(v3.9-rc8)原始碼為例,探索select()poll()的核心實現。

select()原始碼概述

首先看看select()函式的函式原型,具體用法請自行輸入命令列$ man 2 select查閱吧 : )

int select(int nfds, 
	    fd_set *restrict readfds, 
	    fd_set *restrict writefds, 
	    fd_set *restrict errorfds, 
	    struct timeval *restrict timeout);

下文將按照這個結構來講解select()在Linux的實現機制。

  1. select()核心入口
  2. do_select()的迴圈體
  3. struct file_operations
    裝置驅動的操作函式
  4. scull驅動例項
  5. poll_wait與裝置的等待佇列
  6. 其它相關細節
  7. 最後

好,讓我們開始吧 : )

select()核心入口

我們首先把目光放到檔案fs/select.c檔案上。

SYSCALL_DEFINE5(select, int, n, 
		fd_set __user *, inp, 
		fd_set __user *, outp, 
		fd_set __user *, exp, 
		struct timeval __user *, tvp)
{
	// ret = core_sys_select(n, inp, outp, exp, to);
	ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
	return ret;
}
int core_sys_select(int n, 
		fd_set __user *inp, 
		fd_set __user *outp, 
		fd_set __user *exp, 
		struct timespec *end_time)
{
	fd_set_bits fds;
	// …
	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);

	// …
	ret = do_select(n, &fds, end_time);
	// …
}

很好,我們找到了一個巨集定義的select()函式的入口,繼續深入,可以看到其中最重要的就是do_select()這個核心函式。

do_select()的迴圈體

do_select()實質上是一個大的迴圈體,對每一個主程式要求監聽的裝置fd(File Descriptor)做一次struct file_operations結構體裡的poll操作。

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	// …
	for (;;) {
		// …
		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			// …
			struct fd f;
			f = fdget(i);
			if (f.file) {
				const struct file_operations *f_op;
				f_op = f.file->f_op;
				mask = DEFAULT_POLLMASK;
				if (f_op->poll) {
					wait_key_set(wait, in, out,
						     bit, busy_flag);
					// 對每個fd進行I/O事件檢測
					mask = (*f_op->poll)(f.file, wait);
				}
				fdput(f);
				// …
			}
		}
		// 退出迴圈體
		if (retval || timed_out || signal_pending(current))
			break;
		// 進入休眠
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
				to, slack))
			timed_out = 1;
	}
}

(*f_op->poll)會返回當前裝置fd的狀態(比如是否可讀可寫),根據這個狀態,do_select()接著做出不同的動作

  • 如果裝置fd的狀態與主程式的感興趣的I/O事件匹配,則記錄下來,do_select()退出迴圈體,並把結果返回給上層主程式。
  • 如果不匹配,do_select()發現timeout已經到了或者程序有signal訊號打斷,也會退出迴圈,只是返回空的結果給上層應用。

但如果do_select()發現當前沒有事件發生,又還沒到timeout,更沒signal打擾,核心會在這個迴圈體裡面永遠地輪詢下去嗎?

select()把全部fd檢測一輪之後如果沒有可用I/O事件,會讓當前程序去休眠一段時間,等待fd裝置或定時器來喚醒自己,然後再繼續迴圈體看看哪些fd可用,以此提高效率。

int poll_schedule_timeout(struct poll_wqueues *pwq, int state,
			  ktime_t *expires, unsigned long slack)
{
	int rc = -EINTR;

	// 休眠
	set_current_state(state);
	if (!pwq->triggered)
		rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);
	__set_current_state(TASK_RUNNING);

	/*
	 * Prepare for the next iteration.
	 *
	 * The following set_mb() serves two purposes.  First, it's
	 * the counterpart rmb of the wmb in pollwake() such that data
	 * written before wake up is always visible after wake up.
	 * Second, the full barrier guarantees that triggered clearing
	 * doesn't pass event check of the next iteration.  Note that
	 * this problem doesn't exist for the first iteration as
	 * add_wait_queue() has full barrier semantics.
	 */
	set_mb(pwq->triggered, 0);

	return rc;
}
EXPORT_SYMBOL(poll_schedule_timeout);

struct file_operations裝置驅動的操作函式

裝置發現I/O事件時會喚醒主程式程序? 每個裝置fd的等待佇列在哪?我們什麼時候把當前程序新增到它們的等待佇列裡去了?

mask = (*f_op->poll)(f.file, wait);

就是上面這行程式碼乾的好事。 不過在此之前,我們得先了解一下系統核心與檔案裝置的驅動程式之間耦合框架的設計。

上文對每個裝置的操作f_op->poll,是一個針對每個檔案裝置特定的核心函式,區別於我們平時用的系統呼叫poll()。 並且,這個操作是select()poll()epoll()背後實現的共同基礎。

Support for any of these calls requires support from the device driver. This support (for all three calls,select()poll()andepoll()) is provided through the driver’s poll method.

Linux的設計很靈活,它並不知道每個具體的檔案裝置是怎麼操作的(怎麼開啟,怎麼讀寫),但核心讓每個裝置擁有一個struct file_operations結構體,這個結構體裡定義了各種用於操作裝置的函式指標,指向操作每個檔案裝置的驅動程式實現的具體操作函式,即裝置驅動的回撥函式(callback)。

struct file {
	struct path		f_path;
	struct inode		*f_inode;	/* cached value */
	const struct file_operations	*f_op;

	// …

} __attribute__((aligned(4)));	/* lest something weird decides that 2 is OK */
struct file_operations {
	struct module *owner;
	loff_t (*llseek) (struct file *, loff_t, int);
	ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
	ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
	ssize_t (*aio_read) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
	ssize_t (*aio_write) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
	ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
	ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
	int (*iterate) (struct file *, struct dir_context *);
	// select()輪詢裝置fd的操作函式
	unsigned int (*poll) (struct file *, struct poll_table_struct *);
	// …	
};

這個f_op->poll對檔案裝置做了什麼事情呢? 一是呼叫poll_wait()函式(在include/linux/poll.h檔案); 二是檢測檔案裝置的當前狀態。

unsigned int (*poll) (struct file *filp, struct poll_table_struct *pwait);

The device method is in charge of these two steps:

  1. Callpoll_wait()on one or more wait queues that could indicate a change in the poll status. If no file descriptors are currently available for I/O, the kernel causes the process to wait on the wait queues for all file descriptors passed to the system call.
  2. Return a bit mask describing the operations (if any) that could be immediately performed without blocking.

或者來看另一個版本的說法:

For every file descriptor, it calls that fd’spoll()method, which will add the caller to that fd’s wait queue, and return which events (readable, writeable, exception) currently apply to that fd.

下一節裡我們會結合驅動例項程式來理解。

scull驅動例項

由於Linux裝置驅動的耦合設計,對裝置的操作函式都是驅動程式自定義的,我們必須要結合一個具體的例項來看看,才能知道f_op->poll裡面弄得是什麼鬼。

在這裡我們以Linux Device Drivers, Third Edition一書中的例子——scull裝置的驅動程式為例。

scull(Simple Character Utility for Loading Localities). scull is a char driver that acts on a memory area as though it were a device.

scull裝置不同於硬體裝置,它是模擬出來的一塊記憶體,因此對它的讀寫更快速更自由,記憶體支援你順著讀倒著讀點著讀怎麼讀都可以。 我們以書中“管道”(pipe)式,即FIFO的讀寫驅動程式為例。

首先是scull_pipe的結構體,注意wait_queue_head_t這個佇列型別,它就是用來記錄等待裝置I/O事件的程序的。

struct scull_pipe {
        wait_queue_head_t inq, outq;       /* read and write queues */
        char *buffer, *end;                /* begin of buf, end of buf */
        int buffersize;                    /* used in pointer arithmetic */
        char *rp, *wp;                     /* where to read, where to write */
        int nreaders, nwriters;            /* number of openings for r/w */
        struct fasync_struct *async_queue; /* asynchronous readers */
        struct mutex mutex;              /* mutual exclusion semaphore */
        struct cdev cdev;                  /* Char device structure */
};

scull裝置的輪詢操作函式scull_p_poll,驅動模組載入後,這個函式就被掛到(*poll)函式指標上去了。

我們可以看到它的確是返回了當前裝置的I/O狀態,並且呼叫了核心的poll_wait()函式,這裡注意,它把自己的wait_queue_head_t佇列也當作引數傳進去了。

static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
	struct scull_pipe *dev = filp->private_data;
	unsigned int mask = 0;

	/*
	 * The buffer is circular; it is considered full
	 * if "wp" is right behind "rp" and empty if the
	 * two are equal.
	 */
	mutex_lock(&dev->mutex);
	poll_wait(filp, &dev->inq,  wait);
	poll_wait(filp, &dev->outq, wait);
	if (dev->rp != dev->wp)
		mask |= POLLIN | POLLRDNORM;	/* readable */
	if (spacefree(dev))
		mask |= POLLOUT | POLLWRNORM;	/* writable */
	mutex_unlock(&dev->mutex);
	return mask;
}

scull有資料寫入時,它會把wait_queue_head_t佇列裡等待的程序給喚醒。

static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count,
                loff_t *f_pos)
{
	// …
	/* Make sure there's space to write */
	// …
	/* ok, space is there, accept something */
	// …
	/* finally, awake any reader */
	wake_up_interruptible(&dev->inq);  /* blocked in read() and select() */
	// …
}

可是wait_queue_head_t佇列裡的程序是什麼時候裝進去的? 肯定是poll_wait搞的鬼! 我們又得回到該死的Linux核心去了。

poll_wait與裝置的等待佇列

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);
}

/*
 * Do not touch the structure directly, use the access functions
 * poll_does_not_wait() and poll_requested_events() instead.
 */
typedef struct poll_table_struct {
	poll_queue_proc _qproc;
	unsigned long _key;
} poll_table;

/* 
 * structures and helpers for f_op->poll implementations
 */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

可以看到,poll_wait()其實就是隻是直接呼叫了struct poll_table_struct結構裡繫結的函式指標。 我們得找到struct poll_table_struct初始化的地方。

Thepoll_tablestructure is just a wrapper around a function that builds the actual data structure. That structure, forpollandselect, is a linked list of memory pages containingpoll_table_entrystructures.

struct poll_table_struct裡的函式指標,是在do_select()初始化的。

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	struct poll_wqueues table;
	poll_table *wait;
	poll_initwait(&table);
	wait = &table.pt;
	// …
}

void poll_initwait(struct poll_wqueues *pwq)
{
	// 初始化poll_table裡的函式指標
	init_poll_funcptr(&pwq->pt, __pollwait);
	pwq->polling_task = current;
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}
EXPORT_SYMBOL(poll_initwait);

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->_qproc = qproc;
	pt->_key   = ~0UL; /* all events enabled */
}

我們現在終於知道,__pollwait()函式,就是poll_wait()幕後的真凶。

add_wait_queue()把當前程序新增到裝置的等待佇列wait_queue_head_t中去。

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	// 把當前程序裝到裝置的等待佇列
	add_wait_queue(wait_address, &entry->wait);
}

void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long flags;

	wait->flags &= ~WQ_FLAG_EXCLUSIVE;
	spin_lock_irqsave(&q->lock, flags);
	__add_wait_queue(q, wait);
	spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
	list_add(&new->task_list, &head->task_list);
}

/**
 * Insert a new element after the given list head. The new element does not
 * need to be initialised as empty list.
 * The list changes from:
 *      head → some element → ...
 * to
 *      head → new element → older element → ...
 *
 * Example:
 * struct foo *newfoo = malloc(...);
 * list_add(&newfoo->entry, &bar->list_of_foos);
 *
 * @param entry The new element to prepend to the list.
 * @param head The existing list.
 */
static inline void
list_add(struct list_head *entry, struct list_head *head)
{
    __list_add(entry, head, head->next);
}

其它相關細節

  • fd_set實質上是一個unsigned long陣列,裡面的每一個long整值的每一位都代表一個檔案,其中置為1的位表示使用者要求監聽的檔案。 可以看到,select()能同時監聽的fd好少,只有1024個。
#define __FD_SETSIZE	1024

typedef struct {
	unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

typedef __kernel_fd_set		fd_set;
  • 所謂的檔案描述符fd (File Descriptor),大家也知道它其實只是一個表意的整數值,更深入地說,它是每個程序的file陣列的下標。
struct fd {
	struct file *file;
	unsigned int flags;
};
  • select()系統呼叫會建立一個poll_wqueues結構體,用來記錄相關I/O裝置的等待佇列;當select()退出迴圈體返回時,它要把當前程序從全部等待佇列中移除——這些裝置再也不用著去喚醒當前隊列了。

The call topoll_waitsometimes also adds the process to the given wait queue. The whole structure must be maintained by the kernel so that the process can be removed from all of those queues before poll or select returns.

/*
 * Structures and helpers for select/poll syscall
 */
struct poll_wqueues {
	poll_table pt;
	struct poll_table_page *table;
	struct task_struct *polling_task;
	int triggered;
	int error;
	int inline_index;
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

struct poll_table_entry {
	struct file *filp;
	unsigned long key;
	wait_queue_t wait;
	wait_queue_head_t *wait_address;
};
  • wait_queue_head_t就是一個程序(task)的佇列。
struct __wait_queue_head {
	spinlock_t		lock;
	struct list_head	task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;

  • select()epoll()的比較
  1. select,poll實現需要自己不斷輪詢所有fd集合,直到裝置就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要呼叫epoll_wait不斷輪詢就緒連結串列,期間也可能多次睡眠和喚醒交替,但是它是裝置就緒時,呼叫回撥函式,把就緒fd放入就緒連結串列中,並喚醒在epoll_wait中進入睡眠的程序。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的時候只要判斷一下就緒連結串列是否為空就行了,這節省了大量的CPU時間。這就是回撥機制帶來的效能提升。
  2. epoll所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於2048,舉個例子,在1GB記憶體的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。

更具體的比較可以參見這篇文章

最後

非常艱難的,我們終於來到了這裡(T^T)

總結一下select()的大概流程(poll同理,只是用於存放fd的資料結構不同而已)。

  1. 先把全部fd掃一遍
  2. 如果發現有可用的fd,跳到5
  3. 如果沒有,當前程序去睡覺xx秒
  4. xx秒後自己醒了,或者狀態變化的fd喚醒了自己,跳到1
  5. 結束迴圈體,返回

我相信,你肯定還沒懂,這程式碼實在是亂得一逼,被我剪輯之後再是亂得沒法看了(嘆氣)。 所以看官請務必親自去看Linux原始碼,在這裡我已經給出了大致的方向,等你看完原始碼回來,這篇文章你肯定也就明白了。 當然別忘了下面的參考資料,它們可幫大忙了 :P

主要參考資料