伺服器開發——Reactor模式
阿新 • • 發佈:2018-12-18
博主的程式碼是C++寫的,本人改成C語言的。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> //for htonl() and htons() #include <unistd.h> #include <fcntl.h> #include <sys/epoll.h> #include <signal.h> //for signal() #include <pthread.h> #include <errno.h> #include <time.h> #include <stdbool.h> // for bool #include <sys/stat.h> #include <sys/syscall.h> //for gettid #define gettid() syscall(__NR_gettid) #define WORKER_THREAD_NUM 5 #define min(a, b) ((a <= b) ? (a) : (b)) int g_epollfd = 0; bool g_bStop = false; int g_listenfd = 0; pthread_t g_acceptthreadid = 0; pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 }; pthread_cond_t g_acceptcond; pthread_mutex_t g_acceptmutex; pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/; pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/; pthread_mutex_t g_clientmutex; typedef struct g_listClients_ { int fd; struct g_listClients_ *next; }g_listClients; static g_listClients *head = NULL; void prog_exit(int signo) { signal(SIGINT, SIG_IGN);//SIG_IGN 忽略訊號的處理程式 //signal(SIGKILL, SIG_IGN);//該訊號不能被阻塞、處理或者忽略 signal(SIGTERM, SIG_IGN); printf("program recv signal %d to exit\n" ,signo); g_bStop = true; //將g_listenfd 從g_epollfd 事件集合刪除 epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL); //shutdown 可以立即關閉連線,SHUT_RDWR表示同時關閉讀和寫 //但是並不會關閉檔案描述符,關閉描述符還是需要close函式 //防止dup或者fork系統呼叫將fd計數增加情況 shutdown(g_listenfd, SHUT_RDWR); //close系統呼叫並非立即關閉一個連線,而是將fd的引用計數減1 //只有當fd的計數引用為0時才真正關閉fd,讀和寫全部關閉 close(g_listenfd); close(g_epollfd); pthread_cond_destroy(&g_acceptcond); pthread_mutex_destroy(&g_acceptmutex); pthread_cond_destroy(&g_cond); pthread_mutex_destroy(&g_mutex); pthread_mutex_destroy(&g_clientmutex); if(head != NULL) { free(head); head = NULL; } printf("prog_exit !\n"); } bool create_server_listener(const char* ip, short port) { //socket函式第一個引數也經常看到設定為PF_INET, //在Unix/Linux系統中,在不同的版本中這兩者有微小差別. //對於BSD,是AF,對於POSIX是PF.其實值是一樣的影響不大 g_listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (g_listenfd == -1) return false; //SO_REUSEADDR和SO_REUSEPORT兩個標誌是設定地址和埠複用, //如果不設定當我們重啟程式時,如果有和客戶端通訊,然後會經過四次揮手進入TIME_WAIT狀態 //會有2min的MSL,在這2min中埠不能使用 int on = 1; setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on)); struct sockaddr_in servaddr; bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = inet_addr(ip); servaddr.sin_port = htons(port); if (bind(g_listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) return false; //自核心版本2.2之後,listen第二個引數表示全連線的個數 // proc/sys/net/ipv4/tcp_max_syn_backlog 中定義了半連線的個數 if (listen(g_listenfd, 50) == -1) return false; //epoll_create 引數用來告訴核心這個監聽的數目一共有多大(2.6.27版本核心之後該引數忽略不用) //不同於select第一個引數(最大監聽的fd+1) g_epollfd = epoll_create(10); if (g_epollfd == -1) return false; struct epoll_event e; memset(&e, 0, sizeof(e)); //events 表示epoll事件,EPOLLIN 可讀事件 EPOLLRDHUP 套接字對端關閉 e.events = EPOLLIN | EPOLLRDHUP; e.data.fd = g_listenfd; //往g_epollfd事件表中註冊g_listenfd上的可讀和關閉事件 if (epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1) { return false; } return true; } void release_client(int clientfd) { if (epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) printf("release client socket failed as call epoll_ctl failed \n"); close(clientfd); } void* accept_thread_func(void* arg) { while (!g_bStop) { //在pthread_cond_wait之前一定要加鎖 //pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx //然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數情況下是等待的條件成立而被喚醒,喚醒後 //該程序會先鎖定pthread_mutex_lock(&mtx);,再讀取資源 pthread_mutex_lock(&g_acceptmutex); pthread_cond_wait(&g_acceptcond, &g_acceptmutex); //有可能會存在虛假喚醒,要注意 struct sockaddr_in clientaddr; socklen_t addrlen; int newfd = accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen); pthread_mutex_unlock(&g_acceptmutex); if (newfd == -1) continue; printf("new client connected: [%s] : [%d]\n", inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port)); //將新socket設定為non-blocking //F_GETFL 獲取許可權 F_SETFL設定許可權 int oldflag = fcntl(newfd, F_GETFL, 0); int newflag = oldflag | O_NONBLOCK; if (fcntl(newfd, F_SETFL, newflag) == -1) { printf("fcntl error, oldflag = [%d] , newflag = [%d]\n", oldflag ,newflag); continue; } struct epoll_event e; memset(&e, 0, sizeof(e)); //設定可讀事件 關閉連線事件 和邊緣觸發模式 //預設模式是水平觸發即當被監控的檔案描述符上有可讀寫事件發生時 //epoll_wait()會通知處理程式去讀寫。如果這次沒有把資料一次性全部讀寫完(如讀寫緩衝區太小 //那麼下次呼叫 epoll_wait()時,它還會通知你在上沒讀寫完的檔案描述符上繼續讀寫. //邊緣觸發即當被監控的檔案描述符上有可讀寫事件發生時,epoll_wait()會通知處理程式去讀寫 //如果這次沒有把資料全部讀寫完(如讀寫緩衝區太小),那麼下次呼叫epoll_wait()時, //它不會通知你,也就是它只會通知你一次,直到該檔案描述符上出現第二次可讀寫事件才會通知你 e.events = EPOLLIN | EPOLLRDHUP | EPOLLET; e.data.fd = newfd; if (epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1) { printf("epoll_ctl error, fd = %d \n", newfd ); } } return NULL; } void* worker_thread_func(void* arg) { while (!g_bStop) { int clientfd; pthread_mutex_lock(&g_clientmutex); while (head == NULL) //while迴圈是防止意外喚醒 pthread_cond_wait(&g_cond, &g_clientmutex); clientfd = head->fd; head = head->next; pthread_mutex_unlock(&g_clientmutex); //gettid 獲取的是核心中執行緒ID,而pthread_self 是posix描述的執行緒ID printf("in pthread_self = [%ld] gettid= [%ld]\n", pthread_self(), (long)gettid()); char buff[256]; char sendbuf[1024]; int sendlen = 0; bool bError = false; int nRecv = 0, count = 0; memset(buff, 0, sizeof(buff)); do { nRecv = recv(clientfd, buff + count, 256 - count, 0); if (nRecv == -1) { if (errno == EWOULDBLOCK) //EWOULDBLOCK 用於非阻塞模式,不需要重新讀或者寫 ,EINTR:指操作被中斷喚醒,需要重新讀/寫 break; else { printf("recv error, client disconnected, fd = [%d]\n", clientfd); release_client(clientfd); bError = true; break; } } //對端關閉了socket,這端也關閉。 else if (nRecv == 0) { printf("peer closed, client disconnected, fd = %d \n" ,clientfd); release_client(clientfd); bError = true; break; } count = count + nRecv; }while(nRecv > 0 ); //出錯了,就不要再繼續往下執行了 if (bError) continue; printf("client msg:[%d] %s\n", count, buff); //將訊息加上時間標籤後發回 time_t now = time(NULL); struct tm* nowstr = localtime(&now); memset(sendbuf, 0, sizeof(sendbuf)); sprintf(sendbuf, "%04d-%02d-%02d %02d:%02d:%02d ", nowstr->tm_year + 1900, nowstr->tm_mon + 1, nowstr->tm_mday, nowstr->tm_hour, nowstr->tm_min, nowstr->tm_sec); strcat(sendbuf + strlen(sendbuf), "server reply:"); strcat(sendbuf, buff); sendlen = strlen(sendbuf); int sendcount = 0; int nSent = 0; do { nSent = send(clientfd , sendbuf + sendcount, sendlen - sendcount, 0); if (nSent == -1) { if (errno == EWOULDBLOCK) { sleep(10); continue; } else { printf("send error, fd = [%d] \n", clientfd); release_client(clientfd); break; } } sendcount += nSent; }while(sendcount < sendlen); printf("send [%d]:%s\n", sendcount, sendbuf); } return NULL; } bool daemon_run() { int pid; signal(SIGCHLD, SIG_IGN); //建立一個守護程序要遵循以下步驟 // 1、fork建立子程序 pid = fork(); if (pid < 0) { printf("fork error \n"); exit(-1); } //父程序退出,子程序獨立執行 else if (pid > 0) { exit(0); } //之前parent和child執行在同一個session裡,parent是會話(session)的領頭程序, //parent程序作為會話的領頭程序,如果exit結束執行的話,那麼子程序會成為孤兒程序,並被init收養。 //2、umask設定檔案許可權掩碼 //設定0表示當前程序建立檔案或者目錄的最大操作許可權為(~0) & mode 即0777&mode, mode為系統呼叫open的第三個引數 umask(0); //3、setsid 建立新回話,設定本程序為程序組的首領 //執行setsid()之後,child將重新獲得一個新的會話(session)id。 //這時parent退出之後,將不會影響到child了。 pid_t sid = setsid(); if(sid < 0) { return false; } //4、切換工作目錄 if((chdir("/")) < 0) { return false; } //5、關閉標準輸入裝置、標準輸出裝置和標準錯誤輸出裝置並重定向到/dev/null檔案 int i; for( i = 0; i < 3; ++i) { close(i); open("/dev/null", O_RDWR); dup(0); dup(0); } return true; //上述功能也可以通過庫函式daemon(0, 0)完成 } int main(int argc, char* argv[]) { short port = 0; int ch; bool bdaemon = false; while ((ch = getopt(argc, argv, "p:d")) != -1) { switch (ch) { case 'd': bdaemon = true; break; case 'p': port = atol(optarg); break; } } if (bdaemon) if(!daemon_run()) { printf("daemon run error!\n"); return -1; } if (port == 0) port = 12345; if (!create_server_listener("0.0.0.0", port)) { printf("Unable to create listen server: ip=0.0.0.0, port= [%d] \n", port); return -1; } //設定訊號處理 signal(SIGCHLD, SIG_DFL); signal(SIGPIPE, SIG_IGN); signal(SIGINT, prog_exit); //signal(SIGKILL, prog_exit);//該訊號不能被阻塞、處理或者忽略 signal(SIGTERM, prog_exit); //pthread_cond_init 和直接賦值為PTHREAD_COND_INITIALIZER效果一樣 pthread_cond_init(&g_acceptcond, NULL); //pthread_mutex_init 和直接賦值為PTHREAD_MUTEX_INITIALIZER效果一樣 pthread_mutex_init(&g_acceptmutex, NULL); pthread_cond_init(&g_cond, NULL); pthread_mutex_init(&g_mutex, NULL); pthread_mutex_init(&g_clientmutex, NULL); pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL); //啟動工作執行緒 for (int i = 0; i < WORKER_THREAD_NUM; ++i) { pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL); printf("creat thread [%ld]\n", g_threadid[i]); } while (!g_bStop) { struct epoll_event ev[1024]; //1024為最大監聽描述符的個數,返回值為就緒的描述符的個數,就緒的檔案描述符存在ev陣列中 int n = epoll_wait(g_epollfd, ev, 1024, 10); if (n == 0)//沒有就緒事件 continue; else if (n < 0)//出錯 { printf("epoll_wait error \n"); continue; } for (int i = 0; i < n; ++i) { //通知接收連線執行緒接收新連線 if (ev[i].data.fd == g_listenfd) //pthread_cond_signal不會有“驚群現象”產生,他最多隻給一個執行緒發訊號 //假如有多個執行緒正在阻塞等待著這個條件變數的話,那麼是根據各等待執行緒優先順序的高低確定哪個執行緒接收到訊號開始繼續執行 //如果各執行緒優先順序相同,則根據等待時間的長短來確定哪個執行緒獲得訊號 pthread_cond_signal(&g_acceptcond); //通知普通工作執行緒接收資料 else { g_listClients new_node; pthread_mutex_lock(&g_clientmutex); //g_listClients.push_back(ev[i].data.fd); if(head == NULL) //頭插入連結串列可能會導致先插入的fd不能及時處理 { head = (g_listClients *)malloc(sizeof(g_listClients)); if(head == NULL) { printf("malloc error!\n"); pthread_mutex_unlock(&g_clientmutex); break; } head->fd = ev[i].data.fd; head->next = NULL; } else { new_node.fd = ev[i].data.fd; new_node.next = head; head = &new_node; } pthread_mutex_unlock(&g_clientmutex); pthread_cond_signal(&g_cond); } } } free(head); head = NULL; return 0; }