IO複用(epoll)
在前面的文章中講了實現IO複用的兩種方式:select和poll。今天主要講一個更為高效的函式epoll。
epoll
epoll能顯著提高在大量連結中,只有少量活躍連線時的cpu利用率。因為,首先epoll可以複用監聽的檔案描述符集合,而不用每次在等待事件之前重新準備被監聽的檔案描述符集合。其次是因為epoll獲取就緒事件時,不用遍歷整個監聽事件的集合,而是隻需要遍歷那些被核心IO一步喚醒的放入ready佇列的檔案描述符集合。
Epoll的主要API有
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
①、int epoll_create(int size);
用來建立一個epoll控制代碼,底層實現也就是生成一個紅黑樹的樹根。它的引數只有一個size,設定監聽檔案描述符的個數,是個建議值,epoll後期監聽的檔案描述符上限和size無關。需要說明的是:當建立好epoll控制代碼後,它就是會佔用一個fd值,在linux下如果檢視/proc/程序id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡。
②、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
控制某個epoll監聽檔案描述符上的事件
第一個引數是epoll_create()建立的epoll控制代碼,
第二個引數是op操作,有三種:
EPOLL_CTL_ADD,新增一個新的fd到到紅黑樹上,
EPOLL_CTL_MOD,修改對應fd的監聽事件,
EPOLL_CTL_DEL,刪除一個fd,也就是將其從紅黑樹上摘下來。
第三個引數是監聽的檔案描述符fd,
第四個引數是epoll_event結構體指標,這個結構體裡面有兩個成員:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
第一個是event,監聽的檔案描述符的事件,一般使用的事件都有一下幾個:
EPOLLIN : 表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉)
EPOLLOUT: 表示對應的檔案描述符可以寫
EPOLLPRI: 表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來)
EPOLLERR: 表示對應的檔案描述符發生錯誤
EPOLLHUP: 表示對應的檔案描述符被結束通話;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)而言的
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡
第二個data也是一個epoll_data_t聯合體,定義如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
在這個結構體中最常用的就是兩個:void *ptr 和 int fd。因為這是一個聯合體。同時只能使用其中的一個成員。一般情況下,我們直接使用fd這個成員,傳入監聽的檔案描述符,和epoll_ctl函式的第三個引數保持一致。當我們想要進一步提高epoll的效能,可以使用void *ptr這個泛型指標。註冊回撥函式,當監聽的事件滿足時,直接呼叫該回調函式去執行相應的邏輯。
③、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
Epoll_wait函式是等待監聽的事件就緒,類似於select函式和epoll函式。
第一個引數是epoll控制代碼,
第二個引數是epoll_event結構體型別的陣列,
第三個引數是這個陣列的大小,
第四個引數是設定超時,timeout。
ET模式和LT模式
epoll除了提供select/poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得使用者空間程式有可能快取IO狀態,減少epoll_wait的呼叫,提高應用程式效率。
LT模式
LT模式,也就是水平觸發模式,是epoll的預設工作方式,相當於比較快一點的poll。在這種做法中,核心告訴你一個檔案描述符是否就緒了,然後你可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,即在這種模式下,只要緩衝區有資料就會觸發epoll_wait()函式。。可以設定為阻塞版本,也可以設定為非阻塞版本。
ET模式
ET模式,邊沿觸發,是一種高效的工作模式,在這種模式下,當檔案描述符變為就緒狀態後,核心通過epoll通知,便不會在通知,及時緩衝區裡面還有資料也不會再通知,在這種模式下只能使用非阻塞版本,是為了避免當一個檔案控制代碼阻塞讀或寫操作時,把處理多個檔案描述符的任務餓死。
只有當read或wirte函式返回EAGIAN錯誤碼時,才需要掛起等待,但並不是說每次都需要迴圈讀,直到讀到EAGIN才結束,當我們讀到的位元組數小於緩衝區大小是時,就可以認為讀事件處理完成。使用epoll ET模式,可以減少epoll_wait()函式的呼叫次數,提高效率。
示例程式碼
以一個epoll的伺服器程式結束本文。
#include<stdio.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<ctype.h>
#include<sys/epoll.h>
#include<fcntl.h>
#define MYPORT 8888
#define BACKLOG 10
#define MAXDATASIZE 1024
#define FILEMAX 3000
#define size 20 //監聽的事件數
int main()
{
int i,j,maxi;
int listenfd,connfd,sockfd; //定義套接字描述符
int nready; //接受epool_wait返回值
int numbytes; //接受recv返回值
char buf[MAXDATASIZE]; //傳送緩衝區
struct epoll_event evt; //註冊監聽事件
struct epoll_event ep[size]; //滿足事件
//定義IPV4套介面地址結構
struct sockaddr_in seraddr; //service 地址
struct sockaddr_in cliaddr; //client 地址
int sin_size;
//初始化IPV4套介面地址結構
seraddr.sin_family =AF_INET; //指定該地址家族
seraddr.sin_port =htons(MYPORT); //埠
seraddr.sin_addr.s_addr = INADDR_ANY; //IPV4的地址
bzero(&(seraddr.sin_zero),8);
//socket()函式
if((listenfd = socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket");
exit(1);
}
//地址重複利用
int on = 1;
if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
{
perror("setsockopt");
exit(1);
}
//bind()函式
if(bind(listenfd,(struct sockaddr *)&seraddr,sizeof(struct sockaddr))==-1)
{
perror("bind");
exit(1);
}
//listen()函式
if(listen(listenfd,BACKLOG)==-1)
{
perror("listen");
exit(1);
}
int epfd = epoll_create(size); //建立控制代碼
if(epfd == -1)
{
perror("epoll_create errror!\n");
exit(1);
}
evt.events = EPOLLIN ;
evt.data.fd = listenfd;
//註冊監聽事件listenfd到epfd
int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&evt);
if(ret == -1)
{
perror("epoll_ctl error!\n");
exit(1);
}
while(1)
{
nready = epoll_wait(epfd,ep,size,-1); //監聽事件是否就緒
if(nready < 0)
{
perror("epoll_wait error!\n");
exit(1);
}
for(i = 0;i < nready;i++)
{
if(!(ep[i].events & EPOLLIN))
{
continue;
}
else if(ep[i].data.fd == listenfd) //listenfd就緒,客戶端發起連線
{
sin_size = sizeof(cliaddr);
if((connfd=accept(listenfd,(struct sockaddr *)&cliaddr,&sin_size))==-1)
{
perror("accept");
exit(1);
}
printf("client IP: %s\t PORT : %d\n",inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));
//修改connfd為非阻塞讀
evt.events = EPOLLIN | EPOLLET; //ET模式
int flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
evt.data.fd = connfd;
int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&evt);
if(ret == -1)
{
perror("epoll_ctl error!\n");
exit(1);
}
}
else
{
sockfd = ep[i].data.fd;
memset(buf,0,sizeof(buf));
// sockfd設定為非阻塞模式,資料還沒有發給接收端時,呼叫recv就會返回-1,並且errno會被設為EAGAIN.
numbytes = recv(sockfd,buf,1024,0)
if(numbytes == -1 && EAGAIN != errno)
{
perror("recv error!\n");
exit(1);
}
if(numbytes == 0)//客戶端斷開連線
{
printf("client[%d],close!\n",i);
int ret = epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
if(ret == -1)
{
perror("epoll_ctl error!\n");
exit(1);
}
close(sockfd);
}
if(numbytes > 0 )
{
send(sockfd,buf,numbytes,0);
numbytes = recv(sockfd,buf,1024,0);
}
}
}
}
close(listenfd);
close(epfd);
return 0;
}