1. 程式人生 > 其它 >[半原創]nio學習-多路複用

[半原創]nio學習-多路複用

文章前半部分轉載自 : https://mp.weixin.qq.com/s/YdIdoZ_yusVWza1PU7lWaw ,本文在格式上進行優化,方便閱讀 , 非原創
文章後半部分是自己寫的, 原創

概述

為了講多路複用,當然還是要跟風,採用鞭屍的思路,先講講傳統的網路 IO 的弊端,用拉踩的方式捧起多路複用 IO 的優勢。
為了方便理解,以下所有程式碼都是虛擬碼,知道其表達的意思即可。

阻塞 IO

服務端為了處理客戶端的連線和請求的資料,寫了如下程式碼。


listenfd = socket();   // 開啟一個網路通訊埠
bind(listenfd);        // 繫結
listen(listenfd);      // 監聽
while(1) {
  connfd = accept(listenfd);  // 阻塞建立連線
  int n = read(connfd, buf);  // 阻塞讀資料
  doSomeThing(buf);  // 利用讀到的資料做些什麼
  close(connfd);     // 關閉連線,迴圈等待下一個連線
}

這段程式碼會執行得磕磕絆絆,就像這樣。

可以看到,服務端的執行緒阻塞在了兩個地方,一個是 accept 函式,一個是 read 函式。

如果再把 read 函式的細節展開,我們會發現其阻塞在了兩個階段。

這就是傳統的阻塞 IO。

整體流程如下圖。

所以,如果這個連線的客戶端一直不發資料,那麼服務端執行緒將會一直阻塞在 read 函式上不返回,也無法接受其他客戶端連線。

這肯定是不行的。

非阻塞 IO

為了解決上面的問題,其關鍵在於改造這個 read 函式。

有一種聰明的辦法是,每次都建立一個新的程序或執行緒,去呼叫 read 函式,並做業務處理。

while(1) {
  connfd = accept(listenfd);  // 阻塞建立連線
  pthread_create(doWork);  // 建立一個新的執行緒
}
void doWork() {
  int n = read(connfd, buf);  // 阻塞讀資料
  doSomeThing(buf);  // 利用讀到的資料做些什麼
  close(connfd);     // 關閉連線,迴圈等待下一個連線
}

這樣,當給一個客戶端建立好連線後,就可以立刻等待新的客戶端連線,而不用阻塞在原客戶端的 read 請求上。

不過,這不叫非阻塞 IO,只不過用了多執行緒的手段使得主執行緒沒有卡在 read 函式上不往下走罷了。作業系統為我們提供的 read 函式仍然是阻塞的。

所以真正的非阻塞 IO,不能是通過我們使用者層的小把戲,而是要懇請作業系統為我們提供一個非阻塞的 read 函式。

這個 read 函式的效果是,如果沒有資料到達時(到達網絡卡並拷貝到了核心緩衝區),立刻返回一個錯誤值(-1),而不是阻塞地等待。

作業系統提供了這樣的功能,只需要在呼叫 read 前,將檔案描述符設定為非阻塞即可。

fcntl(connfd, F_SETFL, O_NONBLOCK);
int n = read(connfd, buffer) != SUCCESS);

這樣,就需要使用者執行緒迴圈呼叫 read,直到返回值不為 -1,再開始處理業務。

這裡我們注意到一個細節。

非阻塞的 read,指的是在資料到達前,即資料還未到達網絡卡,或者到達網絡卡但還沒有拷貝到核心緩衝區之前,這個階段是非阻塞的。

當資料已到達核心緩衝區,此時呼叫 read 函式仍然是阻塞的,需要等待資料從核心緩衝區拷貝到使用者緩衝區,才能返回。

整體流程如下圖

IO 多路複用

為每個客戶端建立一個執行緒,伺服器端的執行緒資源很容易被耗光。

當然還有個聰明的辦法,我們可以每 accept 一個客戶端連線後,將這個檔案描述符(connfd)放到一個數組裡。

fdlist.add(connfd);

然後弄一個新的執行緒去不斷遍歷這個陣列,呼叫每一個元素的非阻塞 read 方法。

while(1) {
  for(fd <-- fdlist) {
    if(read(fd) != -1) {
      doSomeThing();
    }
  }
}

這樣,我們就成功用一個執行緒處理了多個客戶端連線。

你是不是覺得這有些多路複用的意思?

但這和我們用多執行緒去將阻塞 IO 改造成看起來是非阻塞 IO 一樣,這種遍歷方式也只是我們使用者自己想出的小把戲,每次遍歷遇到 read 返回 -1 時仍然是一次浪費資源的系統呼叫。

在 while 迴圈裡做系統呼叫,就好比你做分散式專案時在 while 裡做 rpc 請求一樣,是不划算的。

所以,還是得懇請作業系統老大,提供給我們一個有這樣效果的函式,我們將一批檔案描述符通過一次系統呼叫傳給核心,由核心層去遍歷,才能真正解決這個問題。

select

select 是作業系統提供的系統呼叫函式,通過它,我們可以把一個檔案描述符的陣列發給作業系統, 讓作業系統去遍歷,確定哪個檔案描述符可以讀寫, 然後告訴我們去處理:

select系統呼叫的函式定義如下。

int select(
    int nfds,
    fd_set *readfds,
    fd_set *writefds,
    fd_set *exceptfds,
    struct timeval *timeout);
// nfds:監控的檔案描述符集裡最大檔案描述符加1
// readfds:監控有讀資料到達檔案描述符集合,傳入傳出引數
// writefds:監控寫資料到達檔案描述符集合,傳入傳出引數
// exceptfds:監控異常發生達檔案描述符集合, 傳入傳出引數
// timeout:定時阻塞監控時間,3種情況
//  1.NULL,永遠等下去
//  2.設定timeval,等待固定時間
//  3.設定timeval裡時間均為0,檢查描述字後立即返回,輪詢

服務端程式碼,這樣來寫。

首先一個執行緒不斷接受客戶端連線,並把 socket 檔案描述符放到一個 list 裡。

while(1) {
  connfd = accept(listenfd);
  fcntl(connfd, F_SETFL, O_NONBLOCK);
  fdlist.add(connfd);
}

然後,另一個執行緒不再自己遍歷,而是呼叫 select,將這批檔案描述符 list 交給作業系統去遍歷。

while(1) {
  connfd = accept(listenfd);
  fcntl(connfd, F_SETFL, O_NONBLOCK);
  fdlist.add(connfd);
}

不過,當 select 函式返回後,使用者依然需要遍歷剛剛提交給作業系統的 list。

只不過,作業系統會將準備就緒的檔案描述符做上標識,使用者層將不會再有無意義的系統呼叫開銷。

while(1) {
  nready = select(list);
  // 使用者層依然要遍歷,只不過少了很多無效的系統呼叫
  for(fd <-- fdlist) {
    if(fd != -1) {
      // 只讀已就緒的檔案描述符
      read(fd, buf);
      // 總共只有 nready 個已就緒描述符,不用過多遍歷
      if(--nready == 0) break;
    }
  }
}

正如剛剛的動圖中所描述的,其直觀效果如下。(同一個動圖消耗了你兩次流量,氣不氣?)

可以看出幾個細節:

  1. select 呼叫需要傳入 fd 陣列,需要拷貝一份到核心,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)
  2. select 在核心層仍然是通過遍歷的方式檢查檔案描述符的就緒狀態,是個同步過程,只不過無系統呼叫切換上下文的開銷。(核心層可優化為非同步事件通知)
  3. select 僅僅返回可讀檔案描述符的個數,具體哪個可讀還是要使用者自己遍歷。(可優化為只返回給使用者就緒的檔案描述符,無需使用者做無效的遍歷)
    整個 select 的流程圖如下。

可以看到,這種方式,既做到了一個執行緒處理多個客戶端連線(檔案描述符),又減少了系統呼叫的開銷(多個檔案描述符只有一次 select 的系統呼叫 + n 次就緒狀態的檔案描述符的 read 系統呼叫)。

poll

poll 也是作業系統提供的系統呼叫函式。

int poll(struct pollfd *fds, nfds_tnfds, int timeout);

struct pollfd {
  intfd; /*檔案描述符*/
  shortevents; /*監控的事件*/
  shortrevents; /*監控事件中滿足條件返回的事件*/
};

它和 select 的主要區別就是,去掉了 select 只能監聽 1024 個檔案描述符的限制。

epoll

epoll 是最終的大 boss,它解決了 select 和 poll 的一些問題。

還記得上面說的 select 的三個細節麼?

  1. select 呼叫需要傳入 fd 陣列,需要拷貝一份到核心,高併發場景下這樣的拷貝消耗的資源是驚人的。(可優化為不復制)

  2. select 在核心層仍然是通過遍歷的方式檢查檔案描述符的就緒狀態,是個同步過程,只不過無系統呼叫切換上下文的開銷。(核心層可優化為非同步事件通知)

  3. select 僅僅返回可讀檔案描述符的個數,具體哪個可讀還是要使用者自己遍歷。(可優化為只返回給使用者就緒的檔案描述符,無需使用者做無效的遍歷)
    所以 epoll 主要就是針對這三點進行了改進。

  4. 核心中儲存一份檔案描述符集合,無需使用者每次都重新傳入,只需告訴核心修改的部分即可。

  5. 核心不再通過輪詢的方式找到就緒的檔案描述符,而是通過非同步 IO 事件喚醒。

  6. 核心僅會將有 IO 事件的檔案描述符返回給使用者,使用者也無需遍歷整個檔案描述符集合。
    具體,作業系統提供了這三個函式。

第一步,建立一個 epoll 控制代碼

int epoll_create(int size);

第二步,向核心新增、修改或刪除要監控的檔案描述符。

int epoll_ctl(
  int epfd, int op, int fd, struct epoll_event *event);

第三步,類似發起了 select() 呼叫

int epoll_wait(
  int epfd, struct epoll_event *events, int max events, int timeout);

使用起來,其內部原理就像如下一般絲滑。

如果你想繼續深入瞭解 epoll 的底層原理,推薦閱讀飛哥的《圖解 | 深入揭祕 epoll 是如何實現 IO 多路複用的!》,從 linux 原始碼級別,一行一行非常硬核地解讀 epoll 的實現原理,且配有大量方便理解的圖片,非常適合原始碼控的小夥伴閱讀。

後記

大白話總結一下。

一切的開始,都起源於這個 read 函式是作業系統提供的,而且是阻塞的,我們叫它 阻塞 IO。
為了破這個局,程式設計師在使用者態通過多執行緒來防止主執行緒卡死。
後來作業系統發現這個需求比較大,於是在作業系統層面提供了非阻塞的 read 函式,這樣程式設計師就可以在一個執行緒內完成多個檔案描述符的讀取,這就是 非阻塞 IO。
但多個檔案描述符的讀取就需要遍歷,當高併發場景越來越多時,使用者態遍歷的檔案描述符也越來越多,相當於在 while 迴圈裡進行了越來越多的系統呼叫。

後來作業系統又發現這個場景需求量較大,於是又在作業系統層面提供了這樣的遍歷檔案描述符的機制,這就是 IO 多路複用。

多路複用有三個函式,最開始是 select,然後又發明了 poll 解決了 select 檔案描述符的限制,然後又發明了 epoll 解決 select 的三個不足。

所以,IO 模型的演進,其實就是時代的變化,倒逼著作業系統將更多的功能加到自己的核心而已。
如果你建立了這樣的思維,很容易發現網上的一些錯誤。

比如好多文章說,多路複用之所以效率高,是因為用一個執行緒就可以監控多個檔案描述符。
這顯然是知其然而不知其所以然,多路複用產生的效果,完全可以由使用者態去遍歷檔案描述符並呼叫其非阻塞的 read 函式實現。而多路複用快的原因在於,作業系統提供了這樣的系統呼叫,使得原來的 while 迴圈裡多次系統呼叫,變成了一次系統呼叫 + 核心層遍歷這些檔案描述符。

就好比我們平時寫業務程式碼,把原來 while 迴圈裡調 http 介面進行批量,改成了讓對方提供一個批量新增的 http 介面,然後我們一次 rpc 請求就完成了批量新增。

epoll 驚群問題

傳統 JAVA 網路連線寫法

客戶端

public class SocketClient {
	
	public static void main(String[] args) throws InterruptedException {
		try {
			// 和伺服器建立連線
			Socket socket = new Socket("localhost",8088);
			
			// 要傳送給伺服器的資訊
			OutputStream os = socket.getOutputStream();
			PrintWriter pw = new PrintWriter(os);
			pw.write("客戶端傳送資訊");
			pw.flush();
			
			socket.shutdownOutput();
			
			// 從伺服器接收的資訊
			InputStream is = socket.getInputStream();
			BufferedReader br = new BufferedReader(new InputStreamReader(is));
			String info = null;
			while((info = br.readLine())!=null){
				System.out.println("我是客戶端,伺服器返回資訊:"+info);
			}
			
			br.close();
			is.close();
			os.close();
			pw.close();
			socket.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
}

服務端程式碼

public class SocketServer {
	
	public static void main(String[] args) {
		try {
			// 建立服務端socket , 這個地方可以設定多個程序監聽同一個埠!! 
			ServerSocket serverSocket = new ServerSocket(8088);
			
			// 建立客戶端socket
			Socket socket = new Socket();	
			
			//迴圈監聽等待客戶端的連線
            while(true){
            	// 監聽客戶端 , 如果是多個程序
            	socket = serverSocket.accept();
            	
            	ServerThread thread = new ServerThread(socket);
            	thread.start();
            	
            	InetAddress address=socket.getInetAddress();
                System.out.println("當前客戶端的IP:"+address.getHostAddress());
            }
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
 
}
public class ServerThread extends Thread{
	
	private Socket socket = null;
	
	public ServerThread(Socket socket) {
		this.socket = socket;
	}
 
	@Override
	public void run() {
		InputStream is=null;
        InputStreamReader isr=null;
        BufferedReader br=null;
        OutputStream os=null;
        PrintWriter pw=null;
        try {
			is = socket.getInputStream();
			isr = new InputStreamReader(is);
			br = new BufferedReader(isr);
			
			String info = null;
			
			while((info=br.readLine())!=null){
				System.out.println("我是伺服器,客戶端說:"+info);
			}
			socket.shutdownInput();
			
			os = socket.getOutputStream();
			pw = new PrintWriter(os);
			pw.write("伺服器歡迎你");
			
			pw.flush();
        } catch (Exception e) {
			// TODO: handle exception
		} finally{
			//關閉資源
            try {
                if(pw!=null)
                    pw.close();
                if(os!=null)
                    os.close();
                if(br!=null)
                    br.close();
                if(isr!=null)
                    isr.close();
                if(is!=null)
                    is.close();
                if(socket!=null)
                    socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
		}
	}
 
}

epoll 如何使用

下面是 epoll 的示例 , 重點看一下返回值 , 即 epoll_wait 返回的資料和前面的 select ,poll 這些函式的呼叫的不同

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#define PROCESS_NUM 10
static int create_and_bind(char *port)
{
    int fd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(atoi(port));
    bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    return fd;
}

static int make_socket_non_blocking(int sfd)
{
    int flags, s;

    flags = fcntl(sfd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl");
        return -1;
    }

    flags |= O_NONBLOCK;
    s = fcntl(sfd, F_SETFL, flags);
    if (s == -1)
    {
        perror("fcntl");
        return -1;
    }

    return 0;
}

#define MAXEVENTS 64


/**
 * @brief 
 * 
 * 
 * 
 * @param argc 
 * @param argv 
 * @return int 
 * 
 * https://www.bookstack.cn/read/linuxapi/docs-epoll_ctl.md 參考這個文件
 * 
 */
int main(int argc, char *argv[])
{
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;

    sfd = create_and_bind("8001");
    if (sfd == -1)
        abort();

    s = make_socket_non_blocking(sfd);
    if (s == -1)
        abort();

    s = listen(sfd, SOMAXCONN);
    if (s == -1)
    {
        perror("listen");
        abort();
    }

    /**
     * @brief 
     * 
     * int epoll_create(int size);
     * 
     * 
     * 
     * 
     */

    efd = epoll_create(MAXEVENTS);
    if (efd == -1)
    {
        perror("epoll_create");
        abort();
    }

    event.data.fd = sfd;
    //event.events = EPOLLIN | EPOLLET;
    event.events = EPOLLIN;

    /**
     * 
     *  int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
     *  op : 操作,(新增,修改或是刪除)
     *  
     *  向 epoll 裡面核心新增、修改或刪除要監控的檔案描述符。
     * 
     */

    s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1)
    {
        perror("epoll_ctl");
        abort();
    }

    /* Buffer where events are returned */
    events = calloc(MAXEVENTS, sizeof event);
    int k;
    for (k = 0; k < PROCESS_NUM; k++)
    {
        int pid = fork();
        if (pid == 0)
        {
            /**
             * @brief 
             *  
             *  pid == 0 表示進入到 子程序 
             * 
             */


            printf("在子程序 %d 裡 , 當前 k 值 :  %d \n", getpid(), k);

            /* The event loop */
            while (1)
            {
                int n, i;

                /**
                 * @brief 
                 *  
                 *  epoll_wait 這個函式會阻塞 ,返回的是監聽事件的數量
                 *  其中 , events 表示出參,記錄準備好的fd。該引數為向量(陣列),由呼叫方分配空間。
                 * 
                 */

                n = epoll_wait(efd, events, MAXEVENTS, -1);
                printf("process %d 從 epoll_wait 中醒來 \n", getpid());
                for (i = 0; i < n; i++)
                {
                    if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN)))
                    {
                        /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */
                        fprintf(stderr, "epoll error\n");
                        close(events[i].data.fd);
                        continue;
                    }
                    else if (sfd == events[i].data.fd)
                    {

                        // 看到了嗎? 上面拿的是 events[i].data.fd ,就緒好的連線(控制代碼)
                        // 從打印出來的log 可以看到多個 子 process 都會從這個 epoll_wait 中喚醒 ,而最終

                        /* We have a notification on the listening socket, which means one or more incoming connections. */
                        struct sockaddr in_addr;
                        socklen_t in_len;
                        int infd;
                        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                        in_len = sizeof in_addr;

                        /**
                         * @brief
                         * 
                         *  epoll_wait 了之後再進行 accept , 把準備好的控制代碼放到 accept 中去 
                         */
                        infd = accept(sfd, &in_addr, &in_len);
                        if (infd == -1)
                        {
                            printf("process %d accept 失敗!\n", getpid());
                            break;
                        }
                        printf("process %d accept 成功!\n", getpid());

                        /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */
                        close(infd);
                    }
                }
            }
        }
    }
    int status;
    wait(&status);
    free(events);
    close(sfd);
    return EXIT_SUCCESS;
}

epoll_wait 返回的資料 epoll_event 的結構如下 , (注意: events事件裡面放著epoll_data_t)

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

nginx 是如何解決 epoll 驚群問題

執行上面的例子可以知道 epoll_wait 後 ,會有多個子 process 喚醒,讓我們來看一下nginx 有多少個子 process 是進行 epoll_wait 的.
以前的是這樣的 :

後來發現多個 worker (多個子 process會有驚群效應), 於是出現了一個鎖來控制驚群

但是可以而知, 用鎖來控制當併發量大的時候, 可能導致效能不好

於是出現了這種 ,不在侷限於一個 socket ,而是持有多個socket , 然後再分配的時候作業系統做負載均衡, 就可以最高效率地處理請求了.nginx 也是這樣解決的

accept_mutex 鎖 (應用層面)

如果開啟了 accept_mutex 鎖,每個 worker 都會先去搶自旋鎖,只有搶佔成功了,才把 socket 加入到 epoll 中,accept 請求,然後釋放鎖。accept_mutex 鎖也有負載均衡的作用。

accept_mutex 效率低下,特別是在長連線的時候。因為長連線時,一個程序長時間佔用 accept_mutex 鎖,使得其它程序得不到 accept 的機會(運氣衰,一直搶不到鎖)。因此不建議使用,預設是關閉的。

EPOLLEXCLUSIVE 標識 (作業系統層面)

EPOLLEXCLUSIVE 是 4.5+ 核心新新增的一個 epoll 的標識,Ngnix 在 1.11.3 之後添加了 NGX_EXCLUSIVE_EVENT。
EPOLLEXCLUSIVE 標識會保證一個事件發生時候只有一個執行緒會被喚醒,以避免多偵聽下的“驚群”問題。不過任一時候只能有一個工作執行緒呼叫 accept,限制了真正並行的吞吐量。

可以看到這是作業系統層面的優化了

SO_REUSEPORT 選項 (作業系統層面)

SO_REUSEPORT 是驚群最好的解決方法,Ngnix 在 1.9.1 中加入了這個選項,每個 worker 都有自己的 socket,這些 socket 都 bind 同一個埠。當新請求到來時,核心根據四元組資訊進行負載均衡,非常高效。(以前共用1個socket )

這算是提升最大的方案, 由一個socket , 到多個 socket

參考