1. 程式人生 > >I/O多路複用機制(一)

I/O多路複用機制(一)

      在實際的開發中,我們經常會遇到這樣的場景,我們需要接受多個埠的資料、多個終端的資料抑或是多個檔案描述符對應的資料。那麼,遇到這樣的問題,你在程式中該怎麼做呢?通常的做法,在程式中對資料互動的描述符進行輪詢。那麼問題來了,輪詢的時間設定為多少呢?設定的太短,可以保證處理效能和速度,但是CPU的使用率太高,一旦處理的描述符數量多了起來,CPU可能就扛不住了。設定的時間太長,描述符處理的時間片太短,處於空閒的時間較長,效能和速度達不到要求。如果是伺服器的話,面對多個使用者的連線,處理速度和CPU使用效能是必須考慮的,而且最好要兼顧。這裡就需要使用到I/O多路複用機制,這就是博主即將要和小夥伴們探討的內容。

select簡介

#include <sys/select.h>

#include <sys/time.h>

int select(int maxfdp1,fd_set *readset,fd_set*writeset,fd_set *exceptset,const struct timeval *timeout);

maxfdp1:待監控的最大描述符數值加1。

readset、writeset和exceptset:指定我們要讓核心測試讀、寫和異常條件的描述字。如果對某一個的條件不感興趣,就可以把它設為空指標。

struct fd_set可以理解為一個集合,這個集合中存放的是檔案描述符,可通過以下四個巨集進行設定:

void FD_ZERO(fd_set *fdset);         //清空集合

void FD_SET(int fd, fd_set*fdset);   //將一個給定的檔案描述符加入集合之中

void FD_CLR(int fd, fd_set *fdset);  //將一個給定的檔案描述符從集合中刪除

int FD_ISSET(int fd, fd_set *fdset);  // 檢查集合中指定的檔案描述符是否就緒

fd_set定義如下:

typedefstructfd_set {

u_int fd_count;//fd_set中監聽的檔案描述符個數

intfd_array[FD_SETSIZE];

//存放了要監聽的檔案描述符

} fd_set;

timeout:告知核心等待輪詢的時間。其timeval結構用於指定這段時間的秒數和微秒數。

        struct timeval{

                  long tv_sec;   //seconds

                  long tv_usec;  //microseconds

       };

這個引數有三種可能:

(1)永遠等待下去:直到有至少一個描述字準備就緒才返回。將timeout設定為NULL。

(2)等待一段固定時間:在超時前,有一個描述字準備就緒就返回。為此,該引數必須指向一個timeval結構,而且其中的定時器值必須大於0。

(3)不等待:檢查描述符後立即返回,這稱為輪詢。為此,該引數必須指向一個timeval結構,而且其中的定時器值必須為0。

返回值:就緒描述符的數目,超時返回0,出錯返回-1

原理圖

       select的工作模式:每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,在核心中完成輪詢的工作,待輪詢結束再將fd集合從核心態拷貝到使用者態。注意,每次呼叫select前都要重新設定檔案描述符和時間,因為事件發生後,檔案描述符和時間都被核心做了修改。我們可以將需要監控的描述符(包括檔案、終端、套接字等等)新增到fd_set集合中,由select來監控,這樣可以將多處阻塞轉移到一處。例如,既要接收終端資料錄入,又要接收socket(阻塞socket)傳遞過來的資料,那麼在socket資料接收(read)和終端I/O(fgets)處均會阻塞。使用select後就不一樣了,我們可以將這些描述符加入fd_set集合中,阻塞的地方只有一處,就是select呼叫處。

程式例項

       下面我們通過select實現一個簡單的伺服器回射例項,服務端監聽客戶端的連線請求,將已連線客戶端描述符新增到監控描述符集合中,select對監聽描述符和已連線客戶端描述符進行監控。當監聽描述符就緒,表示有新客戶端連線服務端,呼叫處理連線邏輯;當有客戶端描述符就緒,識別符號已連線客戶端有資料傳送給伺服器,服務端呼叫資料接收邏輯,將客戶端傳送過來的資料原樣傳送給客戶端。好了,廢話不多說了,直接看程式碼吧!

服務端程式碼

echo_svr.h

#include <stdio.h>

#include <arpa/inet.h>

#include <sys/select.h>

#include <time.h>

#include <unistd.h>

#include <sys/socket.h>

#include <error.h>

#include <sys/types.h>

#include <stdlib.h>

#include <string.h>

const int MAXFD = FD_SETSIZE;//FD_SETSIZE 1024 可以監控的最大描述符數量

const char *SERVERIP = "127.0.0.1";//服務端IP

const unsigned short PORT = 6666;//服務端埠號

typedef struct server_st

{

    int     cli_num;        //已連線客戶端數量

    int     cli_fd[MAXFD-1];//存放已連線客戶端描述符

    int    maxfd;          //存放描述符最大值

    int     index;          //儲存最大的索引號(已連線客戶端資料下標)

    fd_set  allset;         //監控的檔案描述符集合(select引數)

    fd_set  set;            //監控的檔案描述符集合(中間值)

    int     ready;          //已就緒描述符數量(select返回值)

}server_st_t;

//打印出錯資訊並推出程式

#define handle_error(msg)\

    do{perror(msg); exit(EXIT_FAILURE);}while(0)

//定義類

class selectsocket

{

    public:

        //建構函式

       selectsocket(const char *server_ip = SERVERIP, unsigned short port =PORT);

        //解構函式

        virtual~selectsocket();

        //客戶處理函式(對外介面)

        inthandle_cli_proc();

    private:

        intserver_init();//初始化server_st_t結構體

        intserver_uninit();//釋放server_st_t結構體

        inthandle_create_proc();//建立服務端監聽套接字

        int handle_accept_proc();//處理客戶端連線

        inthandle_recv_proc();//處理客戶端資料傳送

       selectsocket(const selectsocket &ref);

       selectsocket& operator=(const selectsocket &ref);

    private:

       server_st_t     *m_server_st;

       char            m_server_IP[16];

       unsigned short  m_port;

       int             m_server_fd;

};

selectsocket::selectsocket(const char *server_ip /*=SERVERIP*/, unsigned short port /*= PORT*/)

{

   bzero(m_server_IP, sizeof(m_server_IP));//將類成員地址空間清零

   memcpy(m_server_IP, server_ip, strlen(server_ip));//給類成員負值

    m_port =port;

    m_server_st= NULL;

   server_init();//初始化

   handle_create_proc();//建立監聽套接字

}

selectsocket::~selectsocket()

{

    int cli_fd= -1;

    for (int i= 0; i < MAXFD; ++i)

    {

        cli_fd= m_server_st->cli_fd[i];

        if (-1!= cli_fd)

        {

           close(cli_fd);

        }

    }

   server_uninit();

   close(m_server_fd);

}

int selectsocket::server_init()

{

    m_server_st= (server_st_t*)malloc(sizeof(server_st_t));

    if (NULL ==m_server_st) handle_error("malloc");

   bzero(m_server_st, sizeof(server_st_t));

   //memset(m_server_st->cli_fd, 0xFF, sizeof(m_server_st->cli_fd));

    //初始化描述符陣列

    for (inti=0; i<MAXFD; ++i)

    {

       m_server_st->cli_fd[i] = -1;

    }

   m_server_st->index = -1;//初始索引號

   FD_ZERO(&m_server_st->allset);//情況集合

   //FD_ZERO(&m_server_st->set);

    return 0;

}

int selectsocket::server_uninit()

{

    if (NULL !=m_server_st)

    {

       free(m_server_st);

    }

    m_server_st= NULL;

}

int selectsocket::handle_create_proc()

{

    //建立TCP套接字

    //引數1:協議族

    //引數2:套接字型別

    //引數3:使用的協議(0:使用套接字型別對應的預設協議)

    if (-1 ==(m_server_fd = socket(AF_INET, SOCK_STREAM, 0)))handle_error("socket");

    //地址結構體

    structsockaddr_in serveraddr;

    //將結構體變數空間清零

   bzero(&serveraddr, sizeof(serveraddr));

   serveraddr.sin_family = AF_INET;//協議族

    //注意,設定埠和IP時,要將主機位元組序轉換為網路位元組序

   serveraddr.sin_port = htons(m_port);

   //serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

   inet_pton(AF_INET, m_server_IP, &serveraddr.sin_addr);

    int op =true;

    ///*一個埠釋放後會等待兩分鐘之後才能再被使用(TIME_WAIT狀態),SO_REUSEADDR是讓埠釋放後立即就可以被再次使用*/

    if (-1 ==setsockopt(m_server_fd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)))handle_error("setsockopt");

    //命名套接字

    if (-1 ==bind(m_server_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))handle_error("serveraddr");

    //將套接字由主動變為被動(接受客戶端連線狀態)

    if (-1 ==listen(m_server_fd, SOMAXCONN)) handle_error("listen");

    return 0;

}

int selectsocket::handle_accept_proc()

{

    //地址結構體

    structsockaddr_in cli_addr;

    socklen_tlen = (socklen_t)sizeof(cli_addr);

    //將結構體變數空間清零

   bzero(&cli_addr, sizeof(cli_addr));

    //接受客戶端的連線請求

    int cli_fd= accept(m_server_fd, (struct sockaddr*)&cli_addr, &len);

    if (-1 ==cli_fd) handle_error("accept");

    fprintf(stdout,"#%d  %s:%d  connected server!\n",m_server_st->cli_num, inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));

    int i = 0;

    //將新連結的客戶存入客戶端陣列中

    for (i = 0;i < MAXFD; ++i)

    {

        if (-1== m_server_st->cli_fd[i])

        {

            //將已連線套接字描述符存入陣列

           m_server_st->cli_fd[i] = cli_fd;

           //m_server_st->maxfd儲存值最大的已連線套接字描述符

           m_server_st->maxfd = m_server_st->maxfd > cli_fd ?m_server_st->maxfd : cli_fd;

           //m_server_st->index儲存客戶陣列中已連線套接字描述符的最大索引

           m_server_st->index = m_server_st->index > i ?m_server_st->index : i;

            //將套接字描述符關聯到集合中

           //FD_SET(cli_fd, &m_server_st->allset);

           break;

        }

    }

    //如果已連線套接字描述符超過了FD_SETSIZE報錯

    if (i ==FD_SETSIZE) handle_error("too many connect");

   ++m_server_st->cli_num;//已連線客戶數加1

   //m_server_st->set = m_server_st->allset;

}

int selectsocket::handle_recv_proc()

{

    //如果陣列中不至當前一個使用者,遍歷陣列

    for (int i= 0; i <= m_server_st->index; ++i)

    {

        //如果為空位置,continue

        if (-1== m_server_st->cli_fd[i])

        {

           continue;

        }

        //如果非空位置,cli_fd儲存當前描述符

        intcli_fd = m_server_st->cli_fd[i];

        //如果當前描述符就緒,執行以下程式碼

        if(FD_ISSET(cli_fd, &m_server_st->allset))

        {

           char buf[256] = {0};

            //從網路中讀取資料

            intr = read(cli_fd, buf, sizeof(buf));

            //如果讀取出錯

            if(r <= 0)

            {

               //在陣列中將當前位置設為空位置

               m_server_st->cli_fd[i] = -1;

               //將當前描述符從集合中清除

               FD_CLR(cli_fd, &m_server_st->allset);

               //關閉當前描述符

               close(cli_fd);

               --m_server_st->cli_num;//已連線客戶數減1

            }

            //如果接受成功,將資料寫回網路

           write(cli_fd, buf, sizeof(buf));

            //清空快取區

           memset(buf, 0x00, sizeof(buf));

        }

        //每處理一個描述符,read減1

        //如果read為0(所有就緒描述符都處理完畢),就不用繼續向後掃描

        //if(--m_server_st->ready <= 0) break;

    }

    return 0;

}

int selectsocket::handle_cli_proc()

{

    while (1)

    {

        /*每次呼叫select前都要重新設定檔案描述符集合和超時時間,因為事件發生後,檔案描述符和時間都被核心修改啦*/

       FD_ZERO(&m_server_st->allset);

        //將監聽套接字描述符裝入描述符集合

       FD_SET(m_server_fd, &m_server_st->allset);

        //儲存最大的描述符

       m_server_st->maxfd = m_server_fd;

        //設定超時時間為5秒

        structtimeval timeout;

       timeout.tv_sec = 5;

       timeout.tv_usec = 0;

        //更新集合

       //m_server_st->set = m_server_st->allset;

        //將已連線客戶端描述符新增到集合中

        for(int i = 0; i<=m_server_st->index; ++i)

        {

            intcli_fd = m_server_st->cli_fd[i];

            if(-1 != cli_fd)

            {

                FD_SET(cli_fd,&m_server_st->allset);

            }

           //m_server_st->maxfd儲存值最大的已連線套接字描述符

           m_server_st->maxfd = m_server_st->maxfd > cli_fd ?m_server_st->maxfd : cli_fd;

        }

       //select阻塞等待描述符集合中是否有就緒的描述符

        //引數1:描述符集合中最大描述符值加1

        //引數2:讀就緒集合

        //引數3:寫就緒集合

        //引數4:異常就緒集合

        //引數5:最長等待時間(NULL:無限等待,直到有描述符就緒)

        //一旦有描述符就緒則返回,返回值為以就緒描述符的個數

       m_server_st->ready = select(m_server_st->maxfd+1,&m_server_st->allset, NULL, NULL, &timeout);

       //select函式返回異常

        if (-1== m_server_st->ready) break;

       //select函式等待超時

        if (0== m_server_st->ready)

        {

           fprintf(stdout, "select timeout\n");

           continue;

        }

        //如果有客戶斷連線伺服器,監聽套接字就緒,執行以下程式碼

        if(FD_ISSET(m_server_fd, &m_server_st->allset))

        {

            //處理客戶端的連線

           handle_accept_proc();

            //已經處理過來sfd描述符,read減1

            //如果此時read<=0,表示所有就緒描述符已處理完畢,返回select處繼續等待就緒描述符

            if (--m_server_st->ready <= 0)

            {

               continue;

           } 

        }

        else

        {

            //處理客戶端傳送過來的資料

           handle_recv_proc();

        }

    }

    return 0;

}

main.cpp

#include "echo_svr.h"

int main()

{

   selectsocket selectsock;

   selectsock.handle_cli_proc();

}

客戶端程式碼

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <arpa/inet.h>

#include <sys/socket.h>

#include <string.h>

int main()

{

    //建立IPV4 TCP套接字

    int sfd =socket(AF_INET, SOCK_STREAM, 0);

    if (-1 ==sfd) perror("socket"), exit(EXIT_FAILURE);

    //地址結構體

    structsockaddr_in addr;

   addr.sin_family = AF_INET;//協議族

   addr.sin_port = htons(6666);//埠

    //將地址串轉換為網路位元組序,儲存到addr.sin_addr中

   inet_aton("127.0.0.1", &addr.sin_addr);//連線的服務端IP地址

    //連線伺服器

    if (-1 ==connect(sfd, (struct sockaddr *)&addr, sizeof(addr)))

    {

       perror("connect");

       exit(EXIT_FAILURE);

    }

    charbuf[256] = {};

    //從標準輸入讀取資料

    while (NULL!= fgets(buf, sizeof(buf), stdin))

    {

        //將資料傳送給伺服器

       write(sfd, buf, strlen(buf));

        //清空快取區

       memset(buf, 0x00, sizeof(buf));

        //讀取伺服器放送過來的資料

        int r =read(sfd, buf, sizeof(buf));

        //接受失敗

        if (r<= 0)

        {

           break;

        }

        //輸出

       fprintf(stdout, buf, r);

        //清空快取區

       memset(buf, 0x00, sizeof(buf));

    }

    //關閉套接字描述符

    close(sfd);

    return 0;

}

程式執行截圖:

 

select缺陷

       說到這裡,我們來談一談select的缺陷吧,主要有一下三點。

①每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,這個開銷在fd很多時會很大

②同時每次呼叫select都需要在核心遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大

③select支援的檔案描述符數量太少了,預設是1024

     關於select、poll和epoll的對比,請觀看博主的另外一篇博文poll epoll select,在那裡博主對他們之間的區別、效能和訊息傳遞方式進行了總結,希望能對你有一點幫助。最後附上一張select的實現結構圖,如果想了解跟多關於select的實現原理和原始碼剖析,請參考文末連線。