1. 程式人生 > 其它 >《Unix 網路程式設計》06:IO複用之select/poll

《Unix 網路程式設計》06:IO複用之select/poll

IO複用之select/poll

系列文章導航:《Unix 網路程式設計》筆記

概述

程序需要一種預先告知核心的能力,使得核心一旦發現程序指定的一個或多個 I/O 條件準備就緒,他就通知程序。這個能力被稱為 I/O 複用。

典型應用場景有:

  • 客戶處理多個描述符(如之前的應用那樣)
  • 客戶同時處理多個套接字,不過這種情況比較少見
  • 伺服器既要處理監聽套接字,又要處理已經連線的套接字
  • 伺服器既要處理 TCP,又要處理 UDP
  • 伺服器要處理多個服務或多個協議
  • 許多重要的應用程式也需要這種技術

IO模型

Unix 下的 5 種 I/O 模型:

  1. 阻塞式 I/O
  2. 非阻塞式 I/O
  3. I/O 複用(select 和 poll)
  4. 訊號驅動式 I/O (SIGIO)
  5. 非同步 I/O (aio_)

輸入操作的階段

  1. 等待資料準備好
  2. 從核心向程序複製資料

不同的 I/O 模型就是這兩個階段的行為的不同

阻塞式 I/O

阻塞 I/O 就是兩個階段都等待。

可以看出,這種方式比較緩慢,程序在等待期間不能做其他的事情。

非阻塞式 I/O

非阻塞 I/O 在所查詢的資料沒有就緒時不會阻塞(上述階段一),而是直接返回一個錯誤

使用者程序需要不斷地進行呼叫,詢問是否準備就緒,一旦就緒才開始處理。

當一個應用程序像這樣對一個非阻塞描述符迴圈呼叫查詢操作時,我們稱之為輪詢(pollling)。

這種做法往往耗費大量 CPU 時間,反而不太好用。但是在特定的場合下也能發揮其作用。

I/O 複用

I/O 複用會等待在上述兩個階段。但是,第一個階段的等待是等待在如 select 這些方法上的,而不是 recvfrom,而 select 可以同時等待多個套接字,所以說即使準備就緒 socket 的排在後面,也可以“插隊”得到響應。

這種方式也類似於多執行緒的阻塞式 I/O 模式。

而根據具體的細節,I/O 複用又可以分為:

  • select
  • poll
  • epoll

訊號驅動式 I/O

與核心建立 SIGIO 的訊號關聯並設定回撥,當核心有 FD 準備就緒時,傳送 SIGIO 訊號通知使用者,期間使用者應用可以執行其他業務,無需阻塞等待。

這種方式在階段一不會發生阻塞,而且和非阻塞式 I/O 相比也不會發生輪詢現象。

缺點是:

  • 當有大量 IO 操作時,訊號較多,SIGIO 處理函式不能及時處理可能導致訊號佇列溢位
  • 核心空間與使用者空間的頻繁訊號互動效能比較低

非同步 I/O

告知核心啟動某個操作,並讓核心完成整個操作後通知我們。(像是在給下屬佈置任務)

這種方式在上述兩個階段都是非阻塞的,其缺點有:

  • 由於呼叫方只需呼叫而無需等待,而具體工作可能很消耗資源,所以多執行緒高併發環境下可能會耗盡系統資源導致系統崩潰
  • 為了限制併發,需要對上述情況進行處理,從而導致回撥函式會變得複雜

問題:訊息阻塞

在上一章中已經描述了這個問題發生的原因和造成的原因。

這裡側重問題的解決。

select 函式

等待多個事件中的任何一個發生,並只在有一個或多個事件發生或經歷一段指定的時間後才喚醒它。

#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
          const struct timeval *timeout);

引數和返回值

這裡按照原書的倒序解釋各個引數:

timeout :最長等待時間

struct timeval {
  long tv_sec;
  long tv_usec;
}

分別代表秒和微秒:

  • 如果結構體的兩個引數均設為 0,則不等待
  • 如果結構體為空,則一直等待

readset、writeset、exceptset

設定具體關心的描述符集合,通常是一個整數陣列,不同的位標識不同的描述符。

void FD_ZERO(fd_set *fdset); /* clear all bits in fdset */
void FD_SET(int fd, fd_set *fdset); /* turn on the bit for fd in fdset */
void FD_CLR(int fd, fd_set *fdset); /* turn off the bit for fd in fdset */
int FD_ISSET(int fd, fd_set *fdset); /* is the bit for fd on in fdset ? */

例如:

fd_set rset;
FD_ZERO(&rset); /* initialize the set: all bits off */
FD_SET(1, &rset); /* turn on bit for fd 1 */
FD_SET(4, &rset); /* turn on bit for fd 4 */
FD_SET(5, &rset); /* turn on bit for fd 5 */

如果把這三個引數都設定為空,相當於獲得了一個比 sleep 更為精準的定時器。

maxfdp1

(max file descriptor plus 1)

最大描述符增加 1,這樣我們就不用遍歷完所有的位了

返回資訊

  • 有事件觸發,返回事件的數量
  • 定時器到時,返回 0
  • 出錯,如打斷,返回 -1

就緒條件

套接字準備好讀和寫的條件:

參考:socket - 描述符就緒條件

套接字異常的條件:

如果一個套接字存在帶外資料或仍處於帶外標記,那麼它有異常要處理(24章中講解)

最大描述符數

一般不會使用太多的描述符,在一些特殊應用場景下確實需要擔心這個問題

限制數量:

  • 早先的系統會限制最大描述符的數量,現在的 Unix 版本往往沒有這個限制
  • select 一般會定義一個 FD_SETSIZE 來限制最大描述符的數量,如果想要修改這個值,可能要重新編譯核心
  • 有些廠家正在擴充套件該值,以支援更大的連線數

如果強制修改造成的問題:

  • 描述符數量增大時可能存在擴充套件性問題

其他方法:

  • 改用 poll 代替 select,可以避免描述符有限的問題

str_cli 改進

void str_cli(FILE *fp, int sockfd)
{
    int maxfdp1;
    fd_set rset;
    char sendline[MAXLINE], recvline[MAXLINE];

    FD_ZERO(&rset);
    for (;;)
    {
        FD_SET(fileno(fp), &rset); // fileno 把標準IO檔案指標轉換成描述符
        FD_SET(sockfd, &rset);
        maxfdp1 = max(fileno(fp), sockfd) + 1;
        Select(maxfdp1, &rset, NULL, NULL, NULL);

        if (FD_ISSET(sockfd, &rset))
        { /* socket is readable */
            if (Readline(sockfd, recvline, MAXLINE) == 0)
                err_quit("str_cli: server terminated prematurely");
            Fputs(recvline, stdout);
        }

        if (FD_ISSET(fileno(fp), &rset))
        { /* input is readable */
            if (Fgets(sendline, MAXLINE, fp) == NULL)
                return; /* all done */
            Writen(sockfd, sendline, strlen(sendline));
        }
    }
}

問題:半關閉和緩衝

批量輸入

在前面的 echo 例子中,互動式地傳送訊息是很合適的,但是在該例子中,管道容量的利用率並不高,管道上始終只有少量的訊息在流動。

假如我們需要傳輸一個比較大的訊息,那麼用批量方式可以大大提高管道的利用效率

我們可以利用 Unix 的重定向實現,將檔案內容重定向到套接字中,如下圖所示:

但是批量輸入中有一個問題:對於檔案末尾的 EOF,strcli 函式會結束自身執行,並返回到 main 函式中。main 函式也執行完畢,程式隨之關閉。

在這種輸入方式下,標準輸入中的 EOF 並不意味著我們同時也完成了從套接字的讀取,可能還有訊息還在去或回來的路上。

我們需要的是一種關閉 TCP 連線一半的方法,也就是說,我們想給伺服器傳送一個 FIN,告訴它我們已經完成了資料的傳送,但是仍然保持套接字描述符的開啟以便繼續讀取伺服器發來的響應。這將由 shutdown 函式來完成。

緩衝機制

  • 我們的客戶端程式碼 str_cli 會在有資料時呼叫 fgets 讀取,並讀入 stdio 的緩衝區中
  • 然而 fgets 只返回其中第一行,其餘輸入行仍在 stdio 緩衝區中
  • 之後 write 把這一行輸入寫給伺服器
  • 但是儘管緩衝區中還有資料,select 卻不會被觸發了:因為 select 監聽的是我們指定的某一個描述符,而對 stdio 中的各種函式自帶的緩衝區毫無察覺
  • 可以把 fgets 之類的函式換成系統呼叫 read 和 write,從而避免這種問題

Select 和 stdio 一起用所產生的問題

shutdown

之前用的 close 函式的缺陷:

  • close 只是把相關的引用計數 -1,而不能起到立刻關閉的效果,而 shutdown 則是直接關閉(四次揮手)
  • close 終止兩個方向的資料傳送,shutdown 在這方面的控制更靈活一些
#include <sys/socket.h>

int shutdown(int sockfd, int howto);

howto 的可選值:

含義 說明
SHUT_RD 關閉讀 丟棄套接字接收緩衝區資料,之後接收的訊息都被確認,然後丟棄
SHUT_WR 關閉寫 留在傳送緩衝區的資料被髮送,後跟 TCP 的正常連線終止序列
SHUT_RDWR 讀寫都關閉 等同於上面兩個一起的作用

str_cli 改進

void str_cli(FILE *fp, int sockfd)
{
    int maxfdp1, stdineof;
    fd_set rset;
    char buf[MAXLINE];
    int n;

    stdineof = 0;
    FD_ZERO(&rset);
    for (;;)
    {
        if (stdineof == 0)
            FD_SET(fileno(fp), &rset);
        FD_SET(sockfd, &rset);
        maxfdp1 = max(fileno(fp), sockfd) + 1;
        Select(maxfdp1, &rset, NULL, NULL, NULL);

        if (FD_ISSET(sockfd, &rset))
        { /* socket is readable */
            if ((n = Read(sockfd, buf, MAXLINE)) == 0)
            {
                if (stdineof == 1)
                    return; /* normal termination */
                else
                    err_quit("str_cli: server terminated prematurely");
            }

            Write(fileno(stdout), buf, n);
        }

        if (FD_ISSET(fileno(fp), &rset))
        { /* input is readable */
            if ((n = Read(fileno(fp), buf, MAXLINE)) == 0)
            {
                stdineof = 1;
                Shutdown(sockfd, SHUT_WR); /* send FIN */
                FD_CLR(fileno(fp), &rset);
                continue;
            }

            Writen(sockfd, buf, n);
        }
    }
}
  • 我們用 Read 和 Write 代替了 Fgets 和 Fputs
  • 用 Shutdown 代替了直接終止

改進:select 代替多程序

多程序會佔用大量的系統資源,影響系統的吞吐量,或許我們可以改用 Select 來同時為多個客戶提供服務。

改進程式碼

int main(int argc, char** argv) {
    int i, maxi, maxfd, listenfd, connfd, sockfd;
    int nready, client[FD_SETSIZE];
    ssize_t n;
    fd_set rset, allset;
    char buf[MAXLINE];
    socklen_t clilen;
    struct sockaddr_in cliaddr, servaddr;

    listenfd = Socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);

    Bind(listenfd, (SA*)&servaddr, sizeof(servaddr));

    Listen(listenfd, LISTENQ);

    maxfd = listenfd; /* initialize */
    maxi = -1;        /* index into client[] array */
    for (i = 0; i < FD_SETSIZE; i++)
        client[i] = -1; /* -1 indicates available entry */
    FD_ZERO(&allset);
    FD_SET(listenfd, &allset);
    /* end fig01 */

    /* include fig02 */
    for (;;) {
        rset = allset; /* structure assignment */
        nready = Select(maxfd + 1, &rset, NULL, NULL, NULL);

        if (FD_ISSET(listenfd, &rset)) { /* new client connection */
            clilen = sizeof(cliaddr);
            connfd = Accept(listenfd, (SA*)&cliaddr, &clilen);
            printf("new client: %s, port %d\n",
                   Inet_ntop(AF_INET, &cliaddr.sin_addr, 4, NULL),
                   ntohs(cliaddr.sin_port));

            for (i = 0; i < FD_SETSIZE; i++) {
                if (client[i] < 0) {
                    client[i] = connfd; /* save descriptor */
                    break;
                }
            }
            if (i == FD_SETSIZE)
                err_quit("too many clients");

            FD_SET(connfd, &allset); /* add new descriptor to set */
            if (connfd > maxfd)
                maxfd = connfd; /* for select */
            if (i > maxi)
                maxi = i; /* max index in client[] array */

            if (--nready <= 0)
                continue; /* no more readable descriptors */
        }

        for (i = 0; i <= maxi; i++) { /* check all clients for data */
            if ((sockfd = client[i]) < 0)
                continue;
            if (FD_ISSET(sockfd, &rset)) {
                if ((n = Read(sockfd, buf, MAXLINE)) == 0) {
                    /*4connection closed by client */
                    Close(sockfd);
                    FD_CLR(sockfd, &allset);
                    client[i] = -1;
                } else
                    Writen(sockfd, buf, n);

                if (--nready <= 0)
                    break; /* no more readable descriptors */
            }
        }
    }
}

拒絕服務攻擊

在上述模型中,如果有惡意使用者不斷建立新的連結,但是不傳送有用的資訊,則服務會被一直阻塞,耗費伺服器的資源。

當一個伺服器在處理多個客戶時,它絕對不能阻塞於只與單個客戶相關的某個函式呼叫。否則,可能會遭受 DoS 型攻擊。

可能的解決方法包括:

  1. 使用非阻塞 IO
  2. 讓每個客戶由單獨的控制執行緒提供服務
  3. 對 IO 操作設定一個超時

pselect

#include <sys/socket.h>
#include <signal.h>
#include <time.h>

int pselect(
  int maxfdp1,
  fd_set *readset, fd_set *writeset, fd_set *exceptset,
  const struct timespec * timeout,
  const sigset_t *sigmask
);

select 對比的引數解釋:

  • 使用 timespec 代替 timeval ,最高精確到納秒

    struct timespec {
    time_t tv_sec;
    long tv_nsec;
    }
    
  • pselect 增加了第六個引數:一個指向訊號掩碼的指標。該

poll

函式介紹

事件如下:

poll 程式修改

略,參見原文