1. 程式人生 > 其它 >Linux I/O複用技術:select, poll, epoll使用區別

Linux I/O複用技術:select, poll, epoll使用區別

目錄

I/O複用簡介

傳統的程式設計模型中,要確定某個檔案描述符是否發生關心的事件,需要對其進行輪詢。一旦要監聽的檔案描述符數量眾多,可能會導致效率很低。

I/O 複用技術能有效減少需要輪詢的檔案描述符數量,將其縮減至1個,即I/O複用的系統呼叫本身,同時,程式也能監聽多個檔案描述符。這對提高程式效能很重要。I/O複用本身是阻塞的,並不能讓程式併發執行。但I/O複用通過監聽檔案描述符事件,如果事件就緒,就通知應用程式執行相應的處理流程;如果沒有就緒事件,就阻塞在等待事件就緒的一個select/poll/epoll 呼叫上,而不是每個檔案描述符,從而實現併發執行。

使用I/O複用技術的常見場景:

  • 客戶端程式要同時處理多個socket,比如非阻塞connect;
  • 客戶端程式要同時處理多個使用者輸入和網路連線,比如網路聊天室;
  • TCP伺服器要同時處理監聽socket和連線socket。這是I/O複用使用最多的場合;
  • 伺服器要同時處理TCP請求和UDP請求;
  • 伺服器要同時監聽多個埠,或者處理多種服務,比如xinetd伺服器;

簡單來說,I/O複用就是適合應用程式程式要同時處理多個IO事件,而IO事件通常也不是一次性完成的,需要一個過程來完成。

Linux上,實現I/O複用的系統呼叫有3個:select、poll、epoll。

檔案描述符就緒

談這3個I/O複用前,先了解一下什麼是檔案描述符就緒。哪些情況下檔案描述符就緒?
所謂檔案描述符就緒,是指檔案描述符對應檔案可讀、可寫或者出現異常。

例如,在網路程式設計中,下列情況下socket可讀:
1)socket 核心接收緩衝區的位元組數 >= 低水平位標記SO_RCVLOWAT。此時,可無阻塞read該socket,返回的位元組數>0;

2)socket 通訊對端關閉連線。此時,對該socket的read操作將返回0;

3)監聽socket 上有新的連線請求;

4)socket上有未處理的錯誤。可使用getsockopt來讀取和清除該錯誤(SO_ERROR);

下列情況下socket可寫:
1)socket 核心傳送緩衝區中可用位元組數 >= 低水平位標記SO_SNDLOWAT。此時,可無阻塞地write該socket,返回位元組數>0;

2)socket的寫操作被關閉。對寫操作被關閉的socket執行寫操作,將觸發一個SIGPIPE訊號;

3)socket使用非阻塞connect連線成功或失敗(超時)之後;

4)socket上有未處理的錯誤。此時,可用使用getsockopt來讀取和清除該錯誤(SO_ERROR);

socket能處理的異常情況只有一種:socket上收到帶外資料(out-of-band data)。

當然,I/O複用不僅用於監聽socket,還可以用於監聽外部裝置,本地管道、訊息佇列、UNIX Domain Socket(域套接字)、timerfd(Linux特有定時器)、eventfd(Linux特有事件通知)等等有對應fd存在的地方。

select系統呼叫

用途:在一段指定時間內,監聽使用者感興趣的檔案描述符上的可讀、可寫和異常事件。(3個檔案描述符集合,用陣列表示)

特點:
1)要設定監聽的fd時,分別設定3個集合:readfs、writefd、exceptfds,分別用於監聽讀事件、寫事件、異常事件 這3類事件。
2)用一組巨集定義FD_ZERO/FD_SET/FD_CLR/FD_ISSET,對監聽的fd進行操作。
3)每個while迴圈裡面,都需要為select重新設定監聽的3類集合。將fd集合從使用者態拷貝到核心態,在監聽的fd數量較多時,開銷也會比較大。
4)監聽的fd數量有上限限制(預設1024)。這是select使用場景的重要限制。
5)監聽到有就緒事件時,不知道具體是哪個,需要用FD_ISSET對所有fd逐個檢測,從而判斷具體是哪個fd發生就緒事件。這是select效能相比epoll較低重要原因。

select函式原型

/* According to POSIX.1-2001 */
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

#include <sys/select.h>

int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
  • 引數

1)ndfs: 指定被監聽的檔案描述符總數。通常被設定為select監聽的所有檔案描述符最大值 + 1,因為檔案描述符是從0開始計數的。

2)readfds, writefds, exceptfds: 分別指向可讀、可寫和異常等事件對於的檔案描述符集合。應用程式呼叫select函式時,通過這3個引數傳入自己感興趣的檔案描述符。select呼叫返回3個集合中檔案描述符就緒的總數,由核心修改,用於通知應用程式有就緒事件。這3個引數是fd_set*型別。

#include <typesizes.h>
#define __FD_SETSIZE 1024

#include <sys/select.h>
#define FD_SETSIZE  __FD_SETSIZE
typedef long int __fd_mask;
#define __NFDBITS (8 * (int) sizeof(__fd_mask) )       /* 1個long int型別數 8byte, 每1byte 8個bit */

typedef struct {
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];      /* 陣列長度(byte數)為 (fd最大值+1) / (8 * sizeof(long int) ) */
#define __FDS_BITS(set) ((set)->fds_bits)
} fd_set;

fd_set結構體包含一個整型陣列,該陣列的每個元素的每個bit位標記一個檔案描述符。fd_set能容納的檔案描述符數量由FD_SETSIZE指定,這也限制了select能同時處理的檔案描述符的總量。

由於操作繁瑣,可以使用下面一系列巨集來訪問fd_set結構體中的bit位:

#include <sys/select.h>

void FD_CLR(int fd, fd_set *set);     /* 清除set的所有位 */
int  FD_ISSET(int fd, fd_set *set);   /* 設定set的位fd */
void FD_SET(int fd, fd_set *set);     /* 清除set的位fd */
void FD_ZERO(fd_set *set);            /* 測試set的位fd是否被設定 */

3)timeout 用於設定select()超時時間。timeval是值-結果引數,核心將修改它,以告訴應用程式select等待了多久。不過,不能完全信任select返回的timeout值,如呼叫失敗時timeout值不確定。

timeval結構體定義:

#include <sys/time.h>

struct timeval {
    long    tv_sec;         /* seconds */
    long    tv_usec;        /* microseconds */
};
  • 返回值

如果timeout.tv_sec和.tv_usec都傳遞0,則select將立即返回;如果timeout傳遞NULL,則select將一直阻塞,直到監聽的某個檔案描述符就緒。
select成功返回就緒(可讀、可寫、異常)檔案描述符的總數。
如果在超時時間內,沒有任何檔案描述符就緒,select將返回0;出錯時,返回-1並設定errno;
如果在select等待期間,程式接收到訊號,則select立即返回-1,並設定errno為EINTR。

select呼叫模型

int fd1, fd2, fd3, ..., fdn;
...
int max_fd = fdn; // 要監聽的檔案描述符中, 數值上最大值, 要求 < 1024 (0..1023)
struct timeval timeout = {sec, usec}; // select超時時間

while(1) {
    // 重新設定3個集合readfds, writefds, exceptionfds
    FD_SET(fdi1..fdi2, &readfds);
    FD_SET(fdj1..fdj2, &writefds);
    FD_SET(fdk1..fdk2, &exceptionfds);

    int n = select(max_fd + 1, readfds, writefds, exceptionfds, &timeout);
    if (n < 0) {
        // error
    }
    else {
        // 逐個檢測檔案描述符就緒事件, 如果檢測到監聽的事件發生, 就呼叫相應的處理事件程式碼
        if (FD_ISSET(fdi1, readfds)) {
           // 處理事件
        }
        ...
        if (FD_ISSET(fdi2, readfds)) {
            // 處理事件
        }

        if (FD_ISSET(fdj1, writefds)) {
            // 處理事件
        }
        ...
        if (FD_ISSET(fdj2, writefds)) {
            // 處理事件
        }

        if (FD_ISSET(fdk1, exceptionfds)) {
            // 處理事件
        }
        ...
        if (FD_ISSET(fdk2, exceptionfds)) {
            // 處理事件
        }
    }
}

示例:處理帶外資料

socket上接收到普通資料和帶外資料,都將使select返回,但socket處於不同的就緒狀態:前者處於可讀狀態,後者處於異常狀態。下面的程式描述了select如何同時處理二者:

只展示使用select的核心部分程式碼,詳細程式碼見Giteeselect_outoufband.c | Gitee地址

// from https://gitee.com/fortunely/linuxstudy/blob/master/advancedio/select/select_outoufband.c

int main()
{
    ...
    int connfd = accept(listenfd, ...);

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);

    while (1) {
        memset(buf, '\0', sizeof(buf));

        FD_SET(connfd, &read_fds);
        FD_SET(connfd, &exception_fds);

        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL); // 有幾個事件(fd)就緒, select返回值就是多少
        if (ret < 0) {
            printf("select failure\n");
            break;
        }
        
        // 遍歷select監聽事件陣列, 判斷事件是否就緒, 如果就緒, 就處理事件

        /* 對於可讀事件,採用普通的recv函式讀取資料 */
        if (FD_ISSET(connfd, &read_fds)) {
            ret = recv(connfd, buf, sizeof(buf) - 1, 0);
            if (ret <= 0) break;
            buf[ret] = '\0'; /* buf末尾新增null終結符,轉化為字串 */
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        /* 對於異常事件,採用MSG_OOB標誌的recv函式讀取帶外資料 */
        else if (FD_ISSET(connfd, &exception_fds)) {
            ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
            if (ret < 0)
                break;
            buf[ret] = '\0';
            printf("get %d bytes of oob data: %s\n", ret, buf);
        }
    }
    ...
}

POLL系統呼叫

poll類似於select,也是在指定時間內輪詢一定數量的檔案描述符,以測試其中是否有就緒者。

特點:
1)不受監聽的檔案描述符數量上限 (select是1024)的限制。
2)poll接受一個pollfd結構陣列作為要監聽的檔案描述符集合,以引數傳入。 pollfd結構包含要監聽的檔案描述符、事件型別,以及實際發生的就緒事件。
3)要監聽的陣列本身,不要每個迴圈都重新設定。但同select,每次迴圈都要將監聽的fd集合,作為poll引數,從使用者態傳拷貝到核心態。
4)就緒事件發生時,同select,要對每個監聽的檔案描述符逐一進行檢測。這是導致poll相比epoll更低效的重要原因。

poll函式原型

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <poll.h>

int ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *timeout_ts, const sigset_t *sigmask);
  • 引數

1)fds pollfd結構的陣列,指定所有感興趣的檔案描述符上發生的可讀、可寫、異常事件。pollfd結構定義:

struct pollfd {
    int fd;        /* file descriptor */
    short events;  /* requested events */
    short revents; /* returned events */
}

poll支援的事件型別(pollfd.events/.revents支援的值):

事件 描述 可作為輸入? 可作為輸出?
POLLIN 資料(包括普通資料和優先資料)可讀
POLLRDNORM 普通資料可讀
POLLRDBAND 優先順序帶資料可讀(Linux不支援)
POLLPRI 高優先順序資料可讀,比如TCP帶外資料
POLLOUT 資料(包括普通資料和優先順序資料)可寫
POLLWRNORM 普通資料可寫
POLLWRBAND 優先順序帶資料可寫
PLLRDHUP TCP連線被對方關閉,或者對方關閉了寫操作。它由GNU引入
POLLERR 錯誤
POLLHUP 掛起。比如管道的寫端被關閉後,讀端描述符上將收到POLLHUP事件
POLLNVAL 檔案描述符沒有開啟

如何區分socket上接收到的,是有效資料,還是對方關閉連線的請求?
有兩種方式:
方式一:根據recv呼叫,返回值如果是0,說明對方關閉了連線請求。

方式二:Linux 2.6.17後,GNU為poll系統呼叫增加POLLRDHUP事件,在socket上接收到對方關閉連線的請求之後觸發。不過使用POLLRDHUP事件時,需要住程式碼的最開始處定義_GNU_SOURCE。

2)nfds 指定被監聽事件集合fds的大小。型別nfds_t定義:

typedef unsigned long int nfds_t;

3)timeout 指定poll的超時值,單位ms。當timeout為-1時,poll呼叫將永遠阻塞,直到某個事件發生;當timeout為0時,poll呼叫將立即返回。

  • 返回值
    含義同select返回值。

poll呼叫模型

struct pollfd *fds = (struct pollfd*)malloc(sizeof(struct pollfd) * fds_num);

// e.g.
fds[0].fd = STDIN_FILENO; // 要監聽的檔案描述符
fds[0].events = POLLIN;     // 設定請求事件
// 無需設定.revents, 該值由核心維護

// 設定 fds[1..fds_num-1] 檔案描述符及請求事件

int timeout = num; // 超時時間, 單位: ms

while (1) {
    do {
        int n = poll(fds, fds_num, num);
    } while(n == -1 && errno == EINTR); // 多設定一層do-while, 是為了訊號喚醒後能恢復poll呼叫

    if (n >) { // 監聽到有檔案描述符就緒, 對所有監聽事件逐一檢測
        if (fds[0].events == fds[0].revents) { // 只有請求事件與返回事件一致時, 才說明是poll監聽到的就緒事件
            // 處理事件
        }
        ...
        if (fds[fds_num - 1].events = fds[fds_num - 1].revents) {
            // 處理事件
        }
    }
}

free(fds);

poll示例:同時監聽鍵盤輸入事件和滑鼠移動事件

完整程式碼見 Gitee地址 poll.c | Gitee

// from https://gitee.com/fortunely/linuxstudy/blob/master/advancedio/poll/poll.c

int main() {
    int ret;
    char buf1[100];
    int buf2 = 0;
    struct pollfd fds[POLLFD_NUM];

    // 具體是哪個mouse, 可以cat /dev/input/mouse? 進行測試
    int mousefd = open("/dev/input/mouse1", O_RDONLY);
    
    fds[0].fd = STDIN_FILENO; // 標準輸入檔案描述符,程序啟動時已預設開啟
    fds[0].events = POLLIN; // 請求事件
    fds[1].fd = mousefd;
    fds[1].events = POLLIN; // 請求事件

    while (1) {
        do {
//            ret = poll(fds, POLLFD_NUM, -1); // 第三個引數timeout = -1 無限期等待
              ret = poll(fds, POLLFD_NUM, 3000); // 超時時間3000ms
        }while(ret == -1 && errno == EINTR);

        if (ret > 0) { // 有動靜的fd數量
            if (fds[0].events == fds[0].revents) {// 請求事件與返回的實際事件一致
                memset(buf1, 0, sizeof buf1);
                ret = read(fds[0].fd, buf1, sizeof buf1);
            }
            if (fds[1].events == fds[1].revents) {
                buf2 = 0;
                ret = read(fds[1].fd, &buf2, sizeof buf2); // 注意buf2是一個int變數,而非地址
            }
        }
        else if (ret == 0) printf("time out\n");
    }

    close(mousefd);
}

epoll系統呼叫

核心事件表

epoll 是Linux 特有I/O複用函式,實現上和使用上與select、poll有很大差異:
首先,epoll使用一組函式來完成任務,而不是單個函式。
其次,epoll把使用者關心的檔案描述符上的事件放在核心裡的一個事件表中,從而無須像select和poll那樣每次呼叫都要重複傳入檔案描述符集或事件集。但epoll需要使用一個額外的檔案描述符,來唯一標識核心中的這個事件表。

epoll函式組

epoll_create 函式

這個檔案描述符,如何建立?
使用如下epoll_create函式建立:

#include <sys/epoll.h>

int epoll_create(int size);
int epoll_create1(int flags);

size:現在不起作用,只是給核心一個提示,告訴它事件表需要多大。該函式返回的檔案描述符將用作其他所有epoll系統呼叫的第一個引數,以指定要訪問的核心事件表。

epoll_ctl 函式

epoll_ctl用來操作epoll的核心事件表:

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epfd:由epoll_create建立檔案描述符,對應核心中一個epoll監聽事件表。

fd:要操作的檔案描述符,op引數指定操作型別。操作型別有3種:

  • EPOLL_CTL_ADD 往事件表中註冊fd上的事件;
  • EPOLL_CTL_MOD 修改fd上的註冊事件;
  • EPOLL_CTL_DEL 刪除fd上的註冊事件;

event:指定要監聽的事件,是epoll_event結構型別指標。epoll_event定義:

struct epoll_event {
    __unit32_t events;  /* epoll事件型別 */
    epoll_data_t data;  /* 使用者資料 */
};

其中,events成員描述事件型別。epoll支援的事件型別和poll幾部相同。表示epoll事件型別的巨集是在poll對應的事件型別巨集前加“E”,比如epoll的資料可讀事件是EPOLLIN。但epoll有2個額外的事件型別:EPOLLET,EPOLLONESHOT。它們對於epoll的高效運作非常關鍵。data成員用於儲存使用者資料,其型別epoll_data_t定義:

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

epoll_data_t 是一個聯合體,其4個成員使用最多的是fd,它指定事件所從屬的目標檔案描述符。ptr成員可用來指定與fd相關的使用者資料。但由於epoll_data_t是一個聯合體,不能同時使用ptr成員和fd成員。

epoll_ctl成功時返回0,失敗返回-1並設定errno。

epoll_wait函式

epoll系列系統呼叫主要介面epoll_wait函式,它在一段超時時間內等待一組檔案描述符上的事件,其原型:

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);

成功時,返回就緒的檔案描述符的個數;失敗時,返回-1並設定errno。

timeout: 含義與poll介面的timeout引數相同;

maxevents: 指定最多監聽多少個事件,必須 > 0;

epoll_wait函式如果檢測到事件,就將所有就緒的事件從核心事件表(由epfd引數指定)中複製到它的第二個引數events指向的陣列中。該陣列只用於輸出epoll_wait檢測到的就緒事件,而不像select和poll的陣列那樣,既用於傳入使用者註冊事件,又用於輸出核心檢測到的就就緒事件。這樣,極大地提高了應用程式索引就緒檔案描述符的效率。

poll和epoll在使用上的差別:

/* 如何索引poll返回的就緒檔案描述符 */
int ret = poll(fds, MAX_EVENT_NUMBER, -1); /* 阻塞等待監聽檔案描述符對應事件 */

/* 遍歷所有已註冊檔案描述符,找到其中就緒者 */
for (int i = 0; i < MAX_EVENT_NUBMER; ++i) {
    if (fds[i].revents & POLLIN) { /* 判斷第i個檔案描述符是否就緒 */
        int sockfd = fds[i].fd;
        /* 處理sockfd */
    }
}

/* 如何索引epoll返回的就緒檔案描述符 */
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); /* 阻塞等待註冊的檔案描述符 */
/* 只需要遍歷就緒的ret檔案描述符 */
for (int i = 0; i < ret; i++) {
    int sockfd = events[i].data.fd;
    /* sockfd 肯定就緒, 直接處理 */
}

epoll呼叫模型:LT和ET模式

epoll對檔案描述符操作,有兩種模式:LT(Level Trigger,電平觸發)模式,ET(Edge Trigger,邊沿觸發)模式。LT模式是預設的工作模式,該模式下,epoll相當於一個效率較高的poll。當往epoll核心事件表中註冊一個檔案描述符上的EPOLLET事件時,epoll將以ET模式來操作該檔案描述符。

LT模式

採用LT工作模式的檔案描述符,當epoll_wait檢測到其上有事件發生並將事件通知應用程式後,應用程式可以不立即處理該事件。這樣,當應用程式下一次呼叫epoll_wait時,epoll_wait還會再次嚮應用程式通告此事件,直到該事件被處理。

ET模式

採用ET模式的檔案描述符,當epoll_wait檢測到其上有事件發生,並將事件通知應用程式後,應用程式必須立即處理該事件,因為後續的epoll_wait呼叫將不再向應用程式通知這一事件。

ET模式在很大程度上降低了同一個epoll事件被重複觸發的次數,因此ET模式效率比LT模式要高。

LT和ET模式伺服器呼叫例程

同樣,只展示部分關鍵程式碼。完整程式碼,詳見Gitee地址 epoll.c | Gitee地址

// from https://gitee.com/fortunely/linuxstudy/blob/master/advancedio/epoll/epoll.c

/* 將檔案描述符fd上的EPOLLIN註冊到epollfd指示的epoll核心事件表中,引數enable_et指定是否對fd啟用ET模式 */
void addfd(int epollfd, int fd, bool enable_et)
{
        struct epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN;
        if (enable_et) {
                event.events |= EPOLLET; // 注意: 這裡的.events 添加了EPOLLET標識, 表示對該事件啟動ET模式
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        setnonblocking(fd);
}

/* LT模式的工作流程 */
void lt(struct epoll_event* events, int number, int epollfd, int listenfd)
{
        char buf[BUFFER_SIZE];
        for (int i = 0; i < number; i++) {
                int sockfd = events[i].data.fd;
                if (sockfd == listenfd) { // 監聽連線到listenfd事件就緒, 說明有連線請求
                       // 處理連線就緒事件
                       ...
            int connfd = accept(listenfd, (SA *)&client_address, &client_addrlength); // accept新連線
            addfd(epollfd, connfd, false); /* 對connfd禁用ET mode */
                }
                else if (events[i].events & EPOLLN) { // 其他輸入事件
                        /* 只要 socket讀快取中還有未讀出的資料,這段程式碼就被觸發 */
                        memset(buf, '\0', BUFFER_SIZE);
                        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                        ...
                }
                else {
                        printf("something else happened \n");
                }
        }
}

voi et (struct epoll_event *evets, int number, int epollfd, int listenfd)
{
        char buf[BUFFER_SIZE]; // 1024
        for (int i = 0 ; i < number; i++) {
                int sockfd = events[i].data.fd;
                if (sockfd == listenfd) {
                        struct sockaddr_in client_address;
                        socklen_t client_addrlength = sizeof(client_address);
                        addfd(epollfd, connfd, true); /* 對connfd開啟ET mode */
                }
                else if(events[i].event & EPOLLIN) {
                        /* 這段程式碼不會被重複觸發,所以我們迴圈讀取資料,以確保把socket讀快取中的所有資料讀出 */
                        printf("event trigger once\n");
                        while (1) {
                                memset(buf, '\0', BUFFER_SIZE, 0);
                                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                                if (ret < 0) {
                                        /* 對於非阻塞IO,下面的條件成立表示資料已經全部讀取完畢。此後,epoll就能再次觸發sockfd上的EPOLLIN事件,以驅動下一次讀操作 */
                                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                                                printf("read later\n");
                                                break;
                                        }
                                        close(sockfd);
                                        break;
                                }
                                else if (ret == 0 )
                                {
                                        close(sockfd);
                                }
                                else {
                                        printf("get %d bytes of content: %s\n", ret, buf);
                                }
                        }
                }
                else {
                        printf("get %d bytes of contet: %s\n", ret, buf);
                }
        }
}

int main(int argc, char *argv[]) 
{
    ...
        int ret = 0;
        int listenfd = socket(AF_IENT, SOCK_STREAM, 0);

        ret = bind(listenfd, (SA *)&address, sizeof(address));
        ret = listen(listenfd, 5);

        epoll_event events[MAX_EVENT_NUMBER];
        int epollfd = epoll_create(5);

        addfd(epollfd, listenfd, true);
        while (1) {
                int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

                lt(events, ret, epollfd, listenfd); /* use LT mode*/
                // et(events, ret, epollfd, listenfd); /* use ET mode */
        }

        close(listenfd);
        return 0;
}

注意:
1)ET模式下,事件被觸發的次數要比LT模式下少很多;
2)每個使用ET模式的檔案描述符都應該是非阻塞的。如果檔案描述符是阻塞的,那麼讀或寫操作將會因為沒有後續的事件,而一直處於阻塞狀態(飢渴狀態);

EPOLLONESHOT事件

即使使用ET模式,一個socket上的某個事件還是可能被觸發多次,在這併發程式中會引起一個檔案,比如一個執行緒(或程序)在讀取完某個socket上的資料後開始處理這些資料,而在資料的處理過程中該socket上又有新資料可讀(EPOLLIN再次被觸發),此時另外一個執行緒被喚醒來讀取這些新的資料。於是出現2個執行緒同時操作一個socket的局面。
----這不是期望的,我們期望的是:一個socket連線在任一時刻,都只會被一個執行緒處理。 這點可以使用epoll的EPOLLONESHOT事件實現。

註冊了EPOLLONESHOT事件的檔案描述符,OS最多觸發其上註冊的一個可讀、可寫或異常事件,且只觸發一次,除非我們使用epoll_ctl函式重置該檔案描述符上註冊的EPOLLONESHOT事件。這樣,當一個執行緒在處理某個socket時,其他執行緒不可能有機會操作該socket。反過來,註冊了EPOLLONESHOT事件的socket一旦被某個執行緒處理完畢,該執行緒就應該立刻重置這個socket上的EPOLLONESHOT事件,以確保這個socket下一次可讀時,其EPOLLIN事件就能被觸發,進而讓其他工作執行緒有機會繼續處理該socket。

PS:epoll只會觸發一次EPOLLONESHOT事件,直到重置該事件,接著允許再觸發一次。

在fd上註冊EPOLLONESHOT事件方式:

/* 將fd上的EPOLLIN和EPOLLET事件註冊到epollfd指示的epoll核心事件表中,引數oneshot指定是否註冊fd上的EPOLLONESHOT事件  */
void addfd (int epollfd, int fd, bool oneshot)
{
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if (oneshot)
        event.events |= EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd); // 注意: 如果將fd註冊為ET模式(EPOLLET事件), 則檔案描述符應設為non-blocking
}

/* 將fd設為non-blocking */
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

重置fd上的EPOLLONESHOT事件方式:

/* 重置fd上的事件。這樣操作之後,儘管fd上的EPOLLONESHOT事件被註冊,但是OS仍然會觸發fd上的EPOLLIN事件,且只觸發一次 */
void reset_oneshot(int epollfd, int fd)
{
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // 必須註冊EPOLLONESHOT事件, 其他事件根據實際情況決定
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);   // epoll_ctl重置fd上註冊的EPOLLONESHOT事件
}

EPOLLONESHOT例程

兩個執行緒同時對註冊EPOLLONESHOT事件的同一個非阻塞sockfd,進行阻塞操作recv

演示如何使用epoll對fd重置EPOLLONESHOT事件。同樣的只展示部分核心程式碼,完整程式碼見epoll_EPOLLONESHOT.c

// from https://gitee.com/fortunely/linuxstudy/blob/master/advancedio/epoll/epoll_EPOLLONESHOT.c

struct fds {
    int epollfd;
    int sockfd;
};

int main()
{
    int listenfd = socekt();
    bind(listenfd);
    listen(listenfd);
    
    struct epoll_event events[MAX_EVENT_NUMBER]; // 1024
    int epollfd = epoll_create(5);   /* 5沒有意義,但必須>0 */
    
    /* 注意:監聽socket listenfd 上是不能註冊EPOLLONESHOT事件的,否則應用程式只能處理一個客戶連線!因為後續的客戶連線請求將不再觸發listenfd上的EPOLLIN事件 */
    addfd(epollfd, listenfd, false);

    while (1) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        for (int i = 0; i < ret; i++) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd) {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (SA *)&client_address,  &client_addrlength);

                /* 對每個非監聽檔案描述符都註冊EPOLLONESHOT事件 */
                addfd(epollfd, connfd, true);
            }
            else if (events[i].events & EPOLLIN ) { /* 普通可讀資料就緒 */
                pthread_t thread;
                struct fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;

                /* 新啟動一個工作執行緒為sockfd服務 */
                pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
            }
            else {
                printf("something else happend\n");
            }
        }
    }
}

/* 工作執行緒 */
void *worker(void *arg)
{
    int sockfd = ((struct fds *)arg)->sockfd;
    int epollfd = ((struct fds *)arg)->epollfd;
    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE]; // 1024
    memset(buf, '\0', BUFFER_SIZE);

    /* 迴圈讀取sockfd上的資料,直到遇到EAGAIN錯誤 */
    while (1) {
        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
        if (ret == 0) {
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        else if (ret < 0) {
            if (errno == EAGAIN) { // 在ET模式下, 對non-blocking fd呼叫阻塞操作recv,  該操作可能沒有完成就返回, 從而產生該錯誤碼, 但不會破壞socket
                reset_oneshot(epollfd, sockfd); // 重置sockfd上註冊的EPOLLONESHOT事件以及其他事件
                printf("read later\n");
                break;
            }
        }
        else {
            printf("get content: %s\n", buf);
            /* 休眠5秒,模擬資料處理過程 */
            sleep(5);
        }
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
}

select/poll/epoll 比較

select、poll、epoll三組I/O複用系統呼叫,共同點:
1)都能同時監聽多個檔案描述符。
2)都有超時等待功能,由timeout引數指定超時時間,或者到一個或多個檔案描述符上有監聽事件發生時返回。
3)返回值是就緒的檔案描述符的數量。返回0表示沒有事件發生。

那麼它們有什麼區別?在什麼情況下選用哪種系統呼叫呢?
下面從事件集、最大支援檔案描述符數、工作模式和程式設計模型,等4個方面進一步比較它們的異同,以明確在實際應用中應該選擇使用哪個或哪些。

1)事件集
select通過fd_set告訴核心監聽哪些檔案描述符,不過fd_set並沒有將檔案描述符和事件繫結,因此需要提供3個這種型別的引數(readfds/writefds/exceptionfds),分別傳入要監聽的可讀、可寫、異常事件集。正因為這樣,select不能處理更多型別的事件,另一方面由於核心對fd_set集合的線上修改,應用程式下次呼叫select前不得不重置這3個fd_set,因此每次select呼叫前,都要重新設定這3個fd_set並拷貝到核心。

poll的事件集pollfd更“聰明”一些。它把檔案描述符和事件都定義繫結到一起,任何事件都被統一處理,從而使得程式設計介面更簡潔。並且,核心每次修改的是pollfd結構體的revents成員(return events),而events不變(request events),因此下次呼叫poll時應用程式無需重置pollfd型別的事件集引數

由於每次select和poll呼叫都返回整個使用者註冊的事件集合(就緒的,和未就緒的),所以應用程式索引就緒檔案描述符的時間複雜度O(n)。

epoll採用與select和poll完全不同的方式來管理使用者註冊的事件:它在核心中維護一個事件表,並提供一個獨立的系統呼叫epoll_ctl,來控制往其中新增、刪除、修改事件。這樣,每次epoll_wait呼叫都直接從該核心事件表中取得使用者註冊的事件,而無需反覆從使用者控制元件讀入這些事件。epoll_wait系統呼叫的events引數僅僅用了返回就緒的事件,應用程式索引就緒檔案描述符的複雜度O(1)。

PS:epoll_wait 傳出的事件集,就是就緒事件集。

2)最大支援檔案描述符數
poll和epoll_wait分別用nfds和maxevents引數指定最多監聽多少個檔案描述符和事件,這2個數制都能達到系統允許開啟的最大檔案描述符數目,即65535($ cat /proc/sys/fs/file-max檢視);select允許監聽的檔案描述符的最大數量有限制(通常是1024),雖然使用者可以修改該限制,但可能導致不可預期的後果。

3)工作模式
select和poll都只能工作在相對低效的LT模式,而epoll可以工作在ET高效模式。而且epoll還支援EPOLLONESHOT事件,能進一步減少可讀、可寫和異常等事件被觸發的次數。

4)程式設計模型
select和poll採用的都是輪詢的方式,即每次呼叫都掃描整個註冊檔案描述符集合,並將其中就緒的檔案描述符返回給使用者選擇,檢測就緒事件演算法時間複雜度O(n)。
epoll採用回撥的方式,核心檢測到就緒的檔案描述符時,將觸發回撥函式,回撥函式就將該檔案描述符上對應的事件插入核心就緒事件佇列。核心最後在適當的時機將該就緒事件佇列中的內容拷貝到使用者控制元件。因此,epoll_wait無需輪詢整個檔案描述符集合來檢測哪些事件已經就緒,演算法時間複雜度O(1)。
當活動連線比較多的時候,epoll_wait效率未必比select和poll高,因為此時回撥函式被觸發得過於頻繁。因此,epoll_wait適用於連線數較多,但活動連線較少的情況。

活動連線多時,會發生頻繁回撥,佔用大量CPU開銷。
連線數多時,select、poll每次輪詢不得不遍歷所有註冊事件的集合,浪費大量CPU時間。

總結

系統呼叫 select poll epoll
事件集合 使用者通過3個引數分別傳入感興趣的可讀、可寫及異常等事件,
核心通過對這些引數的線上修改來返回其中的就緒事件。
這使得每次呼叫select都要重置這3個集合引數
統一處理所有事件型別,因此只需要一個事件集引數。
使用者通過pollfd,events傳入感興趣的事件,
核心通過修改pollfd,revents反饋其中就緒的事件
核心通過一個事件表直接管理使用者感興趣的所有事件。
因此每次呼叫epoll_wait時,無須反覆傳入使用者感興趣的事件。
epoll_wait系統呼叫的引數events僅用來反饋就緒的事件
應用程式索引就緒檔案描述符的時間複雜度 O(n) O(n) O(1)
最大支援檔案描述符數 一般有最大值限制FD_SETSIZE 系統支援的最大檔案描述符數(如65535),
也就是說不受限於FD_SETSIZE
系統支援的最大檔案描述符數(如65535),
不受限於FD_SETSIZE
工作模式 LT LT LT和ET
核心實現和工作效率 採用輪詢方式檢測就緒事件 採用輪詢方式來檢測就緒事件 採用回撥方式檢測就緒事件

另附一張從網上找到的一個簡單彙總圖(具體出處已忘記):