Linux 管道的實現分析
阿新 • • 發佈:2019-02-04
//inode結點資訊結構
struct inode {
...
struct pipe_inode_info *i_pipe;
...
};
//管道緩衝區個數
#define PIPE_BUFFERS (16)
//管道快取區物件結構
struct pipe_buffer {
struct page *page; //管道緩衝區頁框的描述符地址
unsigned int offset, len; //頁框內有效資料的當前位置,和有效資料的長度
struct pipe_buf_operations *ops; //管道快取區方法表的地址
};
//管道資訊結構
struct pipe_inode_info {
wait_queue_head_t wait; //管道等待佇列
unsigned int nrbufs, curbuf; //包含待讀資料的緩衝區數和包含待讀資料的第一個緩衝區的索引
struct pipe_buffer bufs[PIPE_BUFFERS]; //管道緩衝區描述符陣列
struct page *tmp_page; //快取記憶體區頁框指標
unsigned int start; //當前管道快取區讀的位置
unsigned int readers; //讀程序的標誌,或編號
unsigned int writers; //寫程序的標誌,或編號
unsigned int waiting_writers; //在等待佇列中睡眠的寫程序的個數
unsigned int r_counter; //與readers類似,但當等待寫入FIFO的程序是使用
unsigned int w_counter; //與writers類似,但當等待寫入FIFO的程序時使用
struct fasync_struct *fasync_readers; //用於通過訊號進行的非同步I/O通知
struct fasync_struct *fasync_writers; //用於通過訊號的非同步I/O通知
};
2, 管道的實現
管道可以看著是開啟的檔案,但在已安裝的檔案系統中沒有相應的影像。
管道是作為一組VFS物件來實現的,因此沒有對應的磁碟映像。
而在2.6中把這些VFS物件組織成pipfs特殊檔案系統以加速他們的處理。
但這種檔案系統在系統目錄樹中沒有安裝點,使用者根本看不到它。
2.1 pipefs 檔案系統的安裝
//檔案系統的安裝可以參考檔案系統的實現一章
static struct super_block *pipefs_get_sb(struct file_system_type *fs_type,
int flags, const char *dev_name, void *data)
{
return get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC);
}
static struct file_system_type pipe_fs_type = {
.name = "pipefs",
.get_sb = pipefs_get_sb,
.kill_sb = kill_anon_super,
};
static int __init init_pipe_fs(void)
{
int err = register_filesystem(&pipe_fs_type);
if (!err) {
pipe_mnt = kern_mount(&pipe_fs_type);
if (IS_ERR(pipe_mnt)) {
err = PTR_ERR(pipe_mnt);
unregister_filesystem(&pipe_fs_type);
}
}
return err;
}
static void __exit exit_pipe_fs(void)
{
unregister_filesystem(&pipe_fs_type);
mntput(pipe_mnt);
}
2.2 pipe的建立的實現
對於每個管道來說,核心都建立一個inode結點物件,兩個file物件,一個用於讀,一個用於寫。
int do_pipe(int *fd)
{
struct qstr this;
char name[32];
struct dentry *dentry;
struct inode * inode;
struct file *f1, *f2;
int error;
int i,j;
error = -ENFILE;
f1 = get_empty_filp(); //獲取檔案物件1
if (!f1)
goto no_files;
f2 = get_empty_filp(); //獲取檔案物件2
if (!f2)
goto close_f1;
inode = get_pipe_inode(); //獲取pipe的inode結點
if (!inode)
goto close_f12;
error = get_unused_fd(); //獲取沒有使用的fd1
if (error < 0)
goto close_f12_inode;
i = error;
error = get_unused_fd(); //獲取沒有使用的fd2
if (error < 0)
goto close_f12_inode_i;
j = error;
error = -ENOMEM;
sprintf(name, "[%lu]", inode->i_ino); //設定索引節點號
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino; /* will go */
dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this); //獲取一個目錄物件
if (!dentry)
goto close_f12_inode_i_j;
dentry->d_op = &pipefs_dentry_operations;
//把目錄物件和inode結點聯絡在一起
d_add(dentry, inode);
f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));
f1->f_dentry = f2->f_dentry = dget(dentry);
f1->f_mapping = f2->f_mapping = inode->i_mapping;
/* read file */ //給讀描述符的檔案物件賦值
f1->f_pos = f2->f_pos = 0; //讀的位置從0偏移量開始
f1->f_flags = O_RDONLY; //只讀
f1->f_op = &read_pipe_fops; //讀操作時執行的函式
f1->f_mode = FMODE_READ; //讀模式
f1->f_version = 0;
/* write file */
f2->f_flags = O_WRONLY; //只寫
f2->f_op = &write_pipe_fops; //寫操作執行函式
f2->f_mode = FMODE_WRITE; //寫模式
f2->f_version = 0;
fd_install(i, f1); //給檔案物件f1中的fd賦值
fd_install(j, f2); //給檔案物件f2中的fd賦值
fd[0] = i; //把值賦給使用者空間
fd[1] = j; //把值賦給使用者空間
return 0;
close_f12_inode_i_j:
put_unused_fd(j);
close_f12_inode_i:
put_unused_fd(i);
close_f12_inode:
free_pipe_info(inode);
iput(inode);
close_f12:
put_filp(f2);
close_f1:
put_filp(f1);
no_files:
return error;
}
//獲取管道的inode結構
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode(pipe_mnt->mnt_sb);
if (!inode)
goto fail_inode;
if(!pipe_new(inode))
goto fail_iput;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;
inode->i_fop = &rdwr_pipe_fops;
/*
* Mark the inode dirty from the very beginning,
* that way it will never be moved to the dirty
* list because "mark_inode_dirty()" will think
* that it already _is_ on the dirty list.
*/
inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current->fsuid;
inode->i_gid = current->fsgid;
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_blksize = PAGE_SIZE;
return inode;
fail_iput:
iput(inode);
fail_inode:
return NULL;
}
*管道的讀操作的實現
管道讀操作的規則如下:
//管道讀操作函式
static ssize_t
pipe_readv(struct file *filp, const struct iovec *_iov,
unsigned long nr_segs, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode; //獲取inode結點指標
struct pipe_inode_info *info;
int do_wakeup;
ssize_t ret;
struct iovec *iov = (struct iovec *)_iov; //獲取讀緩衝區的結構
size_t total_len;
total_len = iov_length(iov, nr_segs);
/* Null read succeeds. */
if (unlikely(total_len == 0))
return 0;
do_wakeup = 0;
ret = 0;
down(PIPE_SEM(*inode)); //獲取inode中的i_sem訊號量
info = inode->i_pipe; //獲取inode 結構的pipe_inode_info結構指標
for (;;) {
int bufs = info->nrbufs; //檢查有幾個管道緩衝區有被讀取的資料
if (bufs) { //說明有其中有緩衝區包含了讀資料
int curbuf = info->curbuf; //獲取當前讀資料的管道快取區的索引
struct pipe_buffer *buf = info->bufs + curbuf; //共有16個緩衝區,curbuf是當前的
struct pipe_buf_operations *ops = buf->ops; //獲取操作函式列表
void *addr;
size_t chars = buf->len;
int error;
//若緩衝區長度大於要求讀取的資料長度,chars設定成要求讀的長度
if (chars > total_len)
chars = total_len;
//執行Map方法
addr = ops->map(filp, info, buf);
//從快取區中複製資料
error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars);
//執行umap方法
ops->unmap(info, buf);
if (unlikely(error)) {
if (!ret) ret = -EFAULT; //第一次讀失敗
break;
}
//更新管道的offset和len欄位
ret += chars;
buf->offset += chars;
buf->len -= chars;
//若現在的快取區的資料長度為0
if (!buf->len) {
buf->ops = NULL;
ops->release(info, buf);
curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
info->curbuf = curbuf;
info->nrbufs = --bufs;
do_wakeup = 1;
}
total_len -= chars; //更新讀的總長度
if (!total_len) //該讀的已讀完成
break; /* common path: read succeeded */
}
if (bufs) /* More to do? */
continue;
//若bufs為0,說明所有管道為NULL,此時進行一下操作
if (!PIPE_WRITERS(*inode)) //是否有寫操作正在進行
break;
if (!PIPE_WAITING_WRITERS(*inode)) { //是否需要等待
/* syscall merging: Usually we must not sleep
* if O_NONBLOCK is set, or if we got some data.
* But if a writer sleeps in kernel space, then
* we can wait for that data without violating POSIX.
*/
if (ret)
break;
if (filp->f_flags & O_NONBLOCK) { //要等待但又設定了NONBLOCK標記,矛盾了
ret = -EAGAIN;
break;
}
}
if (signal_pending(current)) { //設定程序阻塞標誌
if (!ret) ret = -ERESTARTSYS;
break;
}
if (do_wakeup) {
wake_up_interruptible_sync(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
}
pipe_wait(inode);
}
up(PIPE_SEM(*inode));
/* Signal writers asynchronously that there is more room. */
if (do_wakeup) {
wake_up_interruptible(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
}
if (ret > 0)
file_accessed(filp); //更新檔案結構的atime物件
return ret;
}
static ssize_t
pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
struct iovec iov = { .iov_base = buf, .iov_len = count };
return pipe_readv(filp, &iov, 1, ppos);
}
/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode)
{
DEFINE_WAIT(wait);
//把current新增到管道的等待佇列中
prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE);
//釋放i_sem
up(PIPE_SEM(*inode));
schedule();
//被呼醒,把它從等待佇列中刪除
finish_wait(PIPE_WAIT(*inode), &wait);
//再次獲取i_sem索引節點訊號量
down(PIPE_SEM(*inode));
}
*管道的寫操作
在管道建立函式do_pipe中可以看到,管道的寫操作結構是write_pipe_fops,
該操作列表中的寫操作是呼叫pipe_write實現的。
POSIX標準定義了寫操作的一些規則:
static ssize_t
pipe_writev(struct file *filp, const struct iovec *_iov,
unsigned long nr_segs, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info;
ssize_t ret;
int do_wakeup;
struct iovec *iov = (struct iovec *)_iov;
size_t total_len;
total_len = iov_length(iov, nr_segs);
/* Null write succeeds. */
if (unlikely(total_len == 0))
return 0;
do_wakeup = 0;
ret = 0;
down(PIPE_SEM(*inode));
info = inode->i_pipe;
//是否有讀者程序存在,若沒有寫管道操作就沒有任何意義
//此時產生SIGPIPE訊號
if (!PIPE_READERS(*inode)) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
goto out;
}
/* We try to merge small writes */
//若有待讀資料的緩衝區,而且寫入的資料長度小於PAGE_SIZE
if (info->nrbufs && total_len < PAGE_SIZE) {
//第一個待讀緩衝區+可讀緩衝區數-1得到第一個可寫緩衝區的地址
int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
struct pipe_buffer *buf = info->bufs + lastbuf;
struct pipe_buf_operations *ops = buf->ops;
int offset = buf->offset + buf->len;
//若可寫緩衝區的剩餘的空間大於寫入的資料總量total_len
if (ops->can_merge && offset + total_len <= PAGE_SIZE) {
void *addr = ops->map(filp, info, buf);
//把資料複製到管道緩衝區
int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);
ops->unmap(info, buf);
ret = error;
do_wakeup = 1;
if (error)
goto out;
//更新有效資料長度欄位
buf->len += total_len;
ret = total_len;
goto out;
}
}
// 若全部可寫(可讀緩衝區數為0),
// 或寫入資料長度大於管道緩衝區的長度單位(PAGE_SIZE)
for (;;) {
int bufs;
//是否有讀者程序存在
if (!PIPE_READERS(*inode)) {
send_sig(SIGPIPE, current, 0);
if (!ret) ret = -EPIPE;
break;
}
//獲取讀緩衝區數
bufs = info->nrbufs;
if (bufs < PIPE_BUFFERS) {
ssize_t chars;
//用第一個可讀緩衝區+可讀緩衝區數得到可寫(空)緩衝區的地址
int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
struct pipe_buffer *buf = info->bufs + newbuf;
struct page *page = info->tmp_page;
int error;
//若page的值為空,從夥伴系統中獲取一頁
if (!page) {
page = alloc_page(GFP_HIGHUSER);
if (unlikely(!page)) {
ret = ret ? : -ENOMEM;
break;
}
info->tmp_page = page;
}
/* Always wakeup, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging.
* FIXME! Is this really true?
*/
do_wakeup = 1;
chars = PAGE_SIZE;
if (chars > total_len)
chars = total_len;
//寫chars位元組到緩衝區中
error = pipe_iov_copy_from_user(kmap(page), iov, chars);
kunmap(page);
if (unlikely(error)) {
if (!ret) ret = -EFAULT;
break;
}
ret += chars;
/* Insert it into the buffer array */
/更新nrbufs,和len欄位。
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
buf->len = chars;
info->nrbufs = ++bufs;
info->tmp_page = NULL;
//若沒有寫完繼續寫入剩下的資料
total_len -= chars;
if (!total_len)
break;
}
//還有可寫緩衝區,繼續寫
if (bufs < PIPE_BUFFERS)
continue;
//若設定非阻塞,
//若沒有寫入任何的資料ret=0,此時返回錯誤
//若已經寫完了資料,結束寫操作。
if (filp->f_flags & O_NONBLOCK) {
if (!ret) ret = -EAGAIN;
break;
}
if (signal_pending(current)) {
if (!ret) ret = -ERESTARTSYS;
break;
}
if (do_wakeup) {
wake_up_interruptible_sync(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
do_wakeup = 0;
}
PIPE_WAITING_WRITERS(*inode)++;
pipe_wait(inode);
PIPE_WAITING_WRITERS(*inode)--;
}
out:
up(PIPE_SEM(*inode));
if (do_wakeup) {
wake_up_interruptible(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
}
if (ret > 0)
inode_update_time(inode, 1); /* mtime and ctime */
return ret;
}