I/O多路複用機制(一)
在實際的開發中,我們經常會遇到這樣的場景,我們需要接受多個埠的資料、多個終端的資料抑或是多個檔案描述符對應的資料。那麼,遇到這樣的問題,你在程式中該怎麼做呢?通常的做法,在程式中對資料互動的描述符進行輪詢。那麼問題來了,輪詢的時間設定為多少呢?設定的太短,可以保證處理效能和速度,但是CPU的使用率太高,一旦處理的描述符數量多了起來,CPU可能就扛不住了。設定的時間太長,描述符處理的時間片太短,處於空閒的時間較長,效能和速度達不到要求。如果是伺服器的話,面對多個使用者的連線,處理速度和CPU使用效能是必須考慮的,而且最好要兼顧。這裡就需要使用到I/O多路複用機制,這就是博主即將要和小夥伴們探討的內容。
select簡介
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1,fd_set *readset,fd_set*writeset,fd_set *exceptset,const struct timeval *timeout);
maxfdp1:待監控的最大描述符數值加1。
readset、writeset和exceptset:指定我們要讓核心測試讀、寫和異常條件的描述字。如果對某一個的條件不感興趣,就可以把它設為空指標。
struct fd_set可以理解為一個集合,這個集合中存放的是檔案描述符,可通過以下四個巨集進行設定:
void FD_ZERO(fd_set *fdset); //清空集合
void FD_SET(int fd, fd_set*fdset); //將一個給定的檔案描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset); //將一個給定的檔案描述符從集合中刪除
int FD_ISSET(int fd, fd_set *fdset); // 檢查集合中指定的檔案描述符是否就緒
fd_set定義如下:
typedefstructfd_set {
u_int fd_count;//fd_set中監聽的檔案描述符個數
intfd_array[FD_SETSIZE];
//存放了要監聽的檔案描述符} fd_set;
timeout:告知核心等待輪詢的時間。其timeval結構用於指定這段時間的秒數和微秒數。
struct timeval{
long tv_sec; //seconds
long tv_usec; //microseconds
};
這個引數有三種可能:
(1)永遠等待下去:直到有至少一個描述字準備就緒才返回。將timeout設定為NULL。
(2)等待一段固定時間:在超時前,有一個描述字準備就緒就返回。為此,該引數必須指向一個timeval結構,而且其中的定時器值必須大於0。
(3)不等待:檢查描述符後立即返回,這稱為輪詢。為此,該引數必須指向一個timeval結構,而且其中的定時器值必須為0。
返回值:就緒描述符的數目,超時返回0,出錯返回-1
原理圖
select的工作模式:每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,在核心中完成輪詢的工作,待輪詢結束再將fd集合從核心態拷貝到使用者態。注意,每次呼叫select前都要重新設定檔案描述符和時間,因為事件發生後,檔案描述符和時間都被核心做了修改。我們可以將需要監控的描述符(包括檔案、終端、套接字等等)新增到fd_set集合中,由select來監控,這樣可以將多處阻塞轉移到一處。例如,既要接收終端資料錄入,又要接收socket(阻塞socket)傳遞過來的資料,那麼在socket資料接收(read)和終端I/O(fgets)處均會阻塞。使用select後就不一樣了,我們可以將這些描述符加入fd_set集合中,阻塞的地方只有一處,就是select呼叫處。
程式例項
下面我們通過select實現一個簡單的伺服器回射例項,服務端監聽客戶端的連線請求,將已連線客戶端描述符新增到監控描述符集合中,select對監聽描述符和已連線客戶端描述符進行監控。當監聽描述符就緒,表示有新客戶端連線服務端,呼叫處理連線邏輯;當有客戶端描述符就緒,識別符號已連線客戶端有資料傳送給伺服器,服務端呼叫資料接收邏輯,將客戶端傳送過來的資料原樣傳送給客戶端。好了,廢話不多說了,直接看程式碼吧!
服務端程式碼
echo_svr.h
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <time.h>
#include <unistd.h>
#include <sys/socket.h>
#include <error.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
const int MAXFD = FD_SETSIZE;//FD_SETSIZE 1024 可以監控的最大描述符數量
const char *SERVERIP = "127.0.0.1";//服務端IP
const unsigned short PORT = 6666;//服務端埠號
typedef struct server_st
{
int cli_num; //已連線客戶端數量
int cli_fd[MAXFD-1];//存放已連線客戶端描述符
int maxfd; //存放描述符最大值
int index; //儲存最大的索引號(已連線客戶端資料下標)
fd_set allset; //監控的檔案描述符集合(select引數)
fd_set set; //監控的檔案描述符集合(中間值)
int ready; //已就緒描述符數量(select返回值)
}server_st_t;
//打印出錯資訊並推出程式
#define handle_error(msg)\
do{perror(msg); exit(EXIT_FAILURE);}while(0)
//定義類
class selectsocket
{
public:
//建構函式
selectsocket(const char *server_ip = SERVERIP, unsigned short port =PORT);
//解構函式
virtual~selectsocket();
//客戶處理函式(對外介面)
inthandle_cli_proc();
private:
intserver_init();//初始化server_st_t結構體
intserver_uninit();//釋放server_st_t結構體
inthandle_create_proc();//建立服務端監聽套接字
int handle_accept_proc();//處理客戶端連線
inthandle_recv_proc();//處理客戶端資料傳送
selectsocket(const selectsocket &ref);
selectsocket& operator=(const selectsocket &ref);
private:
server_st_t *m_server_st;
char m_server_IP[16];
unsigned short m_port;
int m_server_fd;
};
selectsocket::selectsocket(const char *server_ip /*=SERVERIP*/, unsigned short port /*= PORT*/)
{
bzero(m_server_IP, sizeof(m_server_IP));//將類成員地址空間清零
memcpy(m_server_IP, server_ip, strlen(server_ip));//給類成員負值
m_port =port;
m_server_st= NULL;
server_init();//初始化
handle_create_proc();//建立監聽套接字
}
selectsocket::~selectsocket()
{
int cli_fd= -1;
for (int i= 0; i < MAXFD; ++i)
{
cli_fd= m_server_st->cli_fd[i];
if (-1!= cli_fd)
{
close(cli_fd);
}
}
server_uninit();
close(m_server_fd);
}
int selectsocket::server_init()
{
m_server_st= (server_st_t*)malloc(sizeof(server_st_t));
if (NULL ==m_server_st) handle_error("malloc");
bzero(m_server_st, sizeof(server_st_t));
//memset(m_server_st->cli_fd, 0xFF, sizeof(m_server_st->cli_fd));
//初始化描述符陣列
for (inti=0; i<MAXFD; ++i)
{
m_server_st->cli_fd[i] = -1;
}
m_server_st->index = -1;//初始索引號
FD_ZERO(&m_server_st->allset);//情況集合
//FD_ZERO(&m_server_st->set);
return 0;
}
int selectsocket::server_uninit()
{
if (NULL !=m_server_st)
{
free(m_server_st);
}
m_server_st= NULL;
}
int selectsocket::handle_create_proc()
{
//建立TCP套接字
//引數1:協議族
//引數2:套接字型別
//引數3:使用的協議(0:使用套接字型別對應的預設協議)
if (-1 ==(m_server_fd = socket(AF_INET, SOCK_STREAM, 0)))handle_error("socket");
//地址結構體
structsockaddr_in serveraddr;
//將結構體變數空間清零
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;//協議族
//注意,設定埠和IP時,要將主機位元組序轉換為網路位元組序
serveraddr.sin_port = htons(m_port);
//serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
inet_pton(AF_INET, m_server_IP, &serveraddr.sin_addr);
int op =true;
///*一個埠釋放後會等待兩分鐘之後才能再被使用(TIME_WAIT狀態),SO_REUSEADDR是讓埠釋放後立即就可以被再次使用*/
if (-1 ==setsockopt(m_server_fd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)))handle_error("setsockopt");
//命名套接字
if (-1 ==bind(m_server_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)))handle_error("serveraddr");
//將套接字由主動變為被動(接受客戶端連線狀態)
if (-1 ==listen(m_server_fd, SOMAXCONN)) handle_error("listen");
return 0;
}
int selectsocket::handle_accept_proc()
{
//地址結構體
structsockaddr_in cli_addr;
socklen_tlen = (socklen_t)sizeof(cli_addr);
//將結構體變數空間清零
bzero(&cli_addr, sizeof(cli_addr));
//接受客戶端的連線請求
int cli_fd= accept(m_server_fd, (struct sockaddr*)&cli_addr, &len);
if (-1 ==cli_fd) handle_error("accept");
fprintf(stdout,"#%d %s:%d connected server!\n",m_server_st->cli_num, inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
int i = 0;
//將新連結的客戶存入客戶端陣列中
for (i = 0;i < MAXFD; ++i)
{
if (-1== m_server_st->cli_fd[i])
{
//將已連線套接字描述符存入陣列
m_server_st->cli_fd[i] = cli_fd;
//m_server_st->maxfd儲存值最大的已連線套接字描述符
m_server_st->maxfd = m_server_st->maxfd > cli_fd ?m_server_st->maxfd : cli_fd;
//m_server_st->index儲存客戶陣列中已連線套接字描述符的最大索引
m_server_st->index = m_server_st->index > i ?m_server_st->index : i;
//將套接字描述符關聯到集合中
//FD_SET(cli_fd, &m_server_st->allset);
break;
}
}
//如果已連線套接字描述符超過了FD_SETSIZE報錯
if (i ==FD_SETSIZE) handle_error("too many connect");
++m_server_st->cli_num;//已連線客戶數加1
//m_server_st->set = m_server_st->allset;
}
int selectsocket::handle_recv_proc()
{
//如果陣列中不至當前一個使用者,遍歷陣列
for (int i= 0; i <= m_server_st->index; ++i)
{
//如果為空位置,continue
if (-1== m_server_st->cli_fd[i])
{
continue;
}
//如果非空位置,cli_fd儲存當前描述符
intcli_fd = m_server_st->cli_fd[i];
//如果當前描述符就緒,執行以下程式碼
if(FD_ISSET(cli_fd, &m_server_st->allset))
{
char buf[256] = {0};
//從網路中讀取資料
intr = read(cli_fd, buf, sizeof(buf));
//如果讀取出錯
if(r <= 0)
{
//在陣列中將當前位置設為空位置
m_server_st->cli_fd[i] = -1;
//將當前描述符從集合中清除
FD_CLR(cli_fd, &m_server_st->allset);
//關閉當前描述符
close(cli_fd);
--m_server_st->cli_num;//已連線客戶數減1
}
//如果接受成功,將資料寫回網路
write(cli_fd, buf, sizeof(buf));
//清空快取區
memset(buf, 0x00, sizeof(buf));
}
//每處理一個描述符,read減1
//如果read為0(所有就緒描述符都處理完畢),就不用繼續向後掃描
//if(--m_server_st->ready <= 0) break;
}
return 0;
}
int selectsocket::handle_cli_proc()
{
while (1)
{
/*每次呼叫select前都要重新設定檔案描述符集合和超時時間,因為事件發生後,檔案描述符和時間都被核心修改啦*/
FD_ZERO(&m_server_st->allset);
//將監聽套接字描述符裝入描述符集合
FD_SET(m_server_fd, &m_server_st->allset);
//儲存最大的描述符
m_server_st->maxfd = m_server_fd;
//設定超時時間為5秒
structtimeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
//更新集合
//m_server_st->set = m_server_st->allset;
//將已連線客戶端描述符新增到集合中
for(int i = 0; i<=m_server_st->index; ++i)
{
intcli_fd = m_server_st->cli_fd[i];
if(-1 != cli_fd)
{
FD_SET(cli_fd,&m_server_st->allset);
}
//m_server_st->maxfd儲存值最大的已連線套接字描述符
m_server_st->maxfd = m_server_st->maxfd > cli_fd ?m_server_st->maxfd : cli_fd;
}
//select阻塞等待描述符集合中是否有就緒的描述符
//引數1:描述符集合中最大描述符值加1
//引數2:讀就緒集合
//引數3:寫就緒集合
//引數4:異常就緒集合
//引數5:最長等待時間(NULL:無限等待,直到有描述符就緒)
//一旦有描述符就緒則返回,返回值為以就緒描述符的個數
m_server_st->ready = select(m_server_st->maxfd+1,&m_server_st->allset, NULL, NULL, &timeout);
//select函式返回異常
if (-1== m_server_st->ready) break;
//select函式等待超時
if (0== m_server_st->ready)
{
fprintf(stdout, "select timeout\n");
continue;
}
//如果有客戶斷連線伺服器,監聽套接字就緒,執行以下程式碼
if(FD_ISSET(m_server_fd, &m_server_st->allset))
{
//處理客戶端的連線
handle_accept_proc();
//已經處理過來sfd描述符,read減1
//如果此時read<=0,表示所有就緒描述符已處理完畢,返回select處繼續等待就緒描述符
if (--m_server_st->ready <= 0)
{
continue;
}
}
else
{
//處理客戶端傳送過來的資料
handle_recv_proc();
}
}
return 0;
}
main.cpp
#include "echo_svr.h"
int main()
{
selectsocket selectsock;
selectsock.handle_cli_proc();
}
客戶端程式碼
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <string.h>
int main()
{
//建立IPV4 TCP套接字
int sfd =socket(AF_INET, SOCK_STREAM, 0);
if (-1 ==sfd) perror("socket"), exit(EXIT_FAILURE);
//地址結構體
structsockaddr_in addr;
addr.sin_family = AF_INET;//協議族
addr.sin_port = htons(6666);//埠
//將地址串轉換為網路位元組序,儲存到addr.sin_addr中
inet_aton("127.0.0.1", &addr.sin_addr);//連線的服務端IP地址
//連線伺服器
if (-1 ==connect(sfd, (struct sockaddr *)&addr, sizeof(addr)))
{
perror("connect");
exit(EXIT_FAILURE);
}
charbuf[256] = {};
//從標準輸入讀取資料
while (NULL!= fgets(buf, sizeof(buf), stdin))
{
//將資料傳送給伺服器
write(sfd, buf, strlen(buf));
//清空快取區
memset(buf, 0x00, sizeof(buf));
//讀取伺服器放送過來的資料
int r =read(sfd, buf, sizeof(buf));
//接受失敗
if (r<= 0)
{
break;
}
//輸出
fprintf(stdout, buf, r);
//清空快取區
memset(buf, 0x00, sizeof(buf));
}
//關閉套接字描述符
close(sfd);
return 0;
}
程式執行截圖:
select缺陷
說到這裡,我們來談一談select的缺陷吧,主要有一下三點。
①每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,這個開銷在fd很多時會很大
②同時每次呼叫select都需要在核心遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
③select支援的檔案描述符數量太少了,預設是1024
關於select、poll和epoll的對比,請觀看博主的另外一篇博文poll epoll select,在那裡博主對他們之間的區別、效能和訊息傳遞方式進行了總結,希望能對你有一點幫助。最後附上一張select的實現結構圖,如果想了解跟多關於select的實現原理和原始碼剖析,請參考文末連線。