unix網路程式設計2.5——高併發伺服器(五)epoll進階篇——基於epoll實現reactor
阿新 • • 發佈:2022-12-03
目錄
unix網路程式設計2.1——高併發伺服器(一)基礎——io與檔案描述符、socket程式設計與單程序服務端客戶端實現
系列文章
閱讀本文需要先閱讀下面的文章:
unix網路程式設計1.1——TCP協議詳解(一)
unix網路程式設計2.1——高併發伺服器(一)基礎——io與檔案描述符、socket程式設計與單程序服務端客戶端實現
unix網路程式設計2.2——高併發伺服器(二)多程序與多執行緒實現
unix網路程式設計2.3——高併發伺服器(三)多路IO複用之select
unix網路程式設計2.4——高併發伺服器(四)epoll基礎篇
前言
- 在unix網路程式設計2.4——高併發伺服器(四)epoll基礎篇中,實現了基礎版的服務端,並且將fd對應的讀寫buf進行了隔離,但是讀寫buf是全域性變數,本文要用上文結構體sock_item與reactor將讀寫buf按照fd的隔離,動態管理起來。
伺服器reactor模型
網路IO涉及的系統物件
- 網路IO涉及的系統物件包括:使用者空間呼叫IO的程序或執行緒和核心空間的核心系統
- 如下圖所示,當IO操作read,它會經歷兩個階段:
- 等待資料準備就緒(如果是阻塞IO則會阻塞在這裡)
- 將資料從核心緩衝區拷貝到使用者程序或執行緒自定義的buf中
IO多路複用
- 對於高併發程式設計,網路連線上的訊息處理往往分為兩個階段:等待訊息準備好和訊息處理。當使用預設的阻塞socket時(如上圖的read),往往時把這兩個階段合二為一,這樣操作socket的程式碼所在的執行緒就得睡眠來等待訊息準備好,這導致了高併發下執行緒會頻繁的睡眠、喚醒,從而影響了cpu的使用效率。
- 高併發程式設計方法會把兩個階段分開處理(等待訊息準備好和訊息處理)。這要求必須使用非阻塞的IO,否則,處理訊息的程式碼段很容易導致條件不滿足時,所線上程又進入了睡眠等待階段,造成"死鎖"。
- 此時就需要在等待訊息準備好階段,有執行緒主動查詢(這個執行緒還是需要睡眠進入阻塞態的),或者說讓1個執行緒為所有連線等待:這就是IO多路複用。 它也可能睡眠等待,但是不要緊,因為它"一對多,可以監控所有連線"。這樣,當執行緒被喚醒執行時,就一定是有一些連線準備好或者資料準備好了。(可以實現這個功能的介面代表為select和epoll,在之前的文章已經有所介紹,本文要介紹基於epoll實現reactor模型。)
- 作為一個高效能伺服器程式通常需要考慮處理三類事件:I/O 事件,定時事件及訊號。兩種 高效 的事件處理模型: Reactor 和 Proactor 。
reactor模型
- 首先來回想一下普通函式呼叫的機制:程式呼叫某函式,函式執行,程式等待,函式將結果和控制權返回給程式,程式繼續處理。 Reactor 釋義 反應堆,是一種事件驅動機制。和普通函式呼叫的不同之處在於:應用程式不是主動的呼叫某個 API 完成處理,而是恰恰相反, Reactor 逆置了事件處理流程,應用程式需要提供相應的介面並註冊到 Reactor 上,如果相應的時間發生, Reactor 將主動呼叫應用程式註冊的介面,這些介面又稱為 回撥函式
- Reactor 模式是處理併發I/O 比較常見的一種模式,用於同步I/O,中心思想是將所有要處理的I/O 事件註冊到一箇中心I/O 多路複用器上,同時主執行緒/程序阻塞在多路複用器上;一旦有I/O 事件到來或是準備就緒(檔案描述符或socket 可讀、寫),多路複用器返回並將事先註冊的相應I/O 事件分發到對應的處理器中。
遇到的問題
1.陣列下標越界
- 解放辦法:使用動態陣列,更好的資料結構(下面會說)
2.socket: Too many open files fd的個數限制
- 解決辦法 ulimit 修改資源限制
ulimit 無法反覆修改,只能設定1次,重啟後失效,可以修改配置檔案
3.Cannot assign requested address 客戶端埠資源不夠
解決辦法
何為連線(針對tcp而言)
- 五元組: (src_ip, src_port, des_ip, des_port, 協議) 確定1個連線
解決辦法1:檢視(修改)資源上限
確認port_range範圍
- sudo sysctl -a | grep port_range
- 或 cat /proc/sys/net/ipv4/ip_local_port_range
32768 60999
修改配置
- 從本機修改核心引數考慮:參考,上圖可知port範圍是60999-32768=28231個(第一次測試程式,客戶端最多分配28231個,與其保持一致)
- sudo vi /etc/sysctl.conf開啟,再新增 net.ipv4.ip_local_port_range = 15000 60999
- 再執行:sudo sysctl -p /etc/sysctl.conf,使生效
- 檢視是否生效:cat /proc/sys/net/ipv4/ip_local_port_range
15000 60999
這樣能用的port範圍:60999-15000 = 45999
解決辦法2:改變五元組
- 從實際可用的併發考慮:當出現"Cannot assign requested address"說明客戶端埠資源不夠,,如果我們有多張網絡卡,可以改變src_ip(此時客戶端作為源),這樣網絡卡的數量可以決定連線的個數,百萬級併發很容易實現;同理,服務端也可以改變其ip或擴大監聽的埠範圍,總之就是改變連線五元組的其中之一。
event_block 資料結構————reactor實現核心
- 觀察下圖,要實現這樣一個具有動態擴容能力的動態陣列,有以下幾個好處:
- 不用一次性malloc一個非常大的連續記憶體,而是用連結串列next指標的方式,記憶體不要求有一整片很大的,避免記憶體碎片衝擊;
- 按照sock_item[1024]的維度去申請和管理記憶體,用多少申請多少;
- 具體的,假設有100w個客戶端連線連線,需要100w個fd,對應100w個sock_item項,是迴圈去申請出來的,free的時候也是一片片去free的;
- sock_item中包括了:fd,rbuf,rlen,wbuf,wlen,events,callback,讀寫解耦,等待訊息與處理訊息(回撥函式實現)分成了兩個階段,便於業務實現。
啥時候需要這種資料結構
- 陣列->"連結串列陣列",block塊,為什麼這裡用這個而不是紅黑樹
- 這種資料結構什麼時候適用:用來做儲存,查詢的時間複雜度,儲存的空間複雜度
- 儲存有序、有規律的資料 key / 取餘 1024等
測試併發(客戶端不斷連線)原始碼
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#define MAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}
static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\n", argv[0]);
exit(0);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;
struct epoll_event events[MAX_EPOLLSIZE];
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
strcpy(buffer, " Data From MulClient\n");
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
while (1) {
if (++index >= MAX_PORT) index = 0;
struct epoll_event ev;
int sockfd = 0;
if (connections < 340000 && !isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
goto err;
}
//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);
if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);
sprintf(buffer, "Hello Server: client --> %d\n", connections);
send(sockfd, buffer, strlen(buffer), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
connections ++;
}
//connections ++;
if (connections % 1000 == 999 || connections >= 340000) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
sprintf(buffer, "data from %d\n", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
printf(" RecvBuffer:%s\n", rBuffer);
if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}
} else if (length == 0) {
printf(" Disconnect clientfd:%d\n", clientfd);
connections --;
close(clientfd);
} else {
if (errno == EINTR) continue;
printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
}
}
usleep(500);
}
return 0;
err:
printf("error : %s\n", strerror(errno));
return 0;
}
服務端原始碼server.c
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <ctype.h>
#include <unistd.h>
#include <stdio.h>
// struct sockaddr_in對應的標頭檔案 <arpa/inet.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <errno.h>
#include <fcntl.h>
#define SERVER_PORT 9999
#define PORT_COUNT 100
#define MAX_EVENTS 128
#define BUFF_SIZE 128
#define ITEMS_LEN 1024
extern int errno;
/*
* 待實現:
* 1.listen_fd和conn_fd的sock_item有沒有區別?
* 2.假色有100w的sock_item,如何快速插入和查詢?
*/
// TODO: listen_fd + conn_fd
typedef struct sock_item {
int fd;
char *r_buf;
int r_len;
char *w_buf;
int w_len;
int event; // fd對應的事件
void (*recv)(int fd, char *buf, int len);
void (*send)(int fd, char *buf, int len);
void (*accept)(int fd);
} sock_item;
/*
* 使用場景:查詢,有序,有規律
* 1.查詢時間複雜度?
* 2.儲存空間複雜度?
*/
typedef struct event_block {
sock_item *items;
struct event_block *next;
} event_block;
// TODO
typedef struct reactor {
int epfd;
int blkcnt;
event_block *ev_blk;
} reactor;
/* new event_block */
int ReactorResize(reactor *r)
{
/* 不能做頭插法,因為是順序增加的 */
if (r == NULL) return -1;
event_block *ev_blk = r->ev_blk;
while (ev_blk != NULL && ev_blk->next != NULL) {
ev_blk = ev_blk->next; // 找到最後一個節點,後面會用尾插法,插入新的節點
}
/******************** 申請 sock_item ********************/
sock_item *item = (sock_item *)malloc(ITEMS_LEN * sizeof(sock_item));
if (item == NULL) return -1;
memset(item, 0, ITEMS_LEN * sizeof(sock_item));
/******************** 申請 event_block ********************/
event_block *new_ev_blk = (event_block *)malloc(sizeof(event_block));
if (new_ev_blk == NULL) {
free(item); // 注意異常情況,前面的記憶體也要回收
return -1;
}
new_ev_blk->items = item;
new_ev_blk->next = NULL;
if (ev_blk == NULL) {
r->ev_blk = new_ev_blk; // 這種情況一般是第一次申請記憶體,頭節點還是NULL
r->ev_blk = new_ev_blk; // 這種情況一般是第一次申請記憶體,頭節點還是NULL
} else {
ev_blk->next = new_ev_blk; // 尾插法
}
r->blkcnt++;
return 0;
}
sock_item* ReactorLookUp(reactor *r, int socket_fd)
{
if (r == NULL) return NULL;
// if (r->ev_blk == NULL) return NULL;
if (socket_fd <= 0) return NULL;
int blk_idx = socket_fd / ITEMS_LEN;
/******************** 呼叫 ReactorResize 每1024個申請一個ev_blk塊 ********************/
while (blk_idx >= r->blkcnt) {
ReactorResize(r);
}
int i = 0;
event_block *blk = r->ev_blk;
while (i++ < blk_idx && blk != NULL) {
blk = blk->next; // ReactorResize種保證blk非NULL
}
return &blk->items[socket_fd % ITEMS_LEN];
}
void SetNonBlocking(int fd)
{
/* 設定fd為non-blocking*/
int flag = fcntl(fd, F_GETFL, 0);
flag |= O_NONBLOCK;
fcntl(fd, F_SETFD, flag);
}
int InitServer(short port)
{
int listen_fd;
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
#if 1
SetNonBlocking(listen_fd);
#endif
bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(listen_fd, 10);
return listen_fd;
}
int IsListenFd(int *listen_fds, int fd)
{
for (int i = 0; i < PORT_COUNT; i++) {
if (listen_fds[i] == fd) {
return 1;
}
}
return 0;
}
int main(void)
{
int conn_fd, client_fd;
int i, n, res;
/******************** epoll init ********************/
reactor *r = (reactor *)calloc(1, sizeof(reactor));
if (r == NULL) return -1;
r->blkcnt = 0;
r->epfd = epoll_create(1); // 紅黑樹頭節點
int n_ready;
struct epoll_event ev, events[MAX_EVENTS];
#if 0
r->items = (sock_item *)calloc(MAX_EVENTS, sizeof(sock_item));
if (r->items == NULL) return -1;
#endif
/******************** 連線五元組 繫結多個port ********************/
int listen_fds[PORT_COUNT];
for (i = 0; i < PORT_COUNT; i++) {
listen_fds[i] = InitServer(SERVER_PORT + i);
ev.data.fd = listen_fds[i];
ev.events = EPOLLIN; // 監聽listen_fd的讀事件
res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, listen_fds[i], &ev);
if (res == -1) return -1;
}
struct sockaddr_in client_addr;
int client_addr_len = sizeof(client_addr);
/******************** 埠複用 ********************/
#if 0
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#endif
for (i = 0; i < PORT_COUNT; i++) {
}
while (1) {
n_ready = epoll_wait(r->epfd, events, MAX_EVENTS, -1); // 阻塞監聽
for (i = 0; i < n_ready; i++) {
client_fd = events[i].data.fd;
/**************************************** 建立連線階段 ****************************************/
if (IsListenFd(listen_fds, client_fd)) {
conn_fd = accept(client_fd, (struct sockaddr *)&client_addr, &client_addr_len);
#if 1
SetNonBlocking(conn_fd);
#endif
/* 監聽conn_fd的讀事件 */
ev.data.fd = conn_fd;
ev.events = EPOLLIN;
res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, conn_fd, &ev);
if (res == -1) return -1;
#if 0
/* 為conn_fd申請記憶體 */
r->items[conn_fd].fd = conn_fd;
r->items[conn_fd].r_buf = calloc(1, BUFF_SIZE);
r->items[conn_fd].r_len = 0;
r->items[conn_fd].w_buf = calloc(1, BUFF_SIZE);
r->items[conn_fd].w_len = 0;
r->items[conn_fd].event = EPOLLIN;
#else
sock_item *item = ReactorLookUp(r, conn_fd);
if (item == NULL) return -1;
item->fd = conn_fd;
item->r_buf = calloc(1, BUFF_SIZE);
item->r_len = 0;
item->w_buf = calloc(1, BUFF_SIZE);
item->w_len = 0;
#endif
} else {
/**************************************** 資料傳輸階段 ****************************************/
if (events[i].events & EPOLLIN) {
sock_item *item = ReactorLookUp(r, client_fd);
if (item == NULL) return -1;
char *r_buf = item->r_buf;
char *w_buf = item->w_buf;
memset(r_buf, 0, BUFF_SIZE);
memset(w_buf, 0, BUFF_SIZE);
n = recv(client_fd, r_buf, BUFF_SIZE, 0);
if (n == 0) {
// 客戶端斷開了連線
free(r_buf);
free(w_buf);
// epoll_ctl(r->epfd, EPOLL_CTL_DEL, client_fd, NULL);
close(client_fd);
} else if (n > 0) {
printf("收到了客戶端發來的資料, n=%d, buf=%s\n", n, r_buf);
memcpy(w_buf, r_buf, BUFF_SIZE);
/* 讀事件->寫事件 */
ev.data.fd = client_fd;
ev.events = EPOLLOUT;
res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
} else {
return -1;
}
} else if (events[i].events & EPOLLOUT) {
sock_item *item = ReactorLookUp(r, client_fd);
if (item == NULL) return -1;
char *w_buf = item->w_buf;
n = send(client_fd, w_buf, BUFF_SIZE, 0);
printf("給客戶端回的資料..., n=%d, buf=%s\n", n, w_buf);
/* 讀事件->寫事件 */
ev.data.fd = client_fd;
ev.events = EPOLLIN;
res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
}
}
}
}
return 0;
}