Advanced I/O Multiplexing: Using epoll, with a Complete C Example
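High-performance network servers must serve a large number of clients concurrently. The older approach of using a separate thread or process for each connection is inefficient: with many clients, the resources consumed and the cost of context switching degrade server performance. An alternative is to use non-blocking I/O inside a single thread.
This article introduces epoll(7) on Linux, which provides an efficient readiness-notification mechanism, and presents the complete code of a TCP server written in C. epoll was introduced in Linux 2.6 (it first appeared in the 2.5.44 development kernel) and is not supported by other Unix-like systems. It serves the same purpose as select(2) and poll(2) while avoiding their limitations: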
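1. select(2) can only handle file descriptors numbered below FD_SETSIZE (typically 1024).
2. poll(2) has no fixed limit on the number of descriptors, but every call must scan all of them to find the ready ones, which takes O(N) time.
epoll has neither select's descriptor limit nor poll's linear scan: the set of monitored descriptors is kept in the kernel, and epoll_wait(2) returns only the descriptors that are ready. epoll therefore performs better with a large number of concurrent connections.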
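epoll API functions:
1. epoll_create(2) or epoll_create1(2) (which take different arguments) creates an epoll instance.
/usr/include/sys/epoll.h: extern int epoll_create (int __size); RETURN: a non-negative file descriptor on success; -1 on error
Description:
(1) epoll_create returns a file descriptor. In other words, an epoll instance is presented to the user as a special file, which should be released with close(2) when it is no longer needed.
(2) __size tells the kernel roughly how many descriptors the caller intends to monitor. The argument has been ignored since Linux 2.6.8, but it must still be a positive integer.
(3) epoll_create1(int flags) drops the size argument; flags is either 0 or EPOLL_CLOEXEC.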
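A minimal sketch of creating an epoll instance with epoll_create1(). The helper names make_epoll and has_cloexec are hypothetical. EPOLL_CLOEXEC is a real flag that marks the descriptor close-on-exec, which is usually what a server wants.

```c
#include <fcntl.h>
#include <stdio.h>
#include <sys/epoll.h>

/* Create an epoll instance. EPOLL_CLOEXEC sets the close-on-exec flag so the
 * descriptor is not leaked into programs started with execve(2).
 * Returns the new descriptor, or -1 on error. */
int make_epoll(void)
{
    int efd = epoll_create1(EPOLL_CLOEXEC);
    if (efd == -1)
        perror("epoll_create1");
    return efd;
}

/* Return 1 if fd has the close-on-exec flag set, 0 otherwise. */
int has_cloexec(int fd)
{
    int flags = fcntl(fd, F_GETFD);
    return flags != -1 && (flags & FD_CLOEXEC) != 0;
}
```

The older call epoll_create(1) would also work. However, it cannot set the close-on-exec flag atomically at creation time.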
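2. epoll_ctl(2) adds, modifies, or removes descriptors monitored by an epoll instance.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); RETURN: 0 on success; -1 on error
Description:
(1) epfd is the epoll descriptor created by epoll_create
(2) epoll_ctl performs operation op on descriptor fd; op is one of:
EPOLL_CTL_ADD, register fd with the events described by event
EPOLL_CTL_MOD, change the events registered for fd
EPOLL_CTL_DEL, remove fd from the epoll instance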
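Here is a sketch of the three operations in sequence. It assumes efd is an epoll instance and fd is any pollable descriptor, such as a socket, a pipe, or even another epoll instance. ctl_roundtrip is a hypothetical helper name.

```c
#include <stdio.h>
#include <sys/epoll.h>

/* Exercise all three epoll_ctl() operations on one descriptor:
 * ADD (level-triggered EPOLLIN), MOD (switch to edge-triggered), then DEL.
 * Returns 0 if every call succeeds, -1 otherwise. */
int ctl_roundtrip(int efd, int fd)
{
    struct epoll_event ev;

    ev.events = EPOLLIN;           /* watch for readability */
    ev.data.fd = fd;               /* remember which descriptor this is */
    if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        perror("EPOLL_CTL_ADD");
        return -1;
    }

    ev.events = EPOLLIN | EPOLLET; /* same fd, now edge-triggered */
    if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev) == -1) {
        perror("EPOLL_CTL_MOD");
        return -1;
    }

    /* event may be NULL for DEL (kernels since 2.6.9) */
    if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL) == -1) {
        perror("EPOLL_CTL_DEL");
        return -1;
    }
    return 0;
}
```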
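3. epoll_wait(2) waits for events on the monitored descriptors. With a timeout of -1 it blocks until at least one event occurs.
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); RETURN: >0, number of ready events; 0, timeout expired; -1, error
Description:
epoll_wait is similar to select: it waits synchronously for events to occur
(1) epfd, the epoll file descriptor
(2) events, points to a caller-supplied array of epoll_event structures that the kernel fills in
(3) maxevents, the size of that array; must be greater than 0
(4) timeout, in milliseconds; -1 blocks indefinitely and 0 returns immediately
When events occur, Linux fills in the events array and returns to the application. Because epoll_wait waits synchronously, a signal can interrupt it. In that case it returns -1 with errno set to EINTR.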
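Because epoll_wait can fail with EINTR, callers typically wrap it in a retry loop. The sketch below does so. wait_retry and pipe_ready_count are hypothetical names, and the pipe merely stands in for a socket. Note that each retry restarts the full timeout.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/* epoll_wait() that restarts after EINTR. Returns the number of ready
 * events, 0 on timeout, or -1 on any other error. The full timeout is
 * restarted after each interruption. */
int wait_retry(int efd, struct epoll_event *evs, int maxevents, int timeout_ms)
{
    int n;

    do {
        n = epoll_wait(efd, evs, maxevents, timeout_ms);
    } while (n == -1 && errno == EINTR);
    return n;
}

/* Usage sketch: watch the read end of a pipe and optionally make it
 * readable by writing one byte first. Returns what wait_retry() returns. */
int pipe_ready_count(int write_byte, int timeout_ms)
{
    int p[2], efd, n = -1;
    struct epoll_event ev, out[4];

    if (pipe(p) == -1)
        return -1;
    if ((efd = epoll_create1(0)) == -1)
        goto close_pipe;

    ev.events = EPOLLIN;
    ev.data.fd = p[0];
    if (epoll_ctl(efd, EPOLL_CTL_ADD, p[0], &ev) == 0 &&
        (!write_byte || write(p[1], "x", 1) == 1))
        n = wait_retry(efd, out, 4, timeout_ms);

    close(efd);
close_pipe:
    close(p[0]);
    close(p[1]);
    return n;
}
```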
See the man pages for more details on these functions.
epoll's two triggering modes:
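1. Level-triggered (LT, the default): epoll_wait() keeps reporting a descriptor as ready for as long as data remains to be read. If you do not consume all the data and then call epoll_wait() on the same epoll instance again, it reports the event again, because data is still readable.
2. Edge-triggered (ET, EPOLLET): you are notified only once per state change, for example when new data arrives. If you do not consume all the data and then call epoll_wait() again, it blocks until new data arrives, because the event has already been delivered.
ET can be more efficient because it avoids repeated notifications, but it demands more of the programmer. In ET mode each notification must be handled completely, for example by reading until read() fails with EAGAIN. LT works with both blocking and non-blocking sockets. ET should be used only with non-blocking sockets, since otherwise the final read of that loop would block.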
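The difference between the two modes can be observed with a pipe. Write two bytes, read only one, and then ask epoll_wait (with a timeout of 0) a second time. This sketch uses a pipe instead of a socket, and count_reports is a hypothetical helper. Under LT, the unread byte keeps the descriptor ready, so it is reported twice in total. Under ET, it is reported once. With a timeout of -1, the second call would block until more data arrived.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Count how often epoll_wait() (timeout 0) reports a pipe whose data has
 * been only partly consumed. trigger is 0 for level-triggered or EPOLLET
 * for edge-triggered. Returns the total number of reports over two calls,
 * or -1 on error. */
int count_reports(unsigned int trigger)
{
    int p[2], efd, n, reports = -1;
    char c;
    struct epoll_event ev, out;

    if (pipe(p) == -1)
        return -1;
    if ((efd = epoll_create1(0)) == -1)
        goto close_pipe;

    ev.events = EPOLLIN | trigger;
    ev.data.fd = p[0];
    if (epoll_ctl(efd, EPOLL_CTL_ADD, p[0], &ev) == -1 ||
        write(p[1], "ab", 2) != 2)      /* two bytes become readable */
        goto close_all;

    /* First call: the pipe is readable, reported in both modes. */
    if ((n = epoll_wait(efd, &out, 1, 0)) == -1)
        goto close_all;
    reports = n;

    /* Consume only one of the two bytes. */
    if (read(p[0], &c, 1) != 1) {
        reports = -1;
        goto close_all;
    }

    /* Second call: one byte is still unread, but no new data has arrived. */
    if ((n = epoll_wait(efd, &out, 1, 0)) == -1) {
        reports = -1;
        goto close_all;
    }
    reports += n;

close_all:
    close(efd);
close_pipe:
    close(p[0]);
    close(p[1]);
    return reports;
}
```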
The epoll event structure passed to epoll_ctl(2) is defined as follows:
typedef union epoll_data
{
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

struct epoll_event
{
    __uint32_t events;      /* Epoll events */
    epoll_data_t data;      /* User data variable */
};
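For each monitored descriptor you can associate either an integer (typically the descriptor itself) or a pointer to your own data. Because epoll_data is a union, only one member can be used at a time.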
epoll event types:
enum EPOLL_EVENTS
{
    EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN
    EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
    EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
    EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
    EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
    EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
    EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
    EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
    EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
    EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
    EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
    EPOLLONESHOT = (1 << 30),
#define EPOLLONESHOT EPOLLONESHOT
    EPOLLET = (1 << 31)
#define EPOLLET EPOLLET
};
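– EPOLLIN, readable
– EPOLLOUT, writable
– EPOLLPRI, urgent (out-of-band) data; corresponds to select's exceptional-condition set
– EPOLLRDHUP, the TCP peer closed the connection or at least shut down its writing half
– EPOLLERR, error condition (always reported, even if not requested)
– EPOLLHUP, hang-up (always reported, even if not requested)
– EPOLLET, use edge-triggered notification for this descriptor
– EPOLLONESHOT, report only one event; the descriptor is then disabled (not removed) until it is re-armed with EPOLL_CTL_MOD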
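The sketch below illustrates EPOLLONESHOT, again with a pipe standing in for a socket; oneshot_demo is a hypothetical name. After the first report, the descriptor stays registered but disabled, so a second epoll_wait sees nothing. EPOLL_CTL_MOD then re-arms it, and because the unread byte is still there, it is reported again. The expected counts are 1, 0, 1.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Run three epoll_wait() calls (timeout 0) on a pipe registered with
 * EPOLLONESHOT whose single byte is never read, re-arming the descriptor
 * with EPOLL_CTL_MOD before the third call. The event counts are stored in
 * counts[0..2]. Returns 0 on success, -1 on error. */
int oneshot_demo(int counts[3])
{
    int p[2], efd, i, rc = -1;
    struct epoll_event ev, out;

    if (pipe(p) == -1)
        return -1;
    if ((efd = epoll_create1(0)) == -1)
        goto close_pipe;

    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.fd = p[0];
    if (epoll_ctl(efd, EPOLL_CTL_ADD, p[0], &ev) == -1 ||
        write(p[1], "x", 1) != 1)
        goto close_all;

    for (i = 0; i < 3; i++) {
        /* Re-arm before the last call; the fd was disabled, not removed,
         * so MOD succeeds where a second ADD would fail with EEXIST. */
        if (i == 2 && epoll_ctl(efd, EPOLL_CTL_MOD, p[0], &ev) == -1)
            goto close_all;
        if ((counts[i] = epoll_wait(efd, &out, 1, 0)) == -1)
            goto close_all;
    }
    rc = 0;

close_all:
    close(efd);
close_pipe:
    close(p[0]);
    close(p[1]);
    return rc;
}
```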
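A descriptor can be registered only once in a given epoll instance. Its event mask can, however, combine several event types. Adding the same descriptor a second time fails with EEXIST, so EPOLL_CTL_MOD must be used to change its events. Removing a descriptor requires only the descriptor itself (kernels before 2.6.9 required a non-NULL event pointer even here):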
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
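Because a second EPOLL_CTL_ADD for the same descriptor fails with EEXIST, a common pattern is to fall back to EPOLL_CTL_MOD. add_or_modify below is a hypothetical helper that sketches this pattern.

```c
#include <errno.h>
#include <sys/epoll.h>

/* Register fd with the given event mask, or, if fd is already registered in
 * this epoll instance (EEXIST), update the existing registration instead.
 * Returns 0 on success, -1 on error (errno set by epoll_ctl). */
int add_or_modify(int efd, int fd, unsigned int events)
{
    struct epoll_event ev;

    ev.events = events;
    ev.data.fd = fd;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) == 0)
        return 0;
    if (errno != EEXIST)
        return -1;
    return epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
}
```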
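An important question is how to tell which descriptor each event returned by epoll_wait belongs to. The answer is the epoll_data field. Always fill it in when registering an event, otherwise you cannot tell which descriptor triggered it.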
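When per-connection state is needed, data.ptr can carry a pointer to that state instead of data.fd. The sketch below uses a hypothetical struct conn and uses pipes in place of sockets. Because data is a union, the descriptor has to be stored inside the struct itself.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-connection state; the fields are illustrative only. */
struct conn {
    int fd;                  /* the descriptor must live here: data is a union */
    unsigned long bytes_in;  /* example of extra bookkeeping */
};

/* Register c->fd, storing a pointer to its state in data.ptr. */
int watch_conn(int efd, struct conn *c)
{
    struct epoll_event ev;

    ev.events = EPOLLIN;
    ev.data.ptr = c;
    return epoll_ctl(efd, EPOLL_CTL_ADD, c->fd, &ev);
}

/* Demo with a pipe: returns 1 if epoll_wait() hands back the same struct
 * that was registered, 0 if not, -1 on setup failure. */
int ptr_roundtrip(void)
{
    int p[2], efd, ok = 0;
    struct conn c;
    struct epoll_event out;

    if (pipe(p) == -1)
        return -1;
    if ((efd = epoll_create1(0)) == -1) {
        ok = -1;
        goto close_pipe;
    }

    c.fd = p[0];
    c.bytes_in = 0;
    if (watch_conn(efd, &c) == 0 && write(p[1], "x", 1) == 1 &&
        epoll_wait(efd, &out, 1, 100) == 1) {
        struct conn *ready = out.data.ptr;   /* recover the connection state */
        ok = (ready == &c && ready->fd == p[0]);
    }

    close(efd);
close_pipe:
    close(p[0]);
    close(p[1]);
    return ok;
}
```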
Below we implement a lightweight TCP server that prints everything sent to its socket on standard output.
/*
 * =====================================================================================
 *
 *       Filename:  EpollServer.c
 *
 *    Description:  an epoll server example
 *
 *        Version:  1.0
 *        Created:  2012-03-15 20:24:26
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  LGP
 *        Company:
 *
 * =====================================================================================
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <netdb.h>

/* For reference (see man getaddrinfo):
struct addrinfo {
    int              ai_flags;
    int              ai_family;
    int              ai_socktype;
    int              ai_protocol;
    size_t           ai_addrlen;
    struct sockaddr *ai_addr;
    char            *ai_canonname;
    struct addrinfo *ai_next;
};
*/

/* Create a TCP socket bound to `port` on all local interfaces.
 * Returns the socket descriptor, or -1 on failure. */
static int create_and_bind(char *port)
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int s, sfd;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 */
    hints.ai_socktype = SOCK_STREAM; /* TCP socket */
    hints.ai_flags = AI_PASSIVE;     /* All interfaces */

    s = getaddrinfo(NULL, port, &hints, &result);
    if (s != 0) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
        return -1;
    }

    for (rp = result; rp != NULL; rp = rp->ai_next) {
        sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (sfd == -1)
            continue;

        s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
        if (s == 0) {
            /* We managed to bind successfully! */
            break;
        }

        close(sfd);
    }

    if (rp == NULL) {
        fprintf(stderr, "Could not bind\n");
        freeaddrinfo(result); /* release the list on the failure path too */
        return -1;
    }

    freeaddrinfo(result);

    return sfd;
}

/* Set O_NONBLOCK on sfd, keeping its other file status flags. */
static int make_socket_non_blocking(int sfd)
{
    int flags, s;

    flags = fcntl(sfd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl");
        return -1;
    }

    flags |= O_NONBLOCK;
    s = fcntl(sfd, F_SETFL, flags);
    if (s == -1) {
        perror("fcntl");
        return -1;
    }

    return 0;
}

#define MAXEVENTS 64

int main(int argc, char *argv[])
{
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s [port]\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    sfd = create_and_bind(argv[1]);
    if (sfd == -1)
        abort();

    s = make_socket_non_blocking(sfd);
    if (s == -1)
        abort();

    s = listen(sfd, SOMAXCONN);
    if (s == -1) {
        perror("listen");
        abort();
    }

    efd = epoll_create1(0);
    if (efd == -1) {
        perror("epoll_create");
        abort();
    }

    event.data.fd = sfd;
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1) {
        perror("epoll_ctl");
        abort();
    }

    /* Buffer where events are returned */
    events = calloc(MAXEVENTS, sizeof event);
    if (events == NULL) {
        perror("calloc");
        abort();
    }

    /* The event loop */
    while (1) {
        int n, i;

        n = epoll_wait(efd, events, MAXEVENTS, -1);
        if (n == -1) {
            if (errno == EINTR)
                continue;           /* interrupted by a signal: wait again */
            perror("epoll_wait");
            break;
        }

        for (i = 0; i < n; i++) {
            if ((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!(events[i].events & EPOLLIN))) {
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                fprintf(stderr, "epoll error\n");
                close(events[i].data.fd);
                continue;
            } else if (sfd == events[i].data.fd) {
                /* We have a notification on the listening socket, which
                   means one or more incoming connections. */
                while (1) {
                    /* sockaddr_storage is large enough for IPv4 and IPv6 */
                    struct sockaddr_storage in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept(sfd, (struct sockaddr *)&in_addr, &in_len);
                    if (infd == -1) {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                            /* We have processed all incoming connections. */
                            break;
                        } else {
                            perror("accept");
                            break;
                        }
                    }

                    s = getnameinfo((struct sockaddr *)&in_addr, in_len,
                                    hbuf, sizeof hbuf,
                                    sbuf, sizeof sbuf,
                                    NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0) {
                        printf("Accepted connection on descriptor %d "
                               "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                    /* Make the incoming socket non-blocking and add it to
                       the list of fds to monitor. */
                    s = make_socket_non_blocking(infd);
                    if (s == -1)
                        abort();

                    event.data.fd = infd;
                    event.events = EPOLLIN | EPOLLET;
                    s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
                    if (s == -1) {
                        perror("epoll_ctl");
                        abort();
                    }
                }
                continue;
            } else {
                /* We have data on the fd waiting to be read. Read and
                   display it. We must read whatever data is available
                   completely, as we are running in edge-triggered mode
                   and won't get a notification again for the same data. */
                int done = 0;

                while (1) {
                    ssize_t count;
                    char buf[512];

                    count = read(events[i].data.fd, buf, sizeof buf);
                    if (count == -1) {
                        /* If errno == EAGAIN, that means we have read all
                           data. So go back to the main loop. */
                        if (errno != EAGAIN) {
                            perror("read");
                            done = 1;
                        }
                        break;
                    } else if (count == 0) {
                        /* End of file. The remote has closed the connection. */
                        done = 1;
                        break;
                    }

                    /* Write the buffer to standard output */
                    s = write(1, buf, count);
                    if (s == -1) {
                        perror("write");
                        abort();
                    }
                }

                if (done) {
                    printf("Closed connection on descriptor %d\n",
                           events[i].data.fd);

                    /* Closing the descriptor makes epoll remove it from the
                       set of monitored descriptors, provided no duplicate
                       (from dup(2) or fork(2)) still refers to it. */
                    close(events[i].data.fd);
                }
            }
        }
    }

    free(events);
    close(efd);
    close(sfd);

    return EXIT_SUCCESS;
}
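The following is a simple C++ wrapper around epoll. It relies on the headers exception.h (the EXCEPTION() macro) and buffer.h (CircleBuffer), which are not shown: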
/**
 * @file file.h
 * @comment
 * wrap of file descriptor
 *
 * @author niexw
 */
#ifndef _XCOM_FILE_H_
#define _XCOM_FILE_H_

#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/uio.h>
#include "exception.h"
#include "buffer.h"

namespace unp
{

/**
 * @class File
 * @comment
 * wrap of file descriptor
 */
class File
{
protected:
    int fd_;

public:
    //
    // constructor and destructor
    //
    File() : fd_(-1) {}
    explicit File(FILE *stream) : fd_(fileno(stream)) {}
    ~File() { close(); }

    int getFd() { return fd_; }
    int getFd() const { return fd_; }

    // The I/O helpers retry while errno == EAGAIN; on a non-blocking
    // descriptor this spins until data (or buffer space) is available.
    size_t read(char *buf, size_t count) const
    {
        int ret;
    RETRY:
        if ((ret = ::read(fd_, buf, count)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    size_t write(char *buf, size_t count) const
    {
        int ret;
    RETRY:
        if ((ret = ::write(fd_, buf, count)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    void close()
    {
        if (fd_ != -1) {
            ::close(fd_);
            fd_ = -1;
        }
    }

    void setNonblock()
    {
        int flags = fcntl(fd_, F_GETFL);
        if (flags == -1)
            throw EXCEPTION();
        flags |= O_NONBLOCK;
        flags = fcntl(fd_, F_SETFL, flags);
        if (flags == -1)
            throw EXCEPTION();
    }

    void clrNonblock()
    {
        int flags = fcntl(fd_, F_GETFL);
        if (flags == -1)
            throw EXCEPTION();
        flags &= ~O_NONBLOCK;
        flags = fcntl(fd_, F_SETFL, flags);
        if (flags == -1)
            throw EXCEPTION();
    }

    size_t readv(CircleBuffer &buf)
    {
        int ret;
    RETRY:
        if ((ret = ::readv(fd_, buf.idle_, buf.idlenum_)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        buf.afterRead(ret);
        return ret;
    }

    size_t writev(CircleBuffer &buf)
    {
        int ret;
    RETRY:
        if ((ret = ::writev(fd_, buf.data_, buf.datanum_)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        buf.afterWrite(ret);
        return ret;
    }

    // Set the given file status flags (e.g. O_NONBLOCK), retrying on EINTR.
    void setFlag(int option)
    {
        int flags;
    RETRY:
        flags = fcntl(fd_, F_GETFL);
        if (flags == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        flags |= option;
    RETRY1:
        int ret = fcntl(fd_, F_SETFL, flags);
        if (ret == -1) {
            if (errno == EINTR)
                goto RETRY1;
            else
                throw EXCEPTION();
        }
    }

    // Clear the given file status flags, retrying on EINTR.
    void clrFlag(int option)
    {
        int flags;
    RETRY:
        flags = fcntl(fd_, F_GETFL);
        if (flags == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        flags &= ~option;
    RETRY1:
        int ret = fcntl(fd_, F_SETFL, flags);
        if (ret == -1) {
            if (errno == EINTR)
                goto RETRY1;
            else
                throw EXCEPTION();
        }
    }
};

/**
 * @class File2
 * @comment
 * wrap of file descriptor
 */
class File2
{
protected:
    int descriptor_;

public:
    File2() : descriptor_(-1) {}
    explicit File2(FILE *stream) : descriptor_(fileno(stream)) {}
    // Takes over ownership of f's descriptor.
    explicit File2(File2 &f) : descriptor_(f.descriptor_) { f.descriptor_ = -1; }
    ~File2() { close(); }

    int descriptor() { return descriptor_; }

    size_t read(char *buf, size_t count)
    {
        int ret;
    RETRY:
        if ((ret = ::read(descriptor_, buf, count)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    size_t write(char *buf, size_t count) const
    {
        int ret;
    RETRY:
        if ((ret = ::write(descriptor_, buf, count)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    void close()
    {
        if (descriptor_ != -1) {
            ::close(descriptor_);
            descriptor_ = -1;
        }
    }

    size_t readv(const struct iovec *iov, int cnt)
    {
        int ret;
    RETRY:
        if ((ret = ::readv(descriptor_, iov, cnt)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    size_t writev(const struct iovec *iov, int cnt)
    {
        int ret;
    RETRY:
        if ((ret = ::writev(descriptor_, iov, cnt)) == -1) {
            if (errno == EAGAIN)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }

    void setControlOption(int option)
    {
        int flags;
    RETRY:
        flags = fcntl(descriptor_, F_GETFL);
        if (flags == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        flags |= option;
    RETRY1:
        int ret = fcntl(descriptor_, F_SETFL, flags);
        if (ret == -1) {
            if (errno == EINTR)
                goto RETRY1;
            else
                throw EXCEPTION();
        }
    }

    void clearControlOption(int option)
    {
        int flags;
    RETRY:
        flags = fcntl(descriptor_, F_GETFL);
        if (flags == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        flags &= ~option;
    RETRY1:
        int ret = fcntl(descriptor_, F_SETFL, flags);
        if (ret == -1) {
            if (errno == EINTR)
                goto RETRY1;
            else
                throw EXCEPTION();
        }
    }

    void setNonblock()
    {
        int flags = fcntl(descriptor_, F_GETFL);
        if (flags == -1)
            throw EXCEPTION();
        flags |= O_NONBLOCK;
        flags = fcntl(descriptor_, F_SETFL, flags);
        if (flags == -1)
            throw EXCEPTION();
    }

    void clrNonblock()
    {
        int flags = fcntl(descriptor_, F_GETFL);
        if (flags == -1)
            throw EXCEPTION();
        flags &= ~O_NONBLOCK;
        flags = fcntl(descriptor_, F_SETFL, flags);
        if (flags == -1)
            throw EXCEPTION();
    }
};

}; // namespace unp

#endif // _XCOM_FILE_H_
/**
 * @file epoll.h
 * @comment
 * wrap of epoll
 *
 * @author niexw
 */
#ifndef _UNP_EPOLL_H_
#define _UNP_EPOLL_H_

#include <sys/epoll.h>
#include <assert.h>
#include <errno.h>
#include <map>
#include "file.h"

namespace unp
{

/**
 * @class Epoll
 * @comment
 * wrap of epoll
 */
class Epoll : public File
{
public:
    Epoll() {}
    ~Epoll() {}

    struct Event : public epoll_event
    {
        Event() { events = EPOLLERR; data.u64 = 0; }
        Event(unsigned int type, void *magic) { events = type; data.ptr = magic; }
    };

    int create()
    {
        if ((fd_ = epoll_create(1)) == -1)
            throw EXCEPTION();
        return fd_;
    }

    void registerEvent(int fd, Event &event)
    {
        if (epoll_ctl(fd_, EPOLL_CTL_ADD, fd, &event) == -1)
            throw EXCEPTION();
    }

    void modifyEvent(int fd, Event &event)
    {
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, &event) == -1)
            throw EXCEPTION();
    }

    void unregisterEvent(int fd)
    {
        if (epoll_ctl(fd_, EPOLL_CTL_DEL, fd, NULL) == -1)
            throw EXCEPTION();
    }

    // msec is passed through unchanged: -1 blocks indefinitely, 0 polls.
    // (Substituting NULL, i.e. 0, for -1 would turn a blocking wait into a poll.)
    int waitEvent(Event *events, int size, int msec)
    {
        int ret;
        assert(events != NULL);
    RETRY:
        if ((ret = epoll_wait(fd_, events, size, msec)) == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        return ret;
    }
};

class Epoll2 : public File
{
public:
    typedef void* (*Callback)(epoll_event &event, void *);

protected:
    struct Event : public epoll_event
    {
        Callback func_;
        void *param_;
        Event() : func_(NULL), param_(NULL)
        { events = EPOLLERR; data.u64 = 0; }
        Event(unsigned int type) : func_(NULL), param_(NULL)
        { events = EPOLLERR | type; data.u64 = 0; }
        Event(unsigned int type, Callback func, void *param)
            : func_(func), param_(param)
        { events = EPOLLERR | type; data.u64 = 0; }
    };
    typedef std::map<int, Event> Events;

    Events events_;
    epoll_event happens_[10];
    int timeout_;
    Callback func_;     // called when epoll_wait times out
    void *param_;

public:
    Epoll2() : timeout_(-1), func_(NULL), param_(NULL)
    {
        if ((fd_ = epoll_create(10)) == -1)
            throw EXCEPTION();
    }
    Epoll2(int msec, Callback func) : timeout_(msec), func_(func), param_(NULL)
    {
        if ((fd_ = epoll_create(10)) == -1)
            throw EXCEPTION();
    }
    ~Epoll2() {}

    void registerEvent(int fd, int option, Callback func, void *param)
    {
        Event event(option, func, param);
        std::pair<Events::iterator, bool> ret =
            events_.insert(std::pair<int, Event>(fd, event));
        // std::map elements never move, so the stored Event's address stays
        // valid until it is erased; keep it in data.ptr for run().
        ret.first->second.data.ptr = &ret.first->second;
        if (epoll_ctl(fd_, EPOLL_CTL_ADD, fd, &ret.first->second) == -1)
            throw EXCEPTION();
    }

    void setEventOption(int fd, int option)
    {
        Event *p = &events_[fd];
        p->events = option | EPOLLERR;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void setEventOption(int fd, int option, Callback func, void *param)
    {
        Event *p = &events_[fd];
        p->events = option | EPOLLERR;
        p->func_ = func;
        p->param_ = param;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void addEventOption(int fd, int option)
    {
        Event *p = &events_[fd];
        p->events |= option;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void addEventOption(int fd, int option, Callback func, void *param)
    {
        Event *p = &events_[fd];
        p->events |= option;
        p->func_ = func;
        p->param_ = param;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void clrEventOption(int fd, int option, Callback func, void *param)
    {
        Event *p = &events_[fd];
        p->events &= ~option;
        p->func_ = func;
        p->param_ = param;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void clrEventOption(int fd, int option)
    {
        Event *p = &events_[fd];
        p->events &= ~option;
        if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1)
            throw EXCEPTION();
    }

    void unregisterEvent(int fd)
    {
        events_.erase(fd);
        if (epoll_ctl(fd_, EPOLL_CTL_DEL, fd, NULL) == -1)
            throw EXCEPTION();
    }

    void setTimeout(int msec, Callback func, void *param)
    {
        timeout_ = msec;
        func_ = func;
        param_ = param;
    }

    // Wait once, dispatch callbacks; returns false when nothing is registered.
    bool run()
    {
        int ret;
    RETRY:
        if ((ret = epoll_wait(fd_, happens_, 10, timeout_)) == -1) {
            if (errno == EINTR)
                goto RETRY;
            else
                throw EXCEPTION();
        }
        for (int i = 0; i < ret; ++i) {
            Event *ev = static_cast<Event *>(happens_[i].data.ptr);
            if (ev->func_ != NULL)
                ev->func_(happens_[i], ev->param_);
            if (happens_[i].events & EPOLLERR)
                throw EXCEPTION();
        }
        if (ret == 0 && func_ != NULL)
            func_(happens_[0], param_);
        return !events_.empty();
    }
};

}; // namespace unp

#endif /* _UNP_EPOLL_H_ */