1. 程式人生 > >Linux 管道的實現分析

Linux 管道的實現分析


//inode結點資訊結構
struct inode {
...
    struct pipe_inode_info  *i_pipe;
... 
};


//管道緩衝區個數
#define PIPE_BUFFERS (16)
//管道快取區物件結構
struct pipe_buffer {
    struct page *page; //管道緩衝區頁框的描述符地址
    unsigned int offset, len; //頁框內有效資料的當前位置,和有效資料的長度
    struct pipe_buf_operations *ops; //管道快取區方法表的地址
};


//管道資訊結構
struct pipe_inode_info {
    wait_queue_head_t wait; //管道等待佇列
    unsigned int nrbufs, curbuf; //包含待讀資料的緩衝區數和包含待讀資料的第一個緩衝區的索引
    struct pipe_buffer bufs[PIPE_BUFFERS]; //管道緩衝區描述符陣列
    struct page *tmp_page; //快取記憶體區頁框指標
    unsigned int start;  //當前管道快取區讀的位置
    unsigned int readers; //讀程序的標誌,或編號
    unsigned int writers; //寫程序的標誌,或編號
    unsigned int waiting_writers; //在等待佇列中睡眠的寫程序的個數
    unsigned int r_counter; //與readers類似,但當等待寫入FIFO的程序是使用
    unsigned int w_counter; //與writers類似,但當等待寫入FIFO的程序時使用
    struct fasync_struct *fasync_readers; //用於通過訊號進行的非同步I/O通知
    struct fasync_struct *fasync_writers; //用於通過訊號的非同步I/O通知
};
 
 
2, 管道的實現
管道可以看著是開啟的檔案,但在已安裝的檔案系統中沒有相應的影像。
管道是作為一組VFS物件來實現的,因此沒有對應的磁碟映像。
而在2.6中把這些VFS物件組織成pipfs特殊檔案系統以加速他們的處理。
但這種檔案系統在系統目錄樹中沒有安裝點,使用者根本看不到它。
2.1 pipefs 檔案系統的安裝
//檔案系統的安裝可以參考檔案系統的實現一章
static struct super_block *pipefs_get_sb(struct file_system_type *fs_type,
    int flags, const char *dev_name, void *data)
{
    return get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC);
}
static struct file_system_type pipe_fs_type = { 
    .name       = "pipefs",
    .get_sb     = pipefs_get_sb,
    .kill_sb    = kill_anon_super,
};
static int __init init_pipe_fs(void)
{
    int err = register_filesystem(&pipe_fs_type);
    if (!err) {
        pipe_mnt = kern_mount(&pipe_fs_type);
        if (IS_ERR(pipe_mnt)) {
            err = PTR_ERR(pipe_mnt);
            unregister_filesystem(&pipe_fs_type);
        }   
    }   
    return err;
}
static void __exit exit_pipe_fs(void)
{
    unregister_filesystem(&pipe_fs_type);
    mntput(pipe_mnt);
}


2.2 pipe的建立的實現
對於每個管道來說,核心都建立一個inode結點物件,兩個file物件,一個用於讀,一個用於寫。


int do_pipe(int *fd)
{
 struct qstr this;
 char name[32];
 struct dentry *dentry;
 struct inode * inode;
 struct file *f1, *f2;
 int error;
 int i,j;
 error = -ENFILE;
 f1 = get_empty_filp();  //獲取檔案物件1
 if (!f1)
  goto no_files;
 f2 = get_empty_filp();  //獲取檔案物件2
 if (!f2)
  goto close_f1;
 inode = get_pipe_inode(); //獲取pipe的inode結點
 if (!inode)
  goto close_f12;
 error = get_unused_fd(); //獲取沒有使用的fd1
 if (error < 0)
  goto close_f12_inode;
 i = error;
 error = get_unused_fd(); //獲取沒有使用的fd2
 if (error < 0)
  goto close_f12_inode_i;
 j = error;
 error = -ENOMEM;
 sprintf(name, "[%lu]", inode->i_ino); //設定索引節點號
 this.name = name;
 this.len = strlen(name);
 this.hash = inode->i_ino; /* will go */
 dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this); //獲取一個目錄物件
 if (!dentry)
  goto close_f12_inode_i_j;
 dentry->d_op = &pipefs_dentry_operations;
 //把目錄物件和inode結點聯絡在一起
 d_add(dentry, inode);
 f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));
 f1->f_dentry = f2->f_dentry = dget(dentry);
 f1->f_mapping = f2->f_mapping = inode->i_mapping;
 /* read file */ //給讀描述符的檔案物件賦值
 f1->f_pos = f2->f_pos = 0; //讀的位置從0偏移量開始
 f1->f_flags = O_RDONLY; //只讀
 f1->f_op = &read_pipe_fops; //讀操作時執行的函式
 f1->f_mode = FMODE_READ; //讀模式
 f1->f_version = 0;
 /* write file */
 f2->f_flags = O_WRONLY; //只寫
 f2->f_op = &write_pipe_fops; //寫操作執行函式
 f2->f_mode = FMODE_WRITE; //寫模式
 f2->f_version = 0;
 fd_install(i, f1); //給檔案物件f1中的fd賦值
 fd_install(j, f2); //給檔案物件f2中的fd賦值
 fd[0] = i; //把值賦給使用者空間
 fd[1] = j; //把值賦給使用者空間
 return 0;
close_f12_inode_i_j:
 put_unused_fd(j);
close_f12_inode_i:
 put_unused_fd(i);
close_f12_inode:
 free_pipe_info(inode);
 iput(inode);
close_f12:
 put_filp(f2);
close_f1:
 put_filp(f1);
no_files:
 return error; 
}


//獲取管道的inode結構
static struct inode * get_pipe_inode(void)
{
 struct inode *inode = new_inode(pipe_mnt->mnt_sb);
 if (!inode)
  goto fail_inode;
 if(!pipe_new(inode))
  goto fail_iput;
 PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;
 inode->i_fop = &rdwr_pipe_fops;
 /*
  * Mark the inode dirty from the very beginning,
  * that way it will never be moved to the dirty
  * list because "mark_inode_dirty()" will think
  * that it already _is_ on the dirty list.
  */
 inode->i_state = I_DIRTY;
 inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
 inode->i_uid = current->fsuid;
 inode->i_gid = current->fsgid;
 inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
 inode->i_blksize = PAGE_SIZE;
 return inode;
fail_iput:
 iput(inode);
fail_inode:
 return NULL;
}
 
*管道的讀操作的實現
 
管道讀操作的規則如下:

//管道讀操作函式
static ssize_t
pipe_readv(struct file *filp, const struct iovec *_iov,
    unsigned long nr_segs, loff_t *ppos)
{
 struct inode *inode = filp->f_dentry->d_inode; //獲取inode結點指標
 struct pipe_inode_info *info;
 int do_wakeup;
 ssize_t ret;
 struct iovec *iov = (struct iovec *)_iov; //獲取讀緩衝區的結構
 size_t total_len;
 total_len = iov_length(iov, nr_segs);
 /* Null read succeeds. */
 if (unlikely(total_len == 0))
  return 0;
 do_wakeup = 0;
 ret = 0;
 down(PIPE_SEM(*inode)); //獲取inode中的i_sem訊號量
 info = inode->i_pipe; //獲取inode 結構的pipe_inode_info結構指標
 for (;;) {
  int bufs = info->nrbufs; //檢查有幾個管道緩衝區有被讀取的資料
  if (bufs) { //說明有其中有緩衝區包含了讀資料
   int curbuf = info->curbuf; //獲取當前讀資料的管道快取區的索引
   struct pipe_buffer *buf = info->bufs + curbuf; //共有16個緩衝區,curbuf是當前的
   struct pipe_buf_operations *ops = buf->ops; //獲取操作函式列表
   void *addr;
   size_t chars = buf->len; 
   int error;
   //若緩衝區長度大於要求讀取的資料長度,chars設定成要求讀的長度
   if (chars > total_len) 
    chars = total_len;
   //執行Map方法
   addr = ops->map(filp, info, buf);
   //從快取區中複製資料
   error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars);
   //執行umap方法
   ops->unmap(info, buf);
   if (unlikely(error)) { 
    if (!ret) ret = -EFAULT; //第一次讀失敗
    break;
   }
   //更新管道的offset和len欄位
   ret += chars;
   buf->offset += chars;
   buf->len -= chars;
   
   //若現在的快取區的資料長度為0
   if (!buf->len) {
    buf->ops = NULL;
    ops->release(info, buf);
    curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
    info->curbuf = curbuf;
    info->nrbufs = --bufs;
    do_wakeup = 1;
   }
   total_len -= chars;  //更新讀的總長度
   if (!total_len)  //該讀的已讀完成
    break; /* common path: read succeeded */
  }
  if (bufs) /* More to do? */
   continue;
  //若bufs為0,說明所有管道為NULL,此時進行一下操作
  if (!PIPE_WRITERS(*inode)) //是否有寫操作正在進行
   break;
  if (!PIPE_WAITING_WRITERS(*inode)) { //是否需要等待
   /* syscall merging: Usually we must not sleep
    * if O_NONBLOCK is set, or if we got some data.
    * But if a writer sleeps in kernel space, then
    * we can wait for that data without violating POSIX.
    */
   if (ret)
    break;
   if (filp->f_flags & O_NONBLOCK) { //要等待但又設定了NONBLOCK標記,矛盾了
    ret = -EAGAIN;
    break;
   }
  }
  if (signal_pending(current)) { //設定程序阻塞標誌
   if (!ret) ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(PIPE_WAIT(*inode));
    kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
  }
  pipe_wait(inode); 
 }
 up(PIPE_SEM(*inode));
 /* Signal writers asynchronously that there is more room.  */
 if (do_wakeup) {
  wake_up_interruptible(PIPE_WAIT(*inode));
  kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
 }
 if (ret > 0)
  file_accessed(filp);  //更新檔案結構的atime物件
 return ret;
}
static ssize_t
pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
 struct iovec iov = { .iov_base = buf, .iov_len = count };
 return pipe_readv(filp, &iov, 1, ppos);
}


/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode)
{
    DEFINE_WAIT(wait);
 //把current新增到管道的等待佇列中
    prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE);
 //釋放i_sem
    up(PIPE_SEM(*inode));
    schedule();
 //被呼醒,把它從等待佇列中刪除
    finish_wait(PIPE_WAIT(*inode), &wait);
 //再次獲取i_sem索引節點訊號量
    down(PIPE_SEM(*inode));
}
 
 
*管道的寫操作
在管道建立函式do_pipe中可以看到,管道的寫操作結構是write_pipe_fops,
該操作列表中的寫操作是呼叫pipe_write實現的。
POSIX標準定義了寫操作的一些規則:
 

static ssize_t
pipe_writev(struct file *filp, const struct iovec *_iov,
     unsigned long nr_segs, loff_t *ppos)
{
    struct inode *inode = filp->f_dentry->d_inode;
    struct pipe_inode_info *info;
    ssize_t ret;
    int do_wakeup;
    struct iovec *iov = (struct iovec *)_iov;
    size_t total_len;


    total_len = iov_length(iov, nr_segs);
    /* Null write succeeds. */
    if (unlikely(total_len == 0))
        return 0;


    do_wakeup = 0;
    ret = 0;
    down(PIPE_SEM(*inode));
    info = inode->i_pipe;


    //是否有讀者程序存在,若沒有寫管道操作就沒有任何意義


    //此時產生SIGPIPE訊號


    if (!PIPE_READERS(*inode)) {
        send_sig(SIGPIPE, current, 0);
        ret = -EPIPE;
        goto out;
    }


    /* We try to merge small writes */
    //若有待讀資料的緩衝區,而且寫入的資料長度小於PAGE_SIZE


    if (info->nrbufs && total_len < PAGE_SIZE) {
        //第一個待讀緩衝區+可讀緩衝區數-1得到第一個可寫緩衝區的地址


        int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
        struct pipe_buffer *buf = info->bufs + lastbuf;
        struct pipe_buf_operations *ops = buf->ops;
        int offset = buf->offset + buf->len;
        //若可寫緩衝區的剩餘的空間大於寫入的資料總量total_len


        if (ops->can_merge && offset + total_len <= PAGE_SIZE) { 
            void *addr = ops->map(filp, info, buf);
            //把資料複製到管道緩衝區


            int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);
            ops->unmap(info, buf);
            ret = error;
            do_wakeup = 1;
            if (error)
                goto out;
            //更新有效資料長度欄位


            buf->len += total_len;
            ret = total_len;
            goto out;
        }
            
    }


    // 若全部可寫(可讀緩衝區數為0),


    // 或寫入資料長度大於管道緩衝區的長度單位(PAGE_SIZE)


    for (;;) {
        int bufs;
        //是否有讀者程序存在


        if (!PIPE_READERS(*inode)) {
            send_sig(SIGPIPE, current, 0);
            if (!ret) ret = -EPIPE;
            break;
        }
        //獲取讀緩衝區數


        bufs = info->nrbufs;
        if (bufs < PIPE_BUFFERS) {
            ssize_t chars;
            //用第一個可讀緩衝區+可讀緩衝區數得到可寫(空)緩衝區的地址


            int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
            struct pipe_buffer *buf = info->bufs + newbuf;
            struct page *page = info->tmp_page;
            int error;


            //若page的值為空,從夥伴系統中獲取一頁


            if (!page) {
                page = alloc_page(GFP_HIGHUSER);
                if (unlikely(!page)) {
                    ret = ret ? : -ENOMEM;
                    break;
                }
                info->tmp_page = page;
            }
            /* Always wakeup, even if the copy fails. Otherwise
             * we lock up (O_NONBLOCK-)readers that sleep due to
             * syscall merging.
             * FIXME! Is this really true?
             */
            do_wakeup = 1;
            chars = PAGE_SIZE;
            if (chars > total_len)
                chars = total_len;


            //寫chars位元組到緩衝區中


            error = pipe_iov_copy_from_user(kmap(page), iov, chars);
            kunmap(page);
            if (unlikely(error)) {
                if (!ret) ret = -EFAULT;
                break;
            }
            ret += chars;


            /* Insert it into the buffer array */
            /更新nrbufs,和len欄位。
            buf->page = page;
            buf->ops = &anon_pipe_buf_ops;
            buf->offset = 0;
            buf->len = chars;
            info->nrbufs = ++bufs;
            info->tmp_page = NULL;


            //若沒有寫完繼續寫入剩下的資料


            total_len -= chars;
            if (!total_len)
                break;
        }
        //還有可寫緩衝區,繼續寫


        if (bufs < PIPE_BUFFERS)
            continue;
        //若設定非阻塞,


        //若沒有寫入任何的資料ret=0,此時返回錯誤


        //若已經寫完了資料,結束寫操作。


        if (filp->f_flags & O_NONBLOCK) {
            if (!ret) ret = -EAGAIN;
            break;
        }
        if (signal_pending(current)) {
            if (!ret) ret = -ERESTARTSYS;
            break;
        }
        if (do_wakeup) {
            wake_up_interruptible_sync(PIPE_WAIT(*inode));
            kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
            do_wakeup = 0;
        }
        PIPE_WAITING_WRITERS(*inode)++;
        pipe_wait(inode);
        PIPE_WAITING_WRITERS(*inode)--;
    }
out:
    up(PIPE_SEM(*inode));
    if (do_wakeup) {
        wake_up_interruptible(PIPE_WAIT(*inode));
        kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
    }
    if (ret > 0)
        inode_update_time(inode, 1);    /* mtime and ctime */
    return ret;
}