I/O多路複用——poll
上一篇我們說了關於select的相關資訊,我們可以看到select是有弊端的,所以為了解決select的弊端,UNIX又在後期提出了poll。
select的弊端這裡就不多說了,上一篇部落格有提及。
poll
poll和select類似,不過在一些方面改善了select的弊端。它也是在指定的時間進行輪詢檔案描述符,檢視是否有就緒時間發生。
和上次一樣,我們先來看一下poll系統呼叫。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds是一個pollfd的結構體陣列。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
這就是這個結構體陣列每個元素。fd用來記錄對應的檔案描述符,events用來表示poll所監聽的事件,這個由使用者來設定。revents用來表示返回的事件。revents是通過核心來進行操作修改。
這裡提供了一些合法事件。
事件 | 說明 |
---|---|
POLLIN | 普通或優先順序帶資料可讀 |
POLLRDNORM | 普通資料可讀 |
POLLRDBAND | 優先順序帶資料可讀 |
POLLPRI | 高優先順序資料可讀 |
POLLOUT | 普通資料可寫 |
POLLWRNORM | 普通資料可寫 |
POLLWRBAND | 優先順序帶資料可寫 |
POLLERR | 發生錯誤 |
POLLHUP | 發生掛起 |
POLLNVAL | 描述字不是一個開啟的檔案 |
後面的三個引數在events無意義,只能作為返回結果儲存在revents。
另外,這裡需要說的,這些引數如何設定給events,這些巨集相當於每一個佔用一個位元位,我們可以去想一下點陣圖,所以,如果我們要進行設定兩個事件,就使用|
&
事件,如果事件發生了,那麼結果大於1。這就是一個簡單的位運算的,相信你仔細想想就能夠理解。
第二個引數nfds,用來監視的檔案描述符的數目。
第三個引數是timeout,用來設定超時時間。
引數 | 說明 |
---|---|
-1 | poll將永遠阻塞,等待知道某個時間發生 |
0 | 立即返回 |
大於0的值 | 設定正常時間值 |
返回值
poll返回revents不為0的檔案描述符的個數。
失敗返回-1
總結
poll本質上和select沒有區別,它將使用者傳入的陣列拷貝到核心空間,然後查詢每個fd對應的裝置狀態,如果裝置就緒則在裝置等待佇列中加入一項並繼續遍歷,如果遍歷完所有fd後沒有發現就緒裝置,則掛起當前程序,直到裝置就緒或者主動超時,被喚醒後它又要再次遍歷fd。這個過程經歷了多次無謂的遍歷。
poll使用了events和revents分流的特點,這樣可以使得對關心事件只進行註冊一次。
poll基於連結串列進行儲存,沒有最大連線數的限制,只取決於記憶體大小。
poll還有一個特點是“水平觸發”,如果報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。
poll的實現機制與select類似,其對應核心中的sys_poll,只不過poll向核心傳遞pollfd陣列,然後對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。poll返回後,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。
poll的缺點
1、大量的fd的陣列被整體複製於使用者態和核心地址空間之間,而不管這樣是不是有意義。
2、poll依然需要進行輪詢,所消耗的時間太多。
3、水平觸發,效率低
示例程式
聊天室程式:
#define _GNU_SOURCE 1
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<fcntl.h>
#include<errno.h>
#include<string.h>
#include<poll.h>
#define LIMIT_FD 65535
#define LIMIT_USER 5
#define BUF_SIZE 1024
//客戶資料:包含客戶的socket地址,待寫到客戶端的資料的位置、從客戶端讀入資料。
struct client_data
{
struct sockaddr_in address;
char* write_buf; //寫入客戶端段的資料的位置
char buf[BUF_SIZE]; //客戶端讀入的資料
};
int StartUp(int port,char *ip_addr)
{
assert(ip_addr);
int sock = socket(AF_INET, SOCK_STREAM,0);
if(sock < 0)
{
perror("socket");
exit(2);
}
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = inet_addr(ip_addr);
if(bind(sock,(struct sockaddr *)&local,sizeof(local)) < 0)
{
perror("bind");
exit(3);
}
if(listen(sock,5) < 0)
{
perror("listen");
exit(4);
}
return sock;
}
//設定檔案描述符為非阻塞狀態
int setnoblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
int main(int argc, char *argv[])
{
if(argc != 3)
{
printf("Usage: %s [local_ip] [local_port]\n",argv[0]);
return 1;
}
//建立監聽socket
int listen_sock = StartUp(atoi(argv[2]),argv[1]);
//建立users陣列,分配client資料物件的檔案描述符。利用它來進行索引使用者資料以及發資料
struct client_data* users = (struct client_data *)malloc(sizeof(struct client_data)*LIMIT_FD);
//client_data users[LIMIT_FD];
//雖然有足夠多的client_data,但是依然要限制使用者數量,事件最大
struct pollfd fds[LIMIT_USER+1];
int user_count = 0;
int i = 0;
for(i = 1; i <= LIMIT_USER; ++i)
{
fds[i].fd = -1;
fds[i].events = 0;
}
fds[0].fd = listen_sock;//設定監聽埠
fds[0].events = POLLIN|POLLERR;//監聽埠設定可讀和錯誤事件
fds[0].revents = 0;
while(1)
{
//永遠等待,當準備好再去提交給應用程式。
int ret = poll(fds, user_count+1, -1);
if(ret < 0)
{
printf("poll faile\n");
break;
}
for(i = 0; i < user_count+1; ++i)
{
//此時為監聽套接字,有新連線來,監聽套接字接受到可讀事件
if((fds[i].fd == listen_sock) && (fds[i].revents & POLLIN))
{
struct sockaddr_in peer;
socklen_t peer_len = sizeof(peer);
int sock = accept(listen_sock,\
(struct sockaddr *)&peer,&peer_len);
if(sock < 0)
{
perror("accept");
continue;
}
printf("new user :ip:%s,port:%d\n",inet_ntoa(peer.sin_addr),ntohs(peer.sin_port));
// 如果請求太多,則關閉請求連線。
if(user_count >= LIMIT_USER)
{
const char *msg = "Too many users!!!\n";
printf("%s",msg);
write(sock, msg, sizeof(msg) );
close(sock);
continue;
}
//相對於新連線,同時去修改fds,和users陣列對應連線的檔案描述符sock的客戶資料。
user_count++;
users[sock].address = peer;
//設定非阻塞
setnoblocking(sock);
fds[user_count].fd = sock;
fds[user_count].events = POLLIN | POLLERR | POLLRDHUP;
fds[user_count].revents = 0;
printf("come a new user, now have %d users\n",user_count);
}
//對於出現錯誤資訊
else if(fds[i].revents & POLLERR)
{
printf("get an error from %d\n",fds[i].fd);
//...
continue;
}
//如果客戶端關閉連線。此時檢測到客戶端斷開的請求,所以這個時候觸發這個事件
else if(fds[i].revents & POLLRDHUP)
{
//伺服器也許要關閉連線,並且把user_count減1
//這裡的減相當於去移動了檔案描述符,把最大的放到了需要減的那個了。
//
users[fds[i].fd] = users[fds[user_count].fd];
close(fds[i].fd);
fds[i] = fds[user_count];
i--;
user_count--;
printf("a client left\n");
}
//連線套接字可讀
else if(fds[i].revents & POLLIN)
{
int sock = fds[i].fd;
memset(users[sock].buf, 0,sizeof(users[sock].buf));
ret = read(sock, users[sock].buf, sizeof(users[sock].buf) - 1);
printf("client :%s\n",users[sock].buf);
if(ret > 0)
{
//收到客戶資料,此時通知其他的socket接受資料
users[sock].buf[ret] = 0;
int j = 0;
for(j = 1; j <= user_count; ++j)
{
if(fds[j].fd == sock)
{
continue;
}
fds[j].events |= ~POLLIN;
fds[j].events |= POLLOUT;
users[fds[j].fd].write_buf = users[sock].buf;
}
}
else if(ret < 0)
{
//讀取錯誤,關閉連線
if(errno != EAGAIN)
{
perror("read");
close(sock);
users[fds[i].fd] = users[fds[user_count].fd];
fds[i] = fds[user_count];
i--;
user_count--;
}
}
}
else if(fds[i].revents & POLLOUT)
{
//連線套接字可寫
int sock =fds[i].fd;
//判斷是否可寫
if(! users[sock].write_buf)
{
continue;
}
ret = write(sock,users[sock].write_buf,\
BUF_SIZE-1);
users[sock].write_buf = NULL;
//寫完以後重新註冊fds[i]的可讀事件
fds[i].events |= ~POLLOUT;
fds[i].events |= POLLIN;
}
}
}
close(listen_sock);
return 0;
}