1. 程式人生 > >epoll反應堆及ET模式下的EPOLLOUT學習總結

epoll反應堆及ET模式下的EPOLLOUT學習總結

學習epoll反應堆發現網上的epoll反應堆都是同一份程式碼框架…
自己理解、梳理一遍,思路在註釋裡

#include <stdlib.h>
#include <stdio.h> 
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h> #define MAX_EVENTS 1024 #define BUFLEN 128 #define SERV_PORT 8080 /* * status:1表示在監聽事件中,0表示不在 * last_active:記錄最後一次響應時間,做超時處理 */ struct myevent_s { int fd; //cfd listenfd int events; //EPOLLIN EPLLOUT void *arg; //指向自己結構體指標 void
(*call_back)(int fd, int events, void *arg); int status; char buf[BUFLEN]; int len; long last_active; }; int g_efd; /* epoll_create返回的控制代碼 */ struct myevent_s g_events[MAX_EVENTS+1]; /* +1 最後一個用於 listen fd */ /* struct epoll_event 結構體epoll_event被用於註冊所感興趣的事件和回傳所發生待處理的事件,定義如下: struct epoll_event { __uint32_t events; epoll event epoll_data_t data; User data variable }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; //儲存觸發事件的某個檔案描述符相關的資料 */
//初始化myevent_s型別 void eventset(struct myevent_s* ev, int fd, void (*call_back)(int, int, void *), void* arg) { ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; // memset(ev->buf, 0, sizeof(ev->buf)); 本模型將收到的資料再發回去,所以不memset // ev->len = 0; 這個成員表示recv的資料長度,但是在邏輯裡也沒用到 //而且加上這一句在我的測試中只要客戶端傳送兩條資料之後就會被close fd //沒明白為什麼...可能這就是傳說中的那種接手後發現程式碼裡有一段 //留下來的註釋:“我也不知道這段程式碼有什麼用,但是沒有就會bug”的程式碼 ev->last_active = time(NULL); return; } void recvdata(int fd, int events, void *arg); void senddata(int fd, int events, void *arg); //將fd新增到epoll註冊的事件合集 void eventadd(int efd, int events, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; epv.events = ev->events = events; if (ev->status == 1) { op = EPOLL_CTL_MOD; } else { op = EPOLL_CTL_ADD; ev->status = 1; } if (epoll_ctl(efd, op, ev->fd, &epv) < 0) printf("event add failed [fd=%d], events[%d]\n", ev->fd, events); else printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events); return; } //從事件合集中刪除 void eventdel(int efd, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; if (ev->status != 1) { return ; } epv.data.ptr = ev; ev->status = 0; epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); return; } //當有新的連線時,呼叫監聽listenfd的此回撥函式,指定新sockfd的回撥函式 void acceptconn(int lfd, int events, void *arg) { printf("****************************\n"); struct sockaddr_in cin; socklen_t len = sizeof(cin); int cfd, i; if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { /* 暫時不做出錯處理 */ } printf("%s: accept, %s\n", __func__, strerror(errno)); return; } do { for (i = 0; i < MAX_EVENTS; i++) { if (g_events[i].status == 0) { break; } } if (i == MAX_EVENTS) { printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS); break; } int flag = 0; if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno)); break; } eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); } while(0); printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i); return; } void recvdata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = recv(fd, ev->buf, sizeof(ev->buf), 0); eventdel(g_efd, ev); //接受完資料 刪除該監聽事件 if (len > 0) { ev->len = len; ev->buf[len] = '\0'; printf("C[%d]:%s\n", fd, ev->buf); /* 轉換為傳送事件 */ eventset(ev, fd, senddata, ev); eventadd(g_efd, EPOLLOUT, ev); //再把其新增到監聽事件中,此時回撥函式改為了senddata //將socket事件修改為EPOLLOUT用於伺服器傳送訊息給客戶端 } else if (len == 0) { close(ev->fd); /* ev-g_events 地址相減得到偏移元素位置 */ printf("[fd=%d] pos[%d], closed\n", fd, (int)(ev - g_events)); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } return; } void senddata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = send(fd, ev->buf, ev->len, 0); printf("fd=%d\tev->buf=%s\ttev->len=%d\n", fd, ev->buf, ev->len); printf("send len = %d\n", len); eventdel(g_efd, ev); //刪除該事件 if (len > 0) { printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf); eventset(ev, fd, recvdata, ev); eventadd(g_efd, EPOLLIN, ev); //新增該事件,已更改回調函式為recvdata //修改socket事件為EPOLLIN用於接收客戶端發來的訊息 } else { close(ev->fd); printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return; } //初始化一個sockfd void initlistensocket(int efd, short port) { int lfd = socket(AF_INET, SOCK_STREAM, 0); fcntl(lfd, F_SETFL, O_NONBLOCK); eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(lfd, (struct sockaddr *)&sin, sizeof(sin)); listen(lfd, 20); return; } int main(int argc, char *argv[]) { unsigned short port = SERV_PORT; if (argc == 2) port = atoi(argv[1]); g_efd = epoll_create(MAX_EVENTS+1); if (g_efd <= 0) printf("create efd in %s err %s\n", __func__, strerror(errno)); initlistensocket(g_efd, port); /* 事件迴圈 */ struct epoll_event events[MAX_EVENTS+1]; printf("server running:port[%d]\n", port); int checkpos = 0, i; while (1) { /* 超時驗證,每次測試100個連結,不測試listenfd 當客戶端60秒內沒有和伺服器通訊,則關閉此客戶端連結 */ long now = time(NULL); for (i = 0; i < 100; i++, checkpos++) { if (checkpos == MAX_EVENTS) { checkpos = 0; } if (g_events[checkpos].status != 1) { continue; } long duration = now - g_events[checkpos].last_active; if (duration >= 60) { close(g_events[checkpos].fd); printf("[fd=%d] timeout\n", g_events[checkpos].fd); eventdel(g_efd, &g_events[checkpos]); } } /* 等待事件發生 */ int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000); if (nfd < 0) { printf("epoll_wait error, exit\n"); break; } for (i = 0; i < nfd; i++) { struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //可讀 ev->call_back(ev->fd, events[i].events, ev->arg); } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //可寫 ev->call_back(ev->fd, events[i].events, ev->arg); } } } /* 退出前釋放所有資源 */ return 0; }
  • 為何使用epoll反應堆
    開發效率高,是從軟體工程層面考慮的…
    看來我一時半會是無法理解了,以後慢慢體會吧
    具體可以檢視這篇部落格
https://blog.csdn.net/russell_tao/article/details/17452997

epoll的ET模式

  • ET模式的注意事項
    使用ET模式期望使用非阻塞套接字,這樣可以避免一個控制代碼的阻塞而影響到處理多個檔案描述符的任務。

在ET模式下,epoll只在事件發生時發出通知,沒有新的事件,即使你上一次的沒有處理完,也不會再通知了。

比如,ET模式下accept存在的問題,考慮這種情況:多個連線同時到達,伺服器的TCP就緒佇列瞬間積累多個就緒連線,由於是邊緣觸發模式,epoll只會通知一次,accept只處理一個連線,導致TCP就緒佇列中剩下的連線都得不到處理。

解決辦法是用while迴圈抱住accept呼叫,處理完TCP就緒佇列中的所有連線後再退出迴圈。如何知道是否處理完就緒佇列中的所有連線呢?accept返回-1並且errno設定為EAGAIN就表示所有連線都處理完。

  • 在epoll的ET模式下,正確的讀寫方式為:
    讀:只要可讀,就一直讀,直到返回0,或者 errno = EAGAIN
    寫:只要可寫,就一直寫,直到資料傳送完,或者 errno = EAGAIN 下次再寫
  • EPOLLOUT事件觸發條件
    EPOLLOUT事件只有在連線時觸發一次,表示可寫,其他時候想要觸發,那你要先準備好下面條件:
    1.某次write,寫滿了傳送緩衝區,返回錯誤碼為EAGAIN。
    2.對端讀取了一些資料,又重新可寫了,此時會觸發EPOLLOUT。
    但在你的epoll的邏輯處理中,當有新的sockfd請求連線時,觸發的是EPOLLIN,表示你的監聽listenfd上有可讀事件,這個EPOLLOUT在連線時觸發,是因為socket設定為非阻塞,他會立即返回,那我們不知道他是否連線成功了,所以,當有EPOLLOUT被觸發,自然就是連線成功了,反之EPOLLOUT一直沒有觸發,自然就是連線失敗了