1. 程式人生 > >IO複用(epoll)

IO複用(epoll)

  在前面的文章中講了實現IO複用的兩種方式:selectpoll。今天主要講一個更為高效的函式epoll。

epoll

  epoll能顯著提高在大量連結中,只有少量活躍連線時的cpu利用率。因為,首先epoll可以複用監聽的檔案描述符集合,而不用每次在等待事件之前重新準備被監聽的檔案描述符集合。其次是因為epoll獲取就緒事件時,不用遍歷整個監聽事件的集合,而是隻需要遍歷那些被核心IO一步喚醒的放入ready佇列的檔案描述符集合。
Epoll的主要API

#include <sys/epoll.h>

int epoll_create(int size);
int
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

①、int epoll_create(int size);
  用來建立一個epoll控制代碼,底層實現也就是生成一個紅黑樹的樹根。它的引數只有一個size,設定監聽檔案描述符的個數,是個建議值,epoll後期監聽的檔案描述符上限和size無關。需要說明的是:當建立好epoll控制代碼後,它就是會佔用一個fd值,在linux下如果檢視/proc/程序id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡。
  
②、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
  控制某個epoll監聽檔案描述符上的事件

第一個引數是epoll_create()建立的epoll控制代碼,
第二個引數是op操作,有三種:
  EPOLL_CTL_ADD,新增一個新的fd到到紅黑樹上,
  EPOLL_CTL_MOD,修改對應fd的監聽事件,
  EPOLL_CTL_DEL,刪除一個fd,也就是將其從紅黑樹上摘下來。
第三個引數是監聽的檔案描述符fd,  
第四個引數是epoll_event結構體指標,這個結構體裡面有兩個成員:

struct epoll_event {
      __uint32_t events;  /* Epoll events */
      epoll_data_t data;  /* User data variable */
  };

  第一個是event,監聽的檔案描述符的事件,一般使用的事件都有一下幾個:

  EPOLLIN : 表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉)
  EPOLLOUT: 表示對應的檔案描述符可以寫
  EPOLLPRI: 表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來)
  EPOLLERR: 表示對應的檔案描述符發生錯誤
  EPOLLHUP: 表示對應的檔案描述符被結束通話;
  EPOLLET:  將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)而言的
  EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡

  第二個data也是一個epoll_data_t聯合體,定義如下:
  

typedef union epoll_data {
            void *ptr;
            int fd;
            uint32_t u32;
            uint64_t u64;
        } epoll_data_t;

  在這個結構體中最常用的就是兩個:void *ptr 和 int fd。因為這是一個聯合體。同時只能使用其中的一個成員。一般情況下,我們直接使用fd這個成員,傳入監聽的檔案描述符,和epoll_ctl函式的第三個引數保持一致。當我們想要進一步提高epoll的效能,可以使用void *ptr這個泛型指標。註冊回撥函式,當監聽的事件滿足時,直接呼叫該回調函式去執行相應的邏輯。

③、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  Epoll_wait函式是等待監聽的事件就緒,類似於select函式和epoll函式。
第一個引數是epoll控制代碼,
第二個引數是epoll_event結構體型別的陣列,
第三個引數是這個陣列的大小,
第四個引數是設定超時,timeout。

ET模式和LT模式

  epoll除了提供select/poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得使用者空間程式有可能快取IO狀態,減少epoll_wait的呼叫,提高應用程式效率。

LT模式

  LT模式,也就是水平觸發模式,是epoll的預設工作方式,相當於比較快一點的poll。在這種做法中,核心告訴你一個檔案描述符是否就緒了,然後你可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,即在這種模式下,只要緩衝區有資料就會觸發epoll_wait()函式。。可以設定為阻塞版本,也可以設定為非阻塞版本。

ET模式

  ET模式,邊沿觸發,是一種高效的工作模式,在這種模式下,當檔案描述符變為就緒狀態後,核心通過epoll通知,便不會在通知,及時緩衝區裡面還有資料也不會再通知,在這種模式下只能使用非阻塞版本,是為了避免當一個檔案控制代碼阻塞讀或寫操作時,把處理多個檔案描述符的任務餓死。
  只有當read或wirte函式返回EAGIAN錯誤碼時,才需要掛起等待,但並不是說每次都需要迴圈讀,直到讀到EAGIN才結束,當我們讀到的位元組數小於緩衝區大小是時,就可以認為讀事件處理完成。使用epoll ET模式,可以減少epoll_wait()函式的呼叫次數,提高效率。

示例程式碼

  以一個epoll的伺服器程式結束本文。

#include<stdio.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<ctype.h>
#include<sys/epoll.h>
#include<fcntl.h>


#define   MYPORT  8888
#define  BACKLOG  10
#define MAXDATASIZE 1024
#define FILEMAX 3000 
#define size 20 //監聽的事件數

int main()
{
    int i,j,maxi;
    int listenfd,connfd,sockfd; //定義套接字描述符
    int nready; //接受epool_wait返回值
    int numbytes; //接受recv返回值

    char buf[MAXDATASIZE]; //傳送緩衝區
    struct epoll_event evt; //註冊監聽事件
    struct epoll_event ep[size]; //滿足事件


    //定義IPV4套介面地址結構
    struct sockaddr_in seraddr;        //service 地址

    struct sockaddr_in cliaddr;     //client 地址
    int sin_size;

    //初始化IPV4套介面地址結構

    seraddr.sin_family =AF_INET; //指定該地址家族
    seraddr.sin_port =htons(MYPORT);  //埠
    seraddr.sin_addr.s_addr = INADDR_ANY;  //IPV4的地址
    bzero(&(seraddr.sin_zero),8);

    //socket()函式
    if((listenfd = socket(AF_INET,SOCK_STREAM,0))==-1)
    {
        perror("socket");
        exit(1);
    }

    //地址重複利用
    int on = 1;
    if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
    {
        perror("setsockopt");
        exit(1);
    }

    //bind()函式
    if(bind(listenfd,(struct sockaddr *)&seraddr,sizeof(struct sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    //listen()函式
    if(listen(listenfd,BACKLOG)==-1)
    {
        perror("listen");
        exit(1);
    }

    int epfd = epoll_create(size); //建立控制代碼
    if(epfd == -1)
    {
        perror("epoll_create errror!\n");
        exit(1);
    }

    evt.events = EPOLLIN ;
    evt.data.fd = listenfd;

    //註冊監聽事件listenfd到epfd
    int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&evt);
    if(ret == -1)
    {
        perror("epoll_ctl error!\n");
        exit(1);
    }

    while(1)
    {
        nready = epoll_wait(epfd,ep,size,-1);  //監聽事件是否就緒
        if(nready < 0)
        {
            perror("epoll_wait error!\n");
            exit(1);
        }

        for(i = 0;i < nready;i++)
        {
            if(!(ep[i].events & EPOLLIN))
            {
                continue;
            }
            else if(ep[i].data.fd == listenfd) //listenfd就緒,客戶端發起連線
            {
                sin_size = sizeof(cliaddr);
                if((connfd=accept(listenfd,(struct sockaddr *)&cliaddr,&sin_size))==-1)
                {
                    perror("accept");
                    exit(1);
                }
                printf("client IP: %s\t PORT : %d\n",inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));

                //修改connfd為非阻塞讀
                evt.events = EPOLLIN | EPOLLET; //ET模式

                int flag = fcntl(connfd, F_GETFL);
                flag |= O_NONBLOCK;
                fcntl(connfd, F_SETFL, flag);

                evt.data.fd = connfd;
                int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&evt);
                if(ret == -1)
                {
                    perror("epoll_ctl error!\n");
                    exit(1);
                }
            }
            else
            {
                sockfd = ep[i].data.fd;
                memset(buf,0,sizeof(buf));

               // sockfd設定為非阻塞模式,資料還沒有發給接收端時,呼叫recv就會返回-1,並且errno會被設為EAGAIN.
                numbytes = recv(sockfd,buf,1024,0)
                if(numbytes == -1 && EAGAIN != errno)
                {
                    perror("recv error!\n");
                    exit(1);
                }
                if(numbytes == 0)//客戶端斷開連線
                {
                    printf("client[%d],close!\n",i);
                    int ret = epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
                    if(ret == -1)
                    {
                        perror("epoll_ctl error!\n");
                        exit(1);
                    }

                    close(sockfd);
                }
                if(numbytes > 0 )
                {
                    send(sockfd,buf,numbytes,0);
                    numbytes = recv(sockfd,buf,1024,0);
                }
            }
        }
    }

    close(listenfd);
    close(epfd);


    return 0;
}