[半原創]nio學習-多路複用
文章前半部分轉載自 : https://mp.weixin.qq.com/s/YdIdoZ_yusVWza1PU7lWaw ,本文在格式上進行優化,方便閱讀 , 非原創
文章後半部分是自己寫的, 原創
概述
為了講多路複用,當然還是要跟風,採用鞭屍的思路,先講講傳統的網路 IO 的弊端,用拉踩的方式捧起多路複用 IO 的優勢。
為了方便理解,以下所有程式碼都是虛擬碼,知道其表達的意思即可。
阻塞 IO
服務端為了處理客戶端的連線和請求的資料,寫了如下程式碼。
listenfd = socket(); // 開啟一個網路通訊埠 bind(listenfd); // 繫結 listen(listenfd); // 監聽 while(1) { connfd = accept(listenfd); // 阻塞建立連線 int n = read(connfd, buf); // 阻塞讀資料 doSomeThing(buf); // 利用讀到的資料做些什麼 close(connfd); // 關閉連線,迴圈等待下一個連線 }
這段程式碼會執行得磕磕絆絆,就像這樣。
可以看到,服務端的執行緒阻塞在了兩個地方,一個是 accept 函式,一個是 read 函式。
如果再把 read 函式的細節展開,我們會發現其阻塞在了兩個階段。
這就是傳統的阻塞 IO。
整體流程如下圖。
所以,如果這個連線的客戶端一直不發資料,那麼服務端執行緒將會一直阻塞在 read 函式上不返回,也無法接受其他客戶端連線。
這肯定是不行的。
非阻塞 IO
為了解決上面的問題,其關鍵在於改造這個 read 函式。
有一種聰明的辦法是,每次都建立一個新的程序或執行緒,去呼叫 read 函式,並做業務處理。
while(1) { connfd = accept(listenfd); // 阻塞建立連線 pthread_create(doWork); // 建立一個新的執行緒 } void doWork() { int n = read(connfd, buf); // 阻塞讀資料 doSomeThing(buf); // 利用讀到的資料做些什麼 close(connfd); // 關閉連線,迴圈等待下一個連線 }
這樣,當給一個客戶端建立好連線後,就可以立刻等待新的客戶端連線,而不用阻塞在原客戶端的 read 請求上。
不過,這不叫非阻塞 IO,只不過用了多執行緒的手段使得主執行緒沒有卡在 read 函式上不往下走罷了。作業系統為我們提供的 read 函式仍然是阻塞的。
所以真正的非阻塞 IO,不能是通過我們使用者層的小把戲,而是要懇請作業系統為我們提供一個非阻塞的 read 函式。
這個 read 函式的效果是,如果沒有資料到達時(到達網絡卡並拷貝到了核心緩衝區),立刻返回一個錯誤值(-1),而不是阻塞地等待。
作業系統提供了這樣的功能,只需要在呼叫 read 前,將檔案描述符設定為非阻塞即可。
fcntl(connfd, F_SETFL, O_NONBLOCK); int n = read(connfd, buffer) != SUCCESS);
這樣,就需要使用者執行緒迴圈呼叫 read,直到返回值不為 -1,再開始處理業務。
這裡我們注意到一個細節。
非阻塞的 read,指的是在資料到達前,即資料還未到達網絡卡,或者到達網絡卡但還沒有拷貝到核心緩衝區之前,這個階段是非阻塞的。
當資料已到達核心緩衝區,此時呼叫 read 函式仍然是阻塞的,需要等待資料從核心緩衝區拷貝到使用者緩衝區,才能返回。
整體流程如下圖
IO 多路複用
為每個客戶端建立一個執行緒,伺服器端的執行緒資源很容易被耗光。
當然還有個聰明的辦法,我們可以每 accept 一個客戶端連線後,將這個檔案描述符(connfd)放到一個數組裡。
fdlist.add(connfd);
然後弄一個新的執行緒去不斷遍歷這個陣列,呼叫每一個元素的非阻塞 read 方法。
while(1) {
for(fd <-- fdlist) {
if(read(fd) != -1) {
doSomeThing();
}
}
}
這樣,我們就成功用一個執行緒處理了多個客戶端連線。
你是不是覺得這有些多路複用的意思?
但這和我們用多執行緒去將阻塞 IO 改造成看起來是非阻塞 IO 一樣,這種遍歷方式也只是我們使用者自己想出的小把戲,每次遍歷遇到 read 返回 -1 時仍然是一次浪費資源的系統呼叫。
在 while 迴圈裡做系統呼叫,就好比你做分散式專案時在 while 裡做 rpc 請求一樣,是不划算的。
所以,還是得懇請作業系統老大,提供給我們一個有這樣效果的函式,我們將一批檔案描述符通過一次系統呼叫傳給核心,由核心層去遍歷,才能真正解決這個問題。
select
select 是作業系統提供的系統呼叫函式,通過它,我們可以把一個檔案描述符的陣列發給作業系統, 讓作業系統去遍歷,確定哪個檔案描述符可以讀寫, 然後告訴我們去處理:
select系統呼叫的函式定義如下。
int select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
// nfds:監控的檔案描述符集裡最大檔案描述符加1
// readfds:監控有讀資料到達檔案描述符集合,傳入傳出引數
// writefds:監控寫資料到達檔案描述符集合,傳入傳出引數
// exceptfds:監控異常發生達檔案描述符集合, 傳入傳出引數
// timeout:定時阻塞監控時間,3種情況
// 1.NULL,永遠等下去
// 2.設定timeval,等待固定時間
// 3.設定timeval裡時間均為0,檢查描述字後立即返回,輪詢
服務端程式碼,這樣來寫。
首先一個執行緒不斷接受客戶端連線,並把 socket 檔案描述符放到一個 list 裡。
while(1) {
connfd = accept(listenfd);
fcntl(connfd, F_SETFL, O_NONBLOCK);
fdlist.add(connfd);
}
然後,另一個執行緒不再自己遍歷,而是呼叫 select,將這批檔案描述符 list 交給作業系統去遍歷。
while(1) {
connfd = accept(listenfd);
fcntl(connfd, F_SETFL, O_NONBLOCK);
fdlist.add(connfd);
}
不過,當 select 函式返回後,使用者依然需要遍歷剛剛提交給作業系統的 list。
只不過,作業系統會將準備就緒的檔案描述符做上標識,使用者層將不會再有無意義的系統呼叫開銷。
while(1) {
nready = select(list);
// 使用者層依然要遍歷,只不過少了很多無效的系統呼叫
for(fd <-- fdlist) {
if(fd != -1) {
// 只讀已就緒的檔案描述符
read(fd, buf);
// 總共只有 nready 個已就緒描述符,不用過多遍歷
if(--nready == 0) break;
}
}
}
正如剛剛的動圖中所描述的,其直觀效果如下。(同一個動圖消耗了你兩次流量,氣不氣?)
可以看出幾個細節:
- select 呼叫需要傳入 fd 陣列,需要拷貝一份到核心,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)
- select 在核心層仍然是通過遍歷的方式檢查檔案描述符的就緒狀態,是個同步過程,只不過無系統呼叫切換上下文的開銷。(核心層可優化為非同步事件通知)
- select 僅僅返回可讀檔案描述符的個數,具體哪個可讀還是要使用者自己遍歷。(可優化為只返回給使用者就緒的檔案描述符,無需使用者做無效的遍歷)
整個 select 的流程圖如下。
可以看到,這種方式,既做到了一個執行緒處理多個客戶端連線(檔案描述符),又減少了系統呼叫的開銷(多個檔案描述符只有一次 select 的系統呼叫 + n 次就緒狀態的檔案描述符的 read 系統呼叫)。
poll
poll 也是作業系統提供的系統呼叫函式。
int poll(struct pollfd *fds, nfds_tnfds, int timeout);
struct pollfd {
intfd; /*檔案描述符*/
shortevents; /*監控的事件*/
shortrevents; /*監控事件中滿足條件返回的事件*/
};
它和 select 的主要區別就是,去掉了 select 只能監聽 1024 個檔案描述符的限制。
epoll
epoll 是最終的大 boss,它解決了 select 和 poll 的一些問題。
還記得上面說的 select 的三個細節麼?
-
select 呼叫需要傳入 fd 陣列,需要拷貝一份到核心,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)
-
select 在核心層仍然是通過遍歷的方式檢查檔案描述符的就緒狀態,是個同步過程,只不過無系統呼叫切換上下文的開銷。(核心層可優化為非同步事件通知)
-
select 僅僅返回可讀檔案描述符的個數,具體哪個可讀還是要使用者自己遍歷。(可優化為只返回給使用者就緒的檔案描述符,無需使用者做無效的遍歷)
所以 epoll 主要就是針對這三點進行了改進。 -
核心中儲存一份檔案描述符集合,無需使用者每次都重新傳入,只需告訴核心修改的部分即可。
-
核心不再通過輪詢的方式找到就緒的檔案描述符,而是通過非同步 IO 事件喚醒。
-
核心僅會將有 IO 事件的檔案描述符返回給使用者,使用者也無需遍歷整個檔案描述符集合。
具體,作業系統提供了這三個函式。
第一步,建立一個 epoll 控制代碼
int epoll_create(int size);
第二步,向核心新增、修改或刪除要監控的檔案描述符。
int epoll_ctl(
int epfd, int op, int fd, struct epoll_event *event);
第三步,類似發起了 select() 呼叫
int epoll_wait(
int epfd, struct epoll_event *events, int max events, int timeout);
使用起來,其內部原理就像如下一般絲滑。
如果你想繼續深入瞭解 epoll 的底層原理,推薦閱讀飛哥的《圖解 | 深入揭祕 epoll 是如何實現 IO 多路複用的!》,從 linux 原始碼級別,一行一行非常硬核地解讀 epoll 的實現原理,且配有大量方便理解的圖片,非常適合原始碼控的小夥伴閱讀。
後記
大白話總結一下。
一切的開始,都起源於這個 read 函式是作業系統提供的,而且是阻塞的,我們叫它 阻塞 IO。
為了破這個局,程式設計師在使用者態通過多執行緒來防止主執行緒卡死。
後來作業系統發現這個需求比較大,於是在作業系統層面提供了非阻塞的 read 函式,這樣程式設計師就可以在一個執行緒內完成多個檔案描述符的讀取,這就是 非阻塞 IO。
但多個檔案描述符的讀取就需要遍歷,當高併發場景越來越多時,使用者態遍歷的檔案描述符也越來越多,相當於在 while 迴圈裡進行了越來越多的系統呼叫。
後來作業系統又發現這個場景需求量較大,於是又在作業系統層面提供了這樣的遍歷檔案描述符的機制,這就是 IO 多路複用。
多路複用有三個函式,最開始是 select,然後又發明了 poll 解決了 select 檔案描述符的限制,然後又發明了 epoll 解決 select 的三個不足。
所以,IO 模型的演進,其實就是時代的變化,倒逼著作業系統將更多的功能加到自己的核心而已。
如果你建立了這樣的思維,很容易發現網上的一些錯誤。
比如好多文章說,多路複用之所以效率高,是因為用一個執行緒就可以監控多個檔案描述符。
這顯然是知其然而不知其所以然,多路複用產生的效果,完全可以由使用者態去遍歷檔案描述符並呼叫其非阻塞的 read 函式實現。而多路複用快的原因在於,作業系統提供了這樣的系統呼叫,使得原來的 while 迴圈裡多次系統呼叫,變成了一次系統呼叫 + 核心層遍歷這些檔案描述符。
就好比我們平時寫業務程式碼,把原來 while 迴圈裡調 http 介面進行批量,改成了讓對方提供一個批量新增的 http 介面,然後我們一次 rpc 請求就完成了批量新增。
epoll 驚群問題
傳統 JAVA 網路連線寫法
客戶端
public class SocketClient {
public static void main(String[] args) throws InterruptedException {
try {
// 和伺服器建立連線
Socket socket = new Socket("localhost",8088);
// 要傳送給伺服器的資訊
OutputStream os = socket.getOutputStream();
PrintWriter pw = new PrintWriter(os);
pw.write("客戶端傳送資訊");
pw.flush();
socket.shutdownOutput();
// 從伺服器接收的資訊
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String info = null;
while((info = br.readLine())!=null){
System.out.println("我是客戶端,伺服器返回資訊:"+info);
}
br.close();
is.close();
os.close();
pw.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
服務端程式碼
public class SocketServer {
public static void main(String[] args) {
try {
// 建立服務端socket , 這個地方可以設定多個程序監聽同一個埠!!
ServerSocket serverSocket = new ServerSocket(8088);
// 建立客戶端socket
Socket socket = new Socket();
//迴圈監聽等待客戶端的連線
while(true){
// 監聽客戶端 , 如果是多個程序
socket = serverSocket.accept();
ServerThread thread = new ServerThread(socket);
thread.start();
InetAddress address=socket.getInetAddress();
System.out.println("當前客戶端的IP:"+address.getHostAddress());
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
public class ServerThread extends Thread{
private Socket socket = null;
public ServerThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream is=null;
InputStreamReader isr=null;
BufferedReader br=null;
OutputStream os=null;
PrintWriter pw=null;
try {
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
String info = null;
while((info=br.readLine())!=null){
System.out.println("我是伺服器,客戶端說:"+info);
}
socket.shutdownInput();
os = socket.getOutputStream();
pw = new PrintWriter(os);
pw.write("伺服器歡迎你");
pw.flush();
} catch (Exception e) {
// TODO: handle exception
} finally{
//關閉資源
try {
if(pw!=null)
pw.close();
if(os!=null)
os.close();
if(br!=null)
br.close();
if(isr!=null)
isr.close();
if(is!=null)
is.close();
if(socket!=null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
epoll 如何使用
下面是 epoll 的示例 , 重點看一下返回值 , 即 epoll_wait 返回的資料和前面的 select ,poll 這些函式的呼叫的不同
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#define PROCESS_NUM 10
static int create_and_bind(char *port)
{
int fd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(atoi(port));
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
return fd;
}
static int make_socket_non_blocking(int sfd)
{
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1)
{
perror("fcntl");
return -1;
}
return 0;
}
#define MAXEVENTS 64
/**
* @brief
*
*
*
* @param argc
* @param argv
* @return int
*
* https://www.bookstack.cn/read/linuxapi/docs-epoll_ctl.md 參考這個文件
*
*/
int main(int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;
sfd = create_and_bind("8001");
if (sfd == -1)
abort();
s = make_socket_non_blocking(sfd);
if (s == -1)
abort();
s = listen(sfd, SOMAXCONN);
if (s == -1)
{
perror("listen");
abort();
}
/**
* @brief
*
* int epoll_create(int size);
*
*
*
*
*/
efd = epoll_create(MAXEVENTS);
if (efd == -1)
{
perror("epoll_create");
abort();
}
event.data.fd = sfd;
//event.events = EPOLLIN | EPOLLET;
event.events = EPOLLIN;
/**
*
* int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
* op : 操作,(新增,修改或是刪除)
*
* 向 epoll 裡面核心新增、修改或刪除要監控的檔案描述符。
*
*/
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror("epoll_ctl");
abort();
}
/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof event);
int k;
for (k = 0; k < PROCESS_NUM; k++)
{
int pid = fork();
if (pid == 0)
{
/**
* @brief
*
* pid == 0 表示進入到 子程序
*
*/
printf("在子程序 %d 裡 , 當前 k 值 : %d \n", getpid(), k);
/* The event loop */
while (1)
{
int n, i;
/**
* @brief
*
* epoll_wait 這個函式會阻塞 ,返回的是監聽事件的數量
* 其中 , events 表示出參,記錄準備好的fd。該引數為向量(陣列),由呼叫方分配空間。
*
*/
n = epoll_wait(efd, events, MAXEVENTS, -1);
printf("process %d 從 epoll_wait 中醒來 \n", getpid());
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}
else if (sfd == events[i].data.fd)
{
// 看到了嗎? 上面拿的是 events[i].data.fd ,就緒好的連線(控制代碼)
// 從打印出來的log 可以看到多個 子 process 都會從這個 epoll_wait 中喚醒 ,而最終
/* We have a notification on the listening socket, which means one or more incoming connections. */
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
/**
* @brief
*
* epoll_wait 了之後再進行 accept , 把準備好的控制代碼放到 accept 中去
*/
infd = accept(sfd, &in_addr, &in_len);
if (infd == -1)
{
printf("process %d accept 失敗!\n", getpid());
break;
}
printf("process %d accept 成功!\n", getpid());
/* Make the incoming socket non-blocking and add it to the list of fds to monitor. */
close(infd);
}
}
}
}
}
int status;
wait(&status);
free(events);
close(sfd);
return EXIT_SUCCESS;
}
epoll_wait 返回的資料 epoll_event 的結構如下 , (注意: events
事件裡面放著epoll_data_t
)
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
nginx 是如何解決 epoll 驚群問題
執行上面的例子可以知道 epoll_wait 後 ,會有多個子 process 喚醒,讓我們來看一下nginx 有多少個子 process 是進行 epoll_wait 的.
以前的是這樣的 :
後來發現多個 worker (多個子 process會有驚群效應), 於是出現了一個鎖來控制驚群
但是可以而知, 用鎖來控制當併發量大的時候, 可能導致效能不好
於是出現了這種 ,不在侷限於一個 socket ,而是持有多個socket , 然後再分配的時候作業系統做負載均衡, 就可以最高效率地處理請求了.nginx 也是這樣解決的
accept_mutex 鎖 (應用層面)
如果開啟了 accept_mutex 鎖,每個 worker 都會先去搶自旋鎖,只有搶佔成功了,才把 socket 加入到 epoll 中,accept 請求,然後釋放鎖。accept_mutex 鎖也有負載均衡的作用。
accept_mutex 效率低下,特別是在長連線的時候。因為長連線時,一個程序長時間佔用 accept_mutex 鎖,使得其它程序得不到 accept 的機會(運氣衰,一直搶不到鎖)。因此不建議使用,預設是關閉的。
EPOLLEXCLUSIVE 標識 (作業系統層面)
EPOLLEXCLUSIVE 是 4.5+ 核心新新增的一個 epoll 的標識,Ngnix 在 1.11.3 之後添加了 NGX_EXCLUSIVE_EVENT。
EPOLLEXCLUSIVE 標識會保證一個事件發生時候只有一個執行緒會被喚醒,以避免多偵聽下的“驚群”問題。不過任一時候只能有一個工作執行緒呼叫 accept,限制了真正並行的吞吐量。
可以看到這是作業系統層面的優化了
SO_REUSEPORT 選項 (作業系統層面)
SO_REUSEPORT 是驚群最好的解決方法,Ngnix 在 1.9.1 中加入了這個選項,每個 worker 都有自己的 socket,這些 socket 都 bind 同一個埠。當新請求到來時,核心根據四元組資訊進行負載均衡,非常高效。(以前共用1個socket )
這算是提升最大的方案, 由一個socket , 到多個 socket