1. 程式人生 > 其它 >unix網路程式設計2.5——高併發伺服器(五)epoll進階篇——基於epoll實現reactor

unix網路程式設計2.5——高併發伺服器(五)epoll進階篇——基於epoll實現reactor

目錄

系列文章

閱讀本文需要先閱讀下面的文章:

unix網路程式設計1.1——TCP協議詳解(一)

unix網路程式設計2.1——高併發伺服器(一)基礎——io與檔案描述符、socket程式設計與單程序服務端客戶端實現

unix網路程式設計2.2——高併發伺服器(二)多程序與多執行緒實現

unix網路程式設計2.3——高併發伺服器(三)多路IO複用之select

unix網路程式設計2.4——高併發伺服器(四)epoll基礎篇

前言

伺服器reactor模型

網路IO涉及的系統物件

  • 網路IO涉及的系統物件包括:使用者空間呼叫IO的程序或執行緒核心空間的核心系統
  • 如下圖所示,當IO操作read,它會經歷兩個階段:
  1. 等待資料準備就緒(如果是阻塞IO則會阻塞在這裡)
  2. 將資料從核心緩衝區拷貝到使用者程序或執行緒自定義的buf中

IO多路複用

  • 對於高併發程式設計,網路連線上的訊息處理往往分為兩個階段:等待訊息準備好訊息處理。當使用預設的阻塞socket時(如上圖的read),往往時把這兩個階段合二為一,這樣操作socket的程式碼所在的執行緒就得睡眠來等待訊息準備好,這導致了高併發下執行緒會頻繁的睡眠、喚醒,從而影響了cpu的使用效率。
  • 高併發程式設計方法會把兩個階段分開處理(等待訊息準備好訊息處理)。這要求必須使用非阻塞的IO,否則,處理訊息的程式碼段很容易導致條件不滿足時,所線上程又進入了睡眠等待階段,造成"死鎖"。
  • 此時就需要在等待訊息準備好階段,有執行緒主動查詢(這個執行緒還是需要睡眠進入阻塞態的),或者說讓1個執行緒為所有連線等待:這就是IO多路複用。 它也可能睡眠等待,但是不要緊,因為它"一對多,可以監控所有連線"。這樣,當執行緒被喚醒執行時,就一定是有一些連線準備好或者資料準備好了。(可以實現這個功能的介面代表為select和epoll,在之前的文章已經有所介紹,本文要介紹基於epoll實現reactor模型。)
  • 作為一個高效能伺服器程式通常需要考慮處理三類事件:I/O 事件,定時事件及訊號。兩種 高效 的事件處理模型: Reactor 和 Proactor 。

reactor模型

  • 首先來回想一下普通函式呼叫的機制:程式呼叫某函式,函式執行,程式等待,函式將結果和控制權返回給程式,程式繼續處理。 Reactor 釋義 反應堆,是一種事件驅動機制。和普通函式呼叫的不同之處在於:應用程式不是主動的呼叫某個 API 完成處理,而是恰恰相反, Reactor 逆置了事件處理流程,應用程式需要提供相應的介面並註冊到 Reactor 上,如果相應的時間發生, Reactor 將主動呼叫應用程式註冊的介面,這些介面又稱為 回撥函式
  • Reactor 模式是處理併發I/O 比較常見的一種模式,用於同步I/O,中心思想是將所有要處理的I/O 事件註冊到一箇中心I/O 多路複用器上,同時主執行緒/程序阻塞在多路複用器上;一旦有I/O 事件到來或是準備就緒(檔案描述符或socket 可讀、寫),多路複用器返回並將事先註冊的相應I/O 事件分發到對應的處理器中。

遇到的問題

1.陣列下標越界

  • 解放辦法:使用動態陣列,更好的資料結構(下面會說)

2.socket: Too many open files fd的個數限制

  • 解決辦法 ulimit 修改資源限制

ulimit 無法反覆修改,只能設定1次,重啟後失效,可以修改配置檔案

3.Cannot assign requested address 客戶端埠資源不夠

解決辦法

何為連線(針對tcp而言)

  • 五元組: (src_ip, src_port, des_ip, des_port, 協議) 確定1個連線

解決辦法1:檢視(修改)資源上限

確認port_range範圍
  • sudo sysctl -a | grep port_range
  • 或 cat /proc/sys/net/ipv4/ip_local_port_range
32768 60999
修改配置
  • 從本機修改核心引數考慮:參考,上圖可知port範圍是60999-32768=28231個(第一次測試程式,客戶端最多分配28231個,與其保持一致)
  • sudo vi /etc/sysctl.conf開啟,再新增 net.ipv4.ip_local_port_range = 15000 60999
  • 再執行:sudo sysctl -p /etc/sysctl.conf,使生效
  • 檢視是否生效:cat /proc/sys/net/ipv4/ip_local_port_range
15000 60999
這樣能用的port範圍:60999-15000 = 45999

解決辦法2:改變五元組

  • 從實際可用的併發考慮:當出現"Cannot assign requested address"說明客戶端埠資源不夠,,如果我們有多張網絡卡,可以改變src_ip(此時客戶端作為源),這樣網絡卡的數量可以決定連線的個數,百萬級併發很容易實現;同理,服務端也可以改變其ip或擴大監聽的埠範圍,總之就是改變連線五元組的其中之一。

event_block 資料結構————reactor實現核心

  • 觀察下圖,要實現這樣一個具有動態擴容能力的動態陣列,有以下幾個好處:
  1. 不用一次性malloc一個非常大的連續記憶體,而是用連結串列next指標的方式,記憶體不要求有一整片很大的,避免記憶體碎片衝擊;
  2. 按照sock_item[1024]的維度去申請和管理記憶體,用多少申請多少;
  3. 具體的,假設有100w個客戶端連線連線,需要100w個fd,對應100w個sock_item項,是迴圈去申請出來的,free的時候也是一片片去free的;
  4. sock_item中包括了:fd,rbuf,rlen,wbuf,wlen,events,callback,讀寫解耦等待訊息處理訊息(回撥函式實現)分成了兩個階段,便於業務實現。

啥時候需要這種資料結構

  • 陣列->"連結串列陣列",block塊,為什麼這裡用這個而不是紅黑樹
  • 這種資料結構什麼時候適用:用來做儲存,查詢的時間複雜度,儲存的空間複雜度
  • 儲存有序、有規律的資料 key / 取餘 1024等

測試併發(客戶端不斷連線)原始碼

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#define MAX_BUFFER		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		1

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}



int main(int argc, char **argv) {
	if (argc <= 2) {
		printf("Usage: %s ip port\n", argv[0]);
		exit(0);
	}

	const char *ip = argv[1];
	int port = atoi(argv[2]);
	int connections = 0;
	char buffer[128] = {0};
	int i = 0, index = 0;

	struct epoll_event events[MAX_EPOLLSIZE];
	
	int epoll_fd = epoll_create(MAX_EPOLLSIZE);
	
	strcpy(buffer, " Data From MulClient\n");
		
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	while (1) {
		if (++index >= MAX_PORT) index = 0;
		
		struct epoll_event ev;
		int sockfd = 0;

		if (connections < 340000 && !isContinue) {
			sockfd = socket(AF_INET, SOCK_STREAM, 0);
			if (sockfd == -1) {
				perror("socket");
				goto err;
			}

			//ntySetReUseAddr(sockfd);
			addr.sin_port = htons(port+index);

			if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
				perror("connect");
				goto err;
			}
			ntySetNonblock(sockfd);
			ntySetReUseAddr(sockfd);

			sprintf(buffer, "Hello Server: client --> %d\n", connections);
			send(sockfd, buffer, strlen(buffer), 0);

			ev.data.fd = sockfd;
			ev.events = EPOLLIN | EPOLLOUT;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
		
			connections ++;
		}
		//connections ++;
		if (connections % 1000 == 999 || connections >= 340000) {
			struct timeval tv_cur;
			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
			
			gettimeofday(&tv_begin, NULL);

			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

			int nfds = epoll_wait(epoll_fd, events, connections, 100);
			for (i = 0;i < nfds;i ++) {
				int clientfd = events[i].data.fd;

				if (events[i].events & EPOLLOUT) {
					sprintf(buffer, "data from %d\n", clientfd);
					send(sockfd, buffer, strlen(buffer), 0);
				} else if (events[i].events & EPOLLIN) {
					char rBuffer[MAX_BUFFER] = {0};				
					ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
					if (length > 0) {
						printf(" RecvBuffer:%s\n", rBuffer);

						if (!strcmp(rBuffer, "quit")) {
							isContinue = 0;
						}
						
					} else if (length == 0) {
						printf(" Disconnect clientfd:%d\n", clientfd);
						connections --;
						close(clientfd);
					} else {
						if (errno == EINTR) continue;

						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
						close(clientfd);
					}
				} else {
					printf(" clientfd:%d, errno:%d\n", clientfd, errno);
					close(clientfd);
				}
			}
		}

		usleep(500);
	}

	return 0;

err:
	printf("error : %s\n", strerror(errno));
	return 0;
	
}

服務端原始碼server.c

#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <ctype.h>
#include <unistd.h>
#include <stdio.h>
// struct sockaddr_in對應的標頭檔案 <arpa/inet.h> 
#include <arpa/inet.h>  
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <errno.h> 
#include <fcntl.h>

#define SERVER_PORT 9999
#define PORT_COUNT 100
#define MAX_EVENTS 128
#define BUFF_SIZE 128

#define ITEMS_LEN 1024

extern int errno;

/*
* 待實現:
* 1.listen_fd和conn_fd的sock_item有沒有區別?
* 2.假色有100w的sock_item,如何快速插入和查詢?
*/

// TODO: listen_fd + conn_fd
typedef struct sock_item {
    int fd;
    
    char *r_buf;
    int r_len;

    char *w_buf;
    int w_len;

    int event; // fd對應的事件
    void (*recv)(int fd, char *buf, int len);
    void (*send)(int fd, char *buf, int len);

    void (*accept)(int fd);
} sock_item;

/*
* 使用場景:查詢,有序,有規律
* 1.查詢時間複雜度?
* 2.儲存空間複雜度?
*/
typedef struct event_block {
    sock_item *items;
    struct event_block *next;
} event_block;

// TODO
typedef struct reactor {
    int epfd;
    int blkcnt;
    event_block *ev_blk;
} reactor;

/* new event_block */
int ReactorResize(reactor *r)
{
    /* 不能做頭插法,因為是順序增加的 */ 
    if (r == NULL) return -1;

    event_block *ev_blk = r->ev_blk;
    while (ev_blk != NULL && ev_blk->next != NULL) {
        ev_blk = ev_blk->next; // 找到最後一個節點,後面會用尾插法,插入新的節點
    }

    /******************** 申請 sock_item ********************/
    sock_item *item = (sock_item *)malloc(ITEMS_LEN * sizeof(sock_item));
    if (item == NULL) return -1;
    memset(item, 0, ITEMS_LEN * sizeof(sock_item));

    /******************** 申請 event_block ********************/
    event_block *new_ev_blk = (event_block *)malloc(sizeof(event_block));
    if (new_ev_blk == NULL) {
        free(item); // 注意異常情況,前面的記憶體也要回收
        return -1;
    }
    new_ev_blk->items = item; 
    new_ev_blk->next = NULL;

    if (ev_blk == NULL) {
        r->ev_blk = new_ev_blk; // 這種情況一般是第一次申請記憶體,頭節點還是NULL
        r->ev_blk = new_ev_blk; // 這種情況一般是第一次申請記憶體,頭節點還是NULL
    } else {
        ev_blk->next = new_ev_blk; // 尾插法
    }

    r->blkcnt++;
    return 0;
}

sock_item* ReactorLookUp(reactor *r, int socket_fd)
{
    if (r == NULL) return NULL;
    // if (r->ev_blk == NULL) return NULL;
    if (socket_fd <= 0) return NULL;

    int blk_idx = socket_fd / ITEMS_LEN;
    /******************** 呼叫 ReactorResize 每1024個申請一個ev_blk塊 ********************/
    while (blk_idx >= r->blkcnt) {
        ReactorResize(r);
    }

    int i = 0;
    event_block *blk = r->ev_blk;
    while (i++ < blk_idx && blk != NULL) {
        blk = blk->next; // ReactorResize種保證blk非NULL
    }

    return &blk->items[socket_fd % ITEMS_LEN];
}

void SetNonBlocking(int fd)
{
    /* 設定fd為non-blocking*/
    int flag = fcntl(fd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(fd, F_SETFD, flag);
}

int InitServer(short port)
{
    int listen_fd;
    struct sockaddr_in server_addr;
    
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);

    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
#if 1
    SetNonBlocking(listen_fd);
#endif

    bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    listen(listen_fd, 10);
    return listen_fd;
}

int IsListenFd(int *listen_fds, int fd)
{
    for (int i = 0; i < PORT_COUNT; i++) {
        if (listen_fds[i] == fd) {
            return 1;
        }
    }
    return 0;
}

int main(void)
{
    int conn_fd, client_fd;
    int i, n, res;
    /******************** epoll init ********************/
    reactor *r = (reactor *)calloc(1, sizeof(reactor));
    if (r == NULL) return -1;
    r->blkcnt = 0;
    r->epfd = epoll_create(1); // 紅黑樹頭節點
    int n_ready;
    struct epoll_event ev, events[MAX_EVENTS];
#if 0
    r->items = (sock_item *)calloc(MAX_EVENTS, sizeof(sock_item));
    if (r->items == NULL) return -1;
#endif

    /******************** 連線五元組 繫結多個port ********************/
    int listen_fds[PORT_COUNT];
    for (i = 0; i < PORT_COUNT; i++) {
        listen_fds[i] = InitServer(SERVER_PORT + i);
        ev.data.fd = listen_fds[i];
        ev.events = EPOLLIN; // 監聽listen_fd的讀事件
        res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, listen_fds[i], &ev);
        if (res == -1) return -1;
    }

    struct sockaddr_in client_addr; 
    int client_addr_len = sizeof(client_addr);

    /******************** 埠複用 ********************/
#if 0
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#endif

    for (i = 0; i < PORT_COUNT; i++) {
        
    }

    while (1) {
        n_ready = epoll_wait(r->epfd, events, MAX_EVENTS, -1); // 阻塞監聽
        
        for (i = 0; i < n_ready; i++) {
            client_fd = events[i].data.fd;
            /**************************************** 建立連線階段 ****************************************/
            if (IsListenFd(listen_fds, client_fd)) {
                conn_fd = accept(client_fd, (struct sockaddr *)&client_addr, &client_addr_len);
#if 1
                SetNonBlocking(conn_fd);
#endif
                /* 監聽conn_fd的讀事件 */ 
                ev.data.fd = conn_fd;
                ev.events = EPOLLIN; 
                res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, conn_fd, &ev);
                if (res == -1) return -1;
#if 0
                /* 為conn_fd申請記憶體 */
                r->items[conn_fd].fd = conn_fd;
                r->items[conn_fd].r_buf = calloc(1, BUFF_SIZE);
                r->items[conn_fd].r_len = 0;
                r->items[conn_fd].w_buf = calloc(1, BUFF_SIZE);
                r->items[conn_fd].w_len = 0;
                r->items[conn_fd].event = EPOLLIN;
#else
                sock_item *item = ReactorLookUp(r, conn_fd);
                if (item == NULL) return -1;
                item->fd = conn_fd;
                item->r_buf = calloc(1, BUFF_SIZE);
                item->r_len = 0;
                item->w_buf = calloc(1, BUFF_SIZE);
                item->w_len = 0;
#endif
            } else {
                /**************************************** 資料傳輸階段 ****************************************/
                
                if (events[i].events & EPOLLIN) {
                    sock_item *item = ReactorLookUp(r, client_fd);
                    if (item == NULL) return -1;
                    char *r_buf = item->r_buf;
                    char *w_buf = item->w_buf;
                    memset(r_buf, 0, BUFF_SIZE);
                    memset(w_buf, 0, BUFF_SIZE);
                    n = recv(client_fd, r_buf, BUFF_SIZE, 0);
                    if (n == 0) {
                        // 客戶端斷開了連線
                        free(r_buf);
                        free(w_buf);
                        // epoll_ctl(r->epfd, EPOLL_CTL_DEL, client_fd, NULL);
                        close(client_fd);
                    } else if (n > 0) {
                        printf("收到了客戶端發來的資料, n=%d, buf=%s\n", n, r_buf);
                        memcpy(w_buf, r_buf, BUFF_SIZE);

                        /* 讀事件->寫事件 */ 
                        ev.data.fd = client_fd;
                        ev.events = EPOLLOUT; 
                        res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
                    } else {
                        return -1;
                    }
                } else if (events[i].events & EPOLLOUT) {
                    sock_item *item = ReactorLookUp(r, client_fd);
                    if (item == NULL) return -1;
                    char *w_buf = item->w_buf;
                    n = send(client_fd, w_buf, BUFF_SIZE, 0);
                    printf("給客戶端回的資料..., n=%d, buf=%s\n", n, w_buf);

                    /* 讀事件->寫事件 */ 
                    ev.data.fd = client_fd;
                    ev.events = EPOLLIN;
                    res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
                }
            }
        }

    }
    return 0;
}