libevent 閱讀記錄二
libevent大概框架已經瞭解,現在我們通過libevent封裝的epoll瞭解一下libevent的I/O模型。
epollop結構體
1 struct epollop { 2 struct evepoll *fds; //檔案描述符事件陣列,每個檔案描述符對應的事件,這裡將讀寫事件分開了。記錄所有需要監聽的事件/* due to limitations in the epoll interface, we need to keep track of * all file descriptors outself. */
3 int nfds; //檔案描述符數量 4 struct epoll_event *events; //epoll的事件結構陣列typedef union epoll_data { void *ptr;
struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ } __EPOLL_PACKED;
5 int nevents; //可以快取監聽的events的數量 6 int epfd; //epoll_create返回的epoll控制代碼 7 };
我們仍然找一個demo,從demo的呼叫方式來分析libevent的epoll工作方式,參考libevent在非同步socket中的使用
第一個例子,雖然有些缺憾,但勝在短小,方便理解,後面可以在去看後面2個例子。由於libevent的signal是嵌入到I/O模型中的,和前面一樣,這裡跳過signal部分,後面再說。
1 /* Port to listen on. */ 2 #define SERVER_PORT 5555 3 4 /** 5 * 這個結構是指定的客戶端資料,這個例子中只指定了讀事件物件。 6 */ 7 struct client 8 { 9 struct event ev_read; 10 }; 11 12 /** 13 * 將一個socket設定成非阻塞模式 14 */ 15 int 16 setnonblock(int fd) 17 { 18 int flags; 19 20 flags = fcntl(fd, F_GETFL); 21 if (flags < 0) 22 return flags; 23 flags |= O_NONBLOCK; 24 if (fcntl(fd, F_SETFL, flags) < 0) 25 return -1; 26 27 return 0; 28 } 29 30 /** 31 * 這個函式當客戶端的socket可讀時由libevent呼叫 32 */ 33 void 34 on_read(int fd, short ev, void *arg) 35 { 36 struct client *client = (struct client *)arg; 37 u_char buf[8196]; 38 int len, wlen; 39 40 len = read(fd, buf, sizeof(buf)); 41 if (len == 0) 42 { 43 /* 客戶端斷開連線,在這裡移除讀事件並且釋放客戶資料結構 */ 44 printf("Client disconnected.\n"); 45 close(fd); 46 event_del(&client->ev_read); 47 free(client); 48 return; 49 } 50 else if (len < 0) 51 { 52 /* 出現了其它的錯誤,在這裡關閉socket,移除事件並且釋放客戶資料結構 */ 53 printf("Socket failure, disconnecting client: %s", 54 strerror(errno)); 55 close(fd); 56 event_del(&client->ev_read); 57 free(client); 58 return; 59 } 60 61 62 63 /* 為了簡便,我們直接將資料寫回到客戶端。通常我們不能在非阻塞的應用程式中這麼做, 64 * 我們應該將資料放到佇列中,等待可寫事件的時候再寫回客戶端。 */ 65 wlen = write(fd, buf, len); 66 if (wlen < len) 67 { 68 /* 我們沒有把所有資料都寫回。如果我們有適當的佇列/快取機制, 69 * 我們能夠在再次的可寫事件中再次寫入剩餘的資料。因為這是一個簡單的例子, 70 * 我們僅僅捨去了沒有寫入的資料。 */ 71 printf("Short write, not all data echoed back to client.\n"); 72 } 73 } 74 75 /** 76 * 當有一個連線請求準備被接受時,這個函式將被libevent呼叫。 */ 77 void 78 on_accept(int fd, short ev, void *arg) 79 { 80 int client_fd; 81 struct sockaddr_in client_addr; 82 socklen_t client_len = sizeof(client_addr); 83 struct client *client; 84 85 /* 接受新的連線 */ 86 client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); 87 if (client_fd == -1) 88 { 89 warn("accept failed"); 90 return; 91 } 92 93 /* 設定客戶端socket為非阻塞模式。 */ 94 if (setnonblock(client_fd) < 0) 95 warn("failed to set client socket non-blocking"); 96 97 /* 我們接受了一個新的客戶,分配一個新的客戶資料結構物件來儲存這個客戶的狀態。 */ 98 client = calloc(1, sizeof(*client)); 99 if (client == NULL) 100 err(1, "malloc failed"); 101 102 /* 設定讀事件,libevent將在客戶端socket可讀時呼叫on_read函式。 103 * 我們也會我們也會不斷的響應讀事件,所以我們不用在每次讀取時再次新增讀事件。 */ 104 event_set(&client->ev_read, client_fd, EV_READ|EV_PERSIST, on_read, 105 client); 106 107 /* 設定的事件並沒有啟用,使用新增事件讓其啟用。 */ 108 event_add(&client->ev_read, NULL); 109 110 printf("Accepted connection from %s\n", 111 inet_ntoa(client_addr.sin_addr)); 112 } 113 114 int 115 main(int argc, char **argv) 116 { 117 int listen_fd; 118 struct sockaddr_in listen_addr; 119 int reuseaddr_on = 1; 120 121 /* 接受連線請求的事件物件。 */ 122 struct event ev_accept; 123 124 /* 初始化 libevent. */ 125 event_init(); 126 127 /* 建立我們監聽的socket。 */ 128 listen_fd = socket(AF_INET, SOCK_STREAM, 0); 129 if (listen_fd < 0) 130 err(1, "listen failed"); 131 if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, 132 sizeof(reuseaddr_on)) == -1) 133 err(1, "setsockopt failed"); 134 memset(&listen_addr, 0, sizeof(listen_addr)); 135 listen_addr.sin_family = AF_INET; 136 listen_addr.sin_addr.s_addr = INADDR_ANY; 137 listen_addr.sin_port = htons(SERVER_PORT); 138 if (bind(listen_fd, (struct sockaddr *)&listen_addr, 139 sizeof(listen_addr)) < 0) 140 err(1, "bind failed"); 141 if (listen(listen_fd, 5) < 0) 142 err(1, "listen failed"); 143 144 /* 設定socket為非阻塞模式,使用libevent程式設計這是必不可少的。 */ 145 if (setnonblock(listen_fd) < 0) 146 err(1, "failed to set server socket to non-blocking"); 147 148 /* 我們現在有了一個監聽的socket,我們建立一個讀事件,當有客戶連線時,接收通知。 */ 149 event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL); //用epoll_wait的返回資訊也可以完成監聽連線請求,但是libevent封裝的epoll並不理會連線請求,因此響應連線要我們自己處理。
libevent的epoll_wait只處理異常,超時,及關心的事件發生。其中關心的事件只處理讀事件,寫事件,EPOLLERR(對應的檔案描述符發生錯誤),
EPOLLHUP(對應的檔案描述符被結束通話)。
這裡是將監聽連線註冊到epoll中的讀事件並持久化,當收到資訊後呼叫on_accept接受連線,併為新連線上來的socket註冊讀事件。 150 event_add(&ev_accept, NULL); 151 152 /* 開始 libevent 的事件迴圈。 */ 153 event_dispatch(); 154 155 return 0; 156 }
1.前一篇提到過,在event_base_new(void)過程中通過呼叫base->evsel->init(base)我們進行了I/O模型的初始化,在epoll中就是呼叫的epoll_init(struct event_base *base)
1 epoll_init(struct event_base *base) 2 { 3 int epfd; //epoll_create返回的epoll控制代碼,這個控制代碼會佔用一個fd值。 4 struct epollop *epollop; //指向本篇開始提到的結構體的指標 5 6 /* Disable epollueue when this environment variable is set */ 7 if (evutil_getenv("EVENT_NOEPOLL")) 8 return (NULL); 9 10 /* Initalize the kernel queue */ 11 if ((epfd = epoll_create(32000)) == -1) { 12 if (errno != ENOSYS) 13 event_warn("epoll_create"); 14 return (NULL); 15 } 16 17 FD_CLOSEONEXEC(epfd); //從定義來看是說在子程序中不能使用這個控制代碼,也就是子程序不能用這個epoll例項#ifdef HAVE_SETFD #define FD_CLOSEONEXEC(x) do { \ if (fcntl(x, F_SETFD, 1) == -1) \ event_warn("fcntl(%d, F_SETFD)", x); \ } while (0) #else #define FD_CLOSEONEXEC(x) #endif
18 19 if (!(epollop = calloc(1, sizeof(struct epollop)))) 20 return (NULL); 21 22 epollop->epfd = epfd; 23 24 /* Initalize fields */ 25 epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event)); 26 if (epollop->events == NULL) { 27 free(epollop); 28 return (NULL); 29 } 30 epollop->nevents = INITIAL_NEVENTS; 31 32 epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll)); 33 if (epollop->fds == NULL) { 34 free(epollop->events); 35 free(epollop); 36 return (NULL); 37 } 38 epollop->nfds = INITIAL_NFILES; 39 40 evsignal_init(base); //訊號下一篇在說,跳過訊號相關部分 41 42 return (epollop); 43 }
2.event_add(struct event *ev, const struct timeval *tv)中,只要事件是讀|寫|訊號,並且不是已註冊或已啟用事件,都將呼叫對應的add註冊成對應I/O模型的事件。這裡包含訊號前面已經提到過,是因為訊號嵌入到了I/O模型中,這一篇不討論訊號。
在epoll模型中就是epoll_add(void *arg, struct event *ev)
1 static int 2 epoll_add(void *arg, struct event *ev) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event epev = {0, {0}}; 6 struct evepoll *evep; 7 int fd, op, events; 8 9 if (ev->ev_events & EV_SIGNAL) //如果事件型別是訊號,註冊訊號事件 10 return (evsignal_add(ev)); 11 12 fd = ev->ev_fd; 13 if (fd >= epollop->nfds) { //如果這個要註冊事件的檔案描述符值超過已分配的檔案描述符陣列長度,呼叫realloc擴充epollop->fds的分配記憶體×2,直到這個值小於陣列長度。將擴充部分置為0 14 /* Extent the file descriptor array as necessary */ 15 if (epoll_recalc(ev->ev_base, epollop, fd) == -1) 16 return (-1); 17 } 18 evep = &epollop->fds[fd]; 19 op = EPOLL_CTL_ADD; 20 events = 0; 21 if (evep->evread != NULL) { //該檔案描述符已經存在讀事件,修改時保留讀事件 22 events |= EPOLLIN; 23 op = EPOLL_CTL_MOD; 24 } 25 if (evep->evwrite != NULL) { //該檔案描述符已經存在寫事件,修改時保留寫事件 26 events |= EPOLLOUT; 27 op = EPOLL_CTL_MOD; 28 } 29 30 if (ev->ev_events & EV_READ) //註冊事件如果需要註冊讀事件,為該檔案描述符新增讀事件 31 events |= EPOLLIN; 32 if (ev->ev_events & EV_WRITE) //註冊事件如果需要註冊寫事件,為該檔案描述符新增寫事件 33 events |= EPOLLOUT; 34 35 epev.data.fd = fd; //監聽的檔案描述符 36 epev.events = events; //需要監聽改檔案描述符發生的事件 37 if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -1) //將該檔案描述符需要監聽的事件註冊到epoll 38 return (-1); 39 40 /* Update events responsible */ 41 if (ev->ev_events & EV_READ) 42 evep->evread = ev; //表明該檔案描述符有讀事件 43 if (ev->ev_events & EV_WRITE) 44 evep->evwrite = ev; //表明該檔案描述符有寫事件 45 46 return (0); 47 }
3.事件註冊完成後,event_base_loop()中監聽這些事件event_dispatch(struct event_base *base, void *arg, struct timeval *tv),
對於epoll模型,就是epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
1 static int 2 epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event *events = epollop->events; 6 struct evepoll *evep; 7 int i, res, timeout = -1; 8 9 if (tv != NULL) 10 timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000; //超時時間,單位ms,向上取整 11 12 if (timeout > MAX_EPOLL_TIMEOUT_MSEC) { //超時時間超過linux核心可以等待的最長時間,取最長時間 13 /* Linux kernels can wait forever if the timeout is too big; 14 * see comment on MAX_EPOLL_TIMEOUT_MSEC. */ 15 timeout = MAX_EPOLL_TIMEOUT_MSEC; 16 } 17 18 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); 19 20 if (res == -1) { //返回-1,說明epoll_wait產生異常 21 if (errno != EINTR) { //在超時或監聽到事件發生前收到的不是中斷訊號 22 event_warn("epoll_wait"); 23 return (-1); 24 } 25 26 evsignal_process(base); //訊號處理,此處不說明 27 return (0); 28 } else if (base->sig.evsignal_caught) { //監聽的某種訊號產生並處理,此處不說明 29 evsignal_process(base); 30 } 31 32 event_debug(("%s: epoll_wait reports %d", __func__, res)); 33 34 for (i = 0; i < res; i++) { 35 int what = events[i].events; 36 struct event *evread = NULL, *evwrite = NULL; 37 int fd = events[i].data.fd; //發生該事件的檔案描述符 38 39 if (fd < 0 || fd >= epollop->nfds) 40 continue; 41 evep = &epollop->fds[fd]; //發生該事件的檔案描述符的事件型別(2個事件型別讀、寫) 42 43 if (what & (EPOLLHUP|EPOLLERR)) { //該檔案描述符被結束通話或發生錯誤(讀事件應該是用來登出該檔案描述符上的事件,這裡不明白啟用寫事件是要幹嘛) 44 evread = evep->evread; //如果有讀事件,記錄下來 45 evwrite = evep->evwrite; //如果有寫事件,記錄下來 46 } else { 47 if (what & EPOLLIN) { //該檔案描述符可以讀(包括對端SOCKET正常關閉) 48 evread = evep->evread; //記錄這個讀事件 49 } 50 51 if (what & EPOLLOUT) { //檔案描述符可以寫 52 evwrite = evep->evwrite; //記錄這個寫事件 53 } 54 } 55 56 if (!(evread||evwrite)) //如果沒有讀或寫事件發生,跳過該檔案描述符的事件 57 continue; 58 59 if (evread != NULL) //如果前面記錄了讀事件,將該事件插入到啟用連結串列,且呼叫事件回撥次數設為1 60 event_active(evread, EV_READ, 1); 61 if (evwrite != NULL) //如果前面記錄了寫事件,將該事件插入到啟用連結串列,且呼叫事件回撥次數設為1 62 event_active(evwrite, EV_WRITE, 1); 63 } 64 65 if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) { //監聽到的事件數量達到了分配的快取上限,但沒有達到MAX_NEVENTS,說明我們的快取陣列不夠用了,擴充快取監聽事件的陣列 66 /* We used all of the event space this time. We should 67 be ready for more events next time. */ 68 int new_nevents = epollop->nevents * 2; 69 struct epoll_event *new_events; 70 71 new_events = realloc(epollop->events, 72 new_nevents * sizeof(struct epoll_event)); 73 if (new_events) { 74 epollop->events = new_events; 75 epollop->nevents = new_nevents; 76 } 77 } 78 79 return (0); 80 }
epoll到這裡差不多結束了,處理已啟用事件是在event框架的event_dispatch中,不屬於I/O模型的部分,epoll模型還剩下注銷事件,登出epoll這個模型例項。
epoll的監聽事件一般都是持久化的(如果不是持久化的,參考上一篇最後),需要登出監聽事件一般在客戶端斷開連線(讀事件讀到0個位元組),而登出epoll模型例項,釋放epoll的記憶體,意味著我們沒有I/O模型和訊號管理可用了,還只剩下計時器事件可用,功能基本也就結束了。
所以這裡最後看看epoll登出事件epoll_del是怎麼做的,登出epoll模型例項epoll_dealloc就省略了。
1 static int 2 epoll_del(void *arg, struct event *ev) 3 { 4 struct epollop *epollop = arg; 5 struct epoll_event epev = {0, {0}}; 6 struct evepoll *evep; 7 int fd, events, op; 8 int needwritedelete = 1, needreaddelete = 1; 9 10 if (ev->ev_events & EV_SIGNAL) //如果該事件是訊號,恢復該訊號的到註冊之前的預設處理。 11 return (evsignal_del(ev)); 12 13 fd = ev->ev_fd; 14 if (fd >= epollop->nfds) //該檔案描述符不在事件佇列中,忽略 15 return (0); 16 evep = &epollop->fds[fd]; 17 18 op = EPOLL_CTL_DEL; 19 events = 0; 20 21 if (ev->ev_events & EV_READ) 22 events |= EPOLLIN; 23 if (ev->ev_events & EV_WRITE) 24 events |= EPOLLOUT; 25 26 if ((events & (EPOLLIN|EPOLLOUT)) != (EPOLLIN|EPOLLOUT)) { //如果該檔案描述符上的事件不是同時有讀和寫事件 27 if ((events & EPOLLIN) && evep->evwrite != NULL) { //這是什麼情況?理解不能 28 needwritedelete = 0; 29 events = EPOLLOUT; 30 op = EPOLL_CTL_MOD; 31 } else if ((events & EPOLLOUT) && evep->evread != NULL) { 32 needreaddelete = 0; 33 events = EPOLLIN; 34 op = EPOLL_CTL_MOD; 35 } 36 } 37 38 epev.events = events; 39 epev.data.fd = fd; 40 41 if (needreaddelete) 42 evep->evread = NULL; //將該檔案描述符上的讀事件置為NULL 43 if (needwritedelete) 44 evep->evwrite = NULL; //將該檔案描述符上的寫事件置為NULL 45 46 if (epoll_ctl(epollop->epfd, op, fd, &epev) == -1) 47 return (-1); 48 49 return (0); 50 }