利用epoll和多程序解決高併發問題
阿新 • • 發佈:2019-02-06
1、服務端程式碼,開啟8個工作程序
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <signal.h> #include <sys/epoll.h> #include <unistd.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <fcntl.h> #include <sys/mman.h> #include <pthread.h> #include <errno.h> #include <unistd.h> #define WORKER_MAX 1024 #define EVENT_LIST_MAX 128 #define EVENT_MAX 12 #define WORK_REAL 8 #define SERVER_PORT 8080 static int workers[WORKER_MAX]; static int icEpollFd = -1; static int cur_pid; typedef int (*PFCALLBACL)(struct epoll_event *); typedef struct EPOLL_DATA_S { int iEpoll_Fd; int iEvent_Fd; PFCALLBACL pfCallBack; }Epoll_Data_S; /* 互斥量 */ static pthread_mutex_t *mutex; /* 建立共享的mutex */ static void initMutex(void) { pthread_mutexattr_t attr; int ret; //設定互斥量為程序間共享 mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0); if( MAP_FAILED==mutex) { perror("mutex mmap failed"); return; } //設定attr的屬性 pthread_mutexattr_init(&attr); ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED); if(ret != 0) { fprintf(stderr, "mutex set shared failed"); return; } pthread_mutex_init(mutex, &attr); return; } static int startup(unsigned short port) { struct sockaddr_in servAddr; unsigned value = 1; int listenFd; memset(&servAddr, 0, sizeof(servAddr)); //協議域(ip地址和埠) servAddr.sin_family = AF_INET; //繫結預設網絡卡 servAddr.sin_addr.s_addr = htonl(INADDR_ANY); //埠 servAddr.sin_port = htons(port); //建立套接字 if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); //繫結套接字 if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) { printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } //開始監聽,設定最大連線請求 if (listen(listenFd, 10) == -1) { printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } return listenFd; } static int create_workers(unsigned int worker_num) { unsigned int i; unsigned int real_num = worker_num; int pid; if (real_num > WORKER_MAX) { real_num = WORKER_MAX; } for (i = 0; i < real_num; i++) { pid = fork(); if (0 == pid) { return 0; } else if (0 < pid) { workers[i] = pid; continue; } else { printf("fork error\r\n"); return 0; } } return 1; } /* 建立epoll */ static int create_epoll(unsigned int event_num) { int epoll_fd; epoll_fd = epoll_create(event_num); if (-1 == epoll_fd) { printf("epoll create failed\r\n"); return -1; } return epoll_fd; } /* 將事件加到epoll */ static void add_event_epoll(int iEpoll_Fd, int iEvent_Fd, PFCALLBACL pfCallBack) { int op = EPOLL_CTL_ADD; struct epoll_event ee; Epoll_Data_S *data; data = malloc(sizeof(Epoll_Data_S)); if (NULL == data) { return; } /* 設定私有資料 */ data->iEpoll_Fd = iEpoll_Fd; data->iEvent_Fd = iEvent_Fd; data->pfCallBack = pfCallBack; ee.events = EPOLLIN | EPOLLOUT | EPOLLHUP; //ee.events = EPOLLIN|EPOLLET; ee.data.ptr = (void *)data; if (epoll_ctl(iEpoll_Fd, op, iEvent_Fd, &ee) == -1) { printf("epoll_ctl(%d, %d) failed", op, iEvent_Fd); return; } return; } /* 從epoll刪除事件 */ static void del_event_epoll(int iEpoll_Fd, int iEvent_Fd) { int op = EPOLL_CTL_DEL; if (epoll_ctl(iEpoll_Fd, op, iEvent_Fd, NULL) == -1) { printf("epoll_ctl(%d, %d) failed", op, iEvent_Fd); } return; } static int make_socket_non_blocking(int fd) { int flags, s; flags = fcntl (fd, F_GETFL, 0); if (flags == -1) { perror ("fcntl"); return -1; } flags |= O_NONBLOCK; s = fcntl (fd, F_SETFL, flags); if (s == -1) { perror ("fcntl"); return -1; } return 0; } /* 處理Receive事件 */ static int proc_receive(struct epoll_event *pstEvent) { char buff[4096]; /* 快取 */ int len; Epoll_Data_S *data = (Epoll_Data_S *)(pstEvent->data.ptr); int iEpoll_Fd = data->iEpoll_Fd; int iEvent_Fd = data->iEvent_Fd; if (pstEvent->events & EPOLLIN) { while(1) { /* 讀取資料 */ len = (int)recv(iEvent_Fd, buff, sizeof(buff), 0); //printf("proc_receive iEvent_Fd = %d pid = %d len = %d\r\n", iEvent_Fd, getpid(), len); if (len <= 0) { if (errno == EINTR) { continue; } del_event_epoll(iEpoll_Fd, iEvent_Fd); close(iEvent_Fd); free(data); } else if (len > 0) { buff[len] = '\0'; printf("pid %d receive data: %s\r\n", cur_pid, buff); //usleep(10000); } break; } } else if (pstEvent->events & EPOLLHUP) { printf("receive EPOLLHUP or EPOLLOUT\r\n"); del_event_epoll(iEpoll_Fd, iEvent_Fd); close(iEvent_Fd); free(data); } else { // printf("receive others pstEvent->events=%d\r\n", pstEvent->events); } return 0; } /* 處理Accept事件 */ static int proc_accept(struct epoll_event *pstEvent) { int newFd; Epoll_Data_S *data = (Epoll_Data_S *)(pstEvent->data.ptr); int iEpoll_Fd = data->iEpoll_Fd; int iEvent_Fd = data->iEvent_Fd; if (pthread_mutex_trylock(mutex)==0) { while(-1 != (newFd = accept(iEvent_Fd, (struct sockaddr *)NULL, NULL))) { make_socket_non_blocking(newFd); //printf("accept pid = %d\r\n", getpid()); add_event_epoll(icEpollFd, newFd, proc_receive); } pthread_mutex_unlock(mutex); } return 0; } static void handleterm(int sig) { int i; for (i = 0; i < WORK_REAL; i++) { /* 殺掉子程序 */ kill(workers[i], SIGTERM); } return; } static void proc_epoll(int iEpollFd, int timeout) { int iEventNum; int i; struct epoll_event events[EVENT_LIST_MAX]; iEventNum = epoll_wait(iEpollFd, events, EVENT_LIST_MAX, timeout); for (i = 0; i < iEventNum; i++) { Epoll_Data_S *data = (Epoll_Data_S *)(events[i].data.ptr); data->pfCallBack(&(events[i])); } return; } int main() { int iServerFd = -1; int bParent; int iEpollFd = -1; /* 初始化互斥量 */ initMutex(); /* 初始化,建立監聽埠 */ iServerFd = startup(SERVER_PORT); if (-1 == iServerFd) { return 0; } make_socket_non_blocking(iServerFd); /* 父程序建立epoll */ iEpollFd = create_epoll(EVENT_MAX); if (-1 == iEpollFd) { close(iServerFd); return 0; } /* 將監聽埠加到epoll */ add_event_epoll(iEpollFd, iServerFd, proc_accept); /* 建立子程序 */ bParent = create_workers(WORK_REAL); /* 主程序 */ if (bParent) { while(1) { signal(SIGTERM, handleterm); pause(); } } else { /* 子程序建立epoll */ icEpollFd = create_epoll(EVENT_MAX); if (-1 == icEpollFd) { close(iServerFd); return 0; } cur_pid = getpid(); while (1) { /* 處理父epoll訊息 */ proc_epoll(iEpollFd, 50); /* 處理子epoll訊息 */ proc_epoll(icEpollFd, 50); } } return 0; }
2、客戶端程式碼,發起高併發連線,暫定連線數為2000,可以自己調
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include<arpa/inet.h> #include <fcntl.h> #include <errno.h> const int MAXLINE = 5; int count = 1; static int make_socket_non_blocking(int fd) { int flags, s; flags = fcntl (fd, F_GETFL, 0); if (flags == -1) { perror ("fcntl"); return -1; } flags |= O_NONBLOCK; s = fcntl (fd, F_SETFL, flags); if (s == -1) { perror ("fcntl"); return -1; } return 0; } void sockconn() { int sockfd; struct sockaddr_in server_addr; struct hostent *host; char buf[100]; unsigned int value = 1; host = gethostbyname("127.0.0.1"); sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { perror("socket error\r\n"); return; } //setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); //make_socket_non_blocking(sockfd); bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(8080); server_addr.sin_addr = *((struct in_addr*) host->h_addr); int cn = connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)); if (cn == -1) { printf("connect error errno=%d\r\n", errno); return; } // char *buf = "h"; sprintf(buf, "%d", count); count++; write(sockfd, buf, strlen(buf)); close(sockfd); printf("client send %s\r\n", buf); return; } int main(void) { int i; for (i = 0; i < 2000; i++) { sockconn(); } return 0; }
3、測試結果
1秒多時間內,處理了2000個連線
4、需要注意的問題連線太多時,open files too many問題
localhost:/share/code # ulimit -a core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 7900 max locked memory (kbytes, -l) 64 max memory size (kbytes, -m) 865300 open files (-n) 1024 pipe size (512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 8192 cpu time (seconds, -t) unlimited max user processes (-u) 7900 virtual memory (kbytes, -v) 2491600 file locks (-x) unlimited localhost:/share/code #
解決方法:
localhost:/share/code # ulimit -n 100000
localhost:/share/code # ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 7900
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) 865300
open files (-n) 100000
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 7900
virtual memory (kbytes, -v) 2491600
file locks (-x) unlimited
localhost:/share/code #
2)客戶端connect錯誤
原因:處於TIME-WAIT狀態的socket太多,socket不夠用
解決:減少TIME-WAIT時間
localhost:/share/code # echo "1" > /proc/sys/net/ipv4/tcp_tw_recycle
localhost:/share/code # echo "1" > /proc/sys/net/ipv4/tcp_tw_reuse
localhost:/share/code #
5、參考文章