網路程式設計-多路轉接之poll與epoll模型
首先,還是需要理解io過程:io過程總體來看分兩步,第一步就是等,第二步才是資料搬遷。而如果要想提高io的效能與效率,就要減少等的比重。
可以假想一個場景:
你去釣魚,但是你只有一個魚竿。你的同伴也和你一起去釣魚,但是他帶了100個魚竿。假設每條魚上鉤的概率都是一樣的,那麼你和他相同的時間內,你在死盯著一個魚竿,而他只需要來回巡視所有的魚竿,一旦有魚上鉤,拿上來即可。很明顯,它的這種方式就要比你高效得多。
如果理想情況下它的魚鉤足夠多,就會出現一種情況,每秒內都有魚上鉤。
替換到我們的io模型中,多路轉接就能夠實現這種近似理想化的情況。
但是最重要的一點:
不論是之前的select還是poll與epoll模型,本質上都是為了io過程中的等待這一過程。
poll
poll在用法上與select大致相同。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
//pollfd結構
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};
引數說明:
- fds是一個poll函式監聽的結構列表,每一個元素中,包含了三個部分:檔案描述符,監聽的事件集合,返回的事件集合。
- nfds:表示fds陣列的長度。
- timeout:表示函式的超時時間,單位是毫秒
events的取值:
revents的取值:
返回值;
- 小於0:表示出錯
- 等於0:表示poll函式等待出錯
- 大於0:表示poll函式由由於監聽的檔案描述符就緒而返回
優點
select使用三個點陣圖來表示關心的事件型別,而poll使用了一個結構體指標實現。
並且使用點陣圖,對於關心的檔案描述符上限也受制於點陣圖的大小。
- pollfd結構包含了要關心的檔案描述符和關心的事件以及發生的事件,比select使用起來更方便
- poll對檔案描述符沒有數量的限制。
缺點
- 和select函式一樣,poll返回後,需要使用pollfd輪詢來獲取就緒的檔案描述符。
- 每次呼叫poll都需要把大量的pollfd結構從使用者態拷貝到記憶體中。
隨著檔案描述符上升,效率也會逐漸下降。
程式碼實現:
#define MAX 1024
typedef struct pollfd pollfd;
void Add(int fd,pollfd* fd_list,int size)
{
int i=0;
for(i=0;i<size;i++)
{
if(fd_list[i].fd==-1)
{
fd_list[i].fd=fd;
fd_list[i].events=POLLIN;
break;
}
}
}
void Init(pollfd* fd_list,int size)
{
int i=0;
for(i=0;i<size;i++)
{
fd_list[i].fd=-1;
fd_list[i].events=0;
fd_list[i].revents=0;
}
}
int startup(int port)
{
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_addr.s_addr=htonl(INADDR_ANY);
local.sin_port=htons(port);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(2);
}
if(listen(sock,5)<0)
{
perror("listen");
exit(3);
}
return sock;
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
printf("Usage:[%s port]\n",argv[0]);
return 1;
}
int new_sock=startup(atoi(argv[1]));
pollfd fd_list[MAX];
Init(fd_list,sizeof(fd_list)/sizeof(pollfd));//???
Add(new_sock,fd_list,sizeof(fd_list)/sizeof(pollfd));
for(;;)
{
int ret=poll(fd_list,sizeof(fd_list)/sizeof(pollfd),1000);
if(ret<0)
{
perror("poll");
continue;
}
else if(ret==0)
{
printf("timeout...\n");
continue;
}
size_t i=0;
for(i=0;i<sizeof(fd_list)/sizeof(pollfd);i++)
{
if(fd_list[i].fd==-1)
{
continue;
}
if(!(fd_list[i].revents & POLLIN))
{
continue;
}
if(fd_list[i].fd==new_sock)
{
struct sockaddr_in client;
socklen_t len=sizeof(client);
int connect_sock=accept(new_sock,(struct sockaddr*)&client,&len);
if(connect_sock<0)
{
perror("accpet");
continue;
}
Add(connect_sock,fd_list,sizeof(fd_list)/sizeof(pollfd));
}
else
{
char buf[MAX];
ssize_t s=read(fd_list[i].fd,buf,sizeof(buf)-1);
if(s<0)
{
perror("read");
continue;
}
else if(s==0)
{
printf("client quit!\n");
close(fd_list[i].fd);
fd_list[i].fd=-1;
}
else
{
printf("client say# %s\n",buf);
//write(fd_list[i].fd,buf,strlen(buf));
}
}
}
}
}
epoll模型
epoll,從命名上就可以看出與poll應該是有關聯的。按照man手冊的說法:epoll是為處理大批量控制代碼而做了改進的poll。他幾乎具備了之前的poll與select的所有優點。
相關係統呼叫
epoll_create
呼叫該函式,作業系統會幫我們做三件事。
- 建立一顆紅黑二叉樹
- 建立一個就緒佇列
- 建立回撥機制
以上稱作建立一個epoll模型。
#include <sys/epoll.h> int epoll_create(int size);
其中引數size是被忽略的(在Linux2.6.8以後),雖然是被忽略的,為了防止跨版本的問題,所以儘可能地給一個較大的值。
該函式用完之後必須用close函式關閉。
epoll_ctl
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
該函式是epoll的註冊事件函式:
- 第一個引數是epoll_create的返回值
- 第二個引數表示動作,用三個巨集表示
- 巨集的取值:
- EPOLL_CTL_ADD:註冊新的fd至epfd中
- EPOLL_CTL_MOD :修改已經註冊的fd的監聽事件
- EPOLL_CTL_DEL:從epfd中刪除一個fd
- 第三個引數是需要監聽的fd
- 第四個引數是告訴核心需要監聽什麼事。
epoll_event結構:
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
event的取值:
enum EPOLL_EVENTS
{
EPOLLIN = 0x001,//關心讀事件
#define EPOLLIN EPOLLIN
EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP
EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT
EPOLLET = 1u << 31
#define EPOLLET EPOLLET
};
可以看出這些巨集是一個個的巨集,並且是一個二進位制序列且只有一個1不會重複。所以需要關心多個使用多個按位或即可。
epoll_wait
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
該函式收集在epoll監控的事件中已經發送的事件。
- 引數events事分配好的epoll_event結構體陣列
- epoll將會把發生的事件賦值到events陣列中
- maxevents告訴核心這個events有多大,不能大於建立epoll模型時的size大小
- 引數timeout是超時時間
- 如果函式呼叫成功,返回對應i/o上已準備好的檔案描述符數目,返回0表示已超時,小於0表示失敗。
工作原理
當呼叫epoll_create函式時,系統會建立一個epoll模型,也就是做三件事,建立紅黑樹,就緒的佇列,回撥機制。
紅黑樹將儲存epoll所監聽的套接字。用來儲存所有的套接字,當進行add或者del的時候,都從紅黑樹上去處理,這樣時間複雜度就可以保持在O(logn)。
當新增事件以後,這個事件就會和相應的裝置驅動程式建立回撥關係,當相應的時間發生的時候,這個時候就會去呼叫回撥函式。回撥函式就完成了把時間新增到連結串列當中。
當我們執行epoll_ctl時,除了把socket放到epoll檔案系統裡file物件對應的紅黑樹上之外,還會給核心中斷處理程式註冊一個回撥函式,告訴核心,如果這個控制代碼的中斷到了,就把它放到準備就緒連結串列裡。所以,當一個socket上有資料到了,核心在把網絡卡上的資料copy到核心中後就來把socket插入到準備就緒連結串列裡了。
總結下來:epoll的使用方法分三步:
- 建立epoll模型
- 進行註冊關心的檔案描述符
- 等待檔案描述符就緒
epoll的優點
其實從epoll的工作原理就能夠看出epoll雖然原理複雜,但是使用起來會比select方便。不管是poll還是select資料結構都需要自己去維護,而epoll不需要,只需要你無腦式地將你關心地檔案描述符事件註冊進去,並且直接去就緒佇列中取即可。
優點總結:
- 檔案描述符無上限:通過epoll_ctl來註冊一個檔案描述符,核心中使用紅黑樹來管理所有需要監控地檔案描述符。
- 基於事件的就緒通知方式:一旦被監聽的某個檔案描述符就緒,核心會採用回撥機制,迅速啟用該檔案描述符,即是就緒的檔案描述符增多,也不會影響效能。
- 維護就緒佇列:當檔案描述符就緒,就會被放到核心中的一個就緒佇列中,只需要取佇列中元素即可。
void service(int epfd,struct epoll_event *revs,int num,int listen_sock)
{
//既關心讀有關心寫
int i=0;
struct epoll_event ev;
for(i=0;i<num;i++)
{
int fd=revs[i].data.fd;
if(revs[i].events&EPOLLIN)
{
//read
if(fd==listen_sock)//accept
{
struct sockaddr_in client;
socklen_t len=sizeof(client);
int new_fd=accept(fd,(struct sockaddr*)&client,&len);
if(new_fd<0)
{
perror("accept");
continue;
}
printf("get a connection!\n");
ev.events=EPOLLIN;
ev.data.fd=new_fd;
epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&ev);
}
else
{
//route
char buf[1024];
ssize_t s=read(fd,buf,sizeof(buf));
if(s>0)
{
buf[s]=0;
printf("client:#%s\n",buf);
ev.events=EPOLLOUT;
ev.data.fd=fd;
epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);
}
else if(s==0)
{
printf("client quit!\n");
close(fd);
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);//刪除該fd
}
else
{
perror("read");
close(fd);
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);//刪除該fd
}
}
}
if(revs[i].events&EPOLLOUT)
{
//write
const char* msg="HTTP/1.0 200 OK\r\n\r\n<html><h1>EPOLL SUCCESS:)</h1><></html>\r\n";
write(fd,msg,strlen(msg));
close(fd);
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
}
}
}
int startup(int port)
{
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_addr.s_addr=htonl(INADDR_ANY);
local.sin_port=htons(port);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(2);
}
if(listen(sock,5)<0)
{
perror("listen");
exit(3);
}
return sock;
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
printf("Usage:[%s port]\n",argv[0]);
return 1;
}
int listen_sock=startup(atoi(argv[1]));
int epfd=epoll_create(MAX);
if(epfd<0)
{
perror("epoll_create");
return 5;
}
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=listen_sock;
epoll_ctl(epfd,EPOLL_CTL_ADD,listen_sock,&ev);
struct epoll_event r_ev_s[MAX];
int size=0;
for(;;)
{
switch(size=epoll_wait(epfd,r_ev_s,MAX,-1))
{
case -1:
perror("epoll_wait");
break;
case 0:
printf("timeout....\n");
break;
default:
service(epfd,r_ev_s,size,listen_sock);
break;
}
}
}