多執行緒網路處理伺服器demo
阿新 • • 發佈:2019-02-03
/*
 * Multi-threaded epoll TCP server demo.
 *
 * The main thread accepts connections on a non-blocking listening socket and
 * hands each new socket, round-robin, to one of THREAD_NUMBER worker threads.
 * Every worker owns a private epoll instance, drains the sockets registered
 * with it and bubble-sorts each received chunk (a stand-in for real work).
 * A statistics thread prints the message rate every five seconds.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* request the GNU/Linux extensions; must precede all includes */
#endif

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
//#include <openssl/ssl.h>
//#include <openssl/err.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

//#define DEBUG_TILERA
#ifdef DEBUG_TILERA
#include <tmc/alloc.h>
#include <arch/cycle.h>
#include <arch/spr.h>
#include <tmc/cpus.h>
#include <tmc/sync.h>
#include <tmc/task.h>
#endif

#define MAXBUF 1024              /* bytes requested per recv() call */
#define MAXEPOLLSIZE 500000      /* RLIMIT_NOFILE target and epoll_create() size hint */
#define MAX_THREAD_NUMBER 200    /* upper bound on worker threads */
/*
 * Events fetched per epoll_wait() call. Ready descriptors that do not fit in
 * one batch stay queued and are reported by the following calls, so the batch
 * buffers do not have to be MAXEPOLLSIZE entries long.
 */
#define MAX_EVENTS_PER_WAIT 1024

int THREAD_NUMBER = 50;          /* number of worker threads actually started */
int kdpfd = -1;                  /* epoll instance watching the listening socket */
struct epoll_event events[MAX_EVENTS_PER_WAIT];
struct epoll_event thread_events[MAX_THREAD_NUMBER][MAX_EVENTS_PER_WAIT];
int fdpool[MAX_THREAD_NUMBER];   /* one epoll instance per worker; filled in by main() */
pthread_t handle_receive_thrdid[MAX_THREAD_NUMBER];

int msgcount = 0;                /* successful recv() calls; guarded by msgcount_lock */
pthread_mutex_t msgcount_lock = PTHREAD_MUTEX_INITIALIZER;
int timecount = 0;               /* seconds elapsed; written by the statistics thread only */
int count_packet = 0;            /* not used by the code below */
pthread_mutex_t connet_count_lock = PTHREAD_MUTEX_INITIALIZER;
int connect_count = 0;           /* open client connections; guarded by connet_count_lock */
pthread_mutex_t curfds_lock = PTHREAD_MUTEX_INITIALIZER;
int curfds;                      /* set to 1 once the listener is registered; otherwise unused */
char buffer[MAX_THREAD_NUMBER][MAXBUF + 1]; /* one receive buffer per worker */
pthread_t thread_count;          /* statistics thread */
cpu_set_t cpus;
int fd_index = 0;                /* next worker to receive a connection; always < THREAD_NUMBER */

/*
 * Sort the first n bytes of R into ascending order, in place.
 * A deliberately simple bubble sort; n <= 1 is a no-op.
 */
void BubbleSort(unsigned char R[], int n)
{
    int i, j;
    unsigned char temp;

    for (i = 0; i < n - 1; i++) {
        for (j = n - 2; j >= i; j--) {
            if (R[j] > R[j + 1]) {
                temp = R[j];
                R[j] = R[j + 1];
                R[j + 1] = temp;
            }
        }
    }
}

/*
 * setnonblocking - put a descriptor into non-blocking mode.
 * Returns 0 on success, -1 on failure (errno set by fcntl).
 */
int setnonblocking(int sockfd)
{
    /* The status flags are read with F_GETFL; F_GETFD would return the
     * descriptor flags (FD_CLOEXEC), which are unrelated to O_NONBLOCK. */
    int flags = fcntl(sockfd, F_GETFL, 0);

    if (flags == -1) {
        return -1;
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        return -1;
    }
    return 0;
}

/* Add delta to the open-connection counter under its lock. */
static void adjust_connect_count(int delta)
{
    pthread_mutex_lock(&connet_count_lock);
    connect_count += delta;
    pthread_mutex_unlock(&connet_count_lock);
}

/* Read the open-connection counter under its lock. */
static int read_connect_count(void)
{
    int value;

    pthread_mutex_lock(&connet_count_lock);
    value = connect_count;
    pthread_mutex_unlock(&connet_count_lock);
    return value;
}

/* Record one received message; called concurrently by every worker. */
static void count_message(void)
{
    pthread_mutex_lock(&msgcount_lock);
    msgcount++;
    pthread_mutex_unlock(&msgcount_lock);
}

/* Read the message counter under its lock. */
static int read_msgcount(void)
{
    int value;

    pthread_mutex_lock(&msgcount_lock);
    value = msgcount;
    pthread_mutex_unlock(&msgcount_lock);
    return value;
}

/*
 * Parse a base-10 integer in [min, max] from text.
 * Returns 0 and stores the value in *out on success, -1 on malformed or
 * out-of-range input.
 */
static int parse_long_arg(const char *text, long min, long max, long *out)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < min || value > max) {
        return -1;
    }
    *out = value;
    return 0;
}

/* handle_count - statistics thread: print the receive rate every 5 seconds. */
static void *handle_count(void *arg)
{
    int precount, current, speed;

    (void)arg;
    while (1) {
        precount = read_msgcount();
        sleep(5);
        timecount += 5;
        current = read_msgcount();
        speed = current - precount;
        printf("The received speed is %d/5seconds, connect %d, tatol packets %d\n",
               speed, read_connect_count(), current);
    }
    return NULL;
}

/* Unregister fd from epfd, close it and decrement the connection counter. */
static void drop_connection(int epfd, int fd)
{
    struct epoll_event unused;

    /* Old kernels reject a NULL event pointer for EPOLL_CTL_DEL, so pass a
     * zeroed structure rather than NULL or uninitialised memory. */
    memset(&unused, 0, sizeof unused);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &unused);
    close(fd);
    adjust_connect_count(-1);
}

/*
 * Read from fd until it would block, the peer closes, or an error occurs.
 * Sockets are registered edge-triggered, so everything available has to be
 * consumed before returning to epoll_wait().
 */
static void drain_socket(int epfd, int fd, unsigned char *buf)
{
    for (;;) {
        ssize_t len = recv(fd, buf, MAXBUF, 0);

        if (len > 0) {
            count_message();
            BubbleSort(buf, (int)len);
            continue;
        }
        if (len == 0) {
            /* Orderly shutdown by the peer. */
            drop_connection(epfd, fd);
            return;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return; /* drained; wait for the next edge */
        }
        if (errno == EINTR) {
            continue; /* interrupted by a signal: retry, the socket is fine */
        }

        {
            int saved_errno = errno; /* printf() may overwrite errno */

            printf(" socket %d receive message fail error code: %d, error message: '%s'\n",
                   fd, saved_errno, strerror(saved_errno));
        }
        drop_connection(epfd, fd);
        return;
    }
}

/*
 * handl_receive_msg - worker thread body.
 * arg carries the worker index (as an integer stored in the pointer), which
 * selects the worker's epoll instance, event batch and receive buffer.
 */
static void *handl_receive_msg(void *arg)
{
    int fdind = (int)(intptr_t)arg;
    unsigned char *buf = (unsigned char *)buffer[fdind];
    int nfds;
    int fdi;

    while (1) {
        nfds = epoll_wait(fdpool[fdind], thread_events[fdind], MAX_EVENTS_PER_WAIT, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }

        for (fdi = 0; fdi < nfds; fdi++) {
            uint32_t what = thread_events[fdind][fdi].events;
            int fd = thread_events[fdind][fdi].data.fd;

            /* EPOLLERR/EPOLLHUP can arrive without EPOLLIN; recv() then
             * reports the error or end-of-stream, so the socket gets closed
             * instead of leaking. */
            if (what & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                drain_socket(fdpool[fdind], fd, buf);
            } else {
                printf("other event %u\n", what);
            }
        }
    }
    return NULL;
}

/*
 * Usage: prog <thread_number> <port> <listen queue number> <IP Address>
 *
 * Starts the workers and the statistics thread, then accepts connections
 * forever. Returns/exits non-zero on any setup failure.
 */
int main(int argc, char **argv)
{
    int listener, new_fd, nfds, n, rc;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    long parsed;
    unsigned int myport;
    int lisnum;
    struct epoll_event ev;
    struct rlimit rt;
    int fdind;
    int ind;

    if (5 != argc) {
        printf("Usage: %s <thread_number(0 ~ 200)> <port(0-65535)> <listen queue number> <IP Address> \n", argv[0]);
        exit(1);
    }

    /* A thread count of 0 would divide by zero when distributing sockets and
     * a count above MAX_THREAD_NUMBER would overrun the per-thread arrays. */
    if (parse_long_arg(argv[1], 1, MAX_THREAD_NUMBER, &parsed) != 0) {
        fprintf(stderr, "invalid thread number '%s' (expected 1 ~ %d)\n", argv[1], MAX_THREAD_NUMBER);
        exit(1);
    }
    THREAD_NUMBER = (int)parsed;

    if (parse_long_arg(argv[2], 0, 65535, &parsed) != 0) {
        fprintf(stderr, "invalid port '%s' (expected 0-65535)\n", argv[2]);
        exit(1);
    }
    myport = (unsigned int)parsed;

    if (parse_long_arg(argv[3], 0, INT_MAX, &parsed) != 0) {
        fprintf(stderr, "invalid listen queue number '%s'\n", argv[3]);
        exit(1);
    }
    lisnum = (int)parsed;

#ifdef DEBUG_TILERA
    if (tmc_cpus_get_my_affinity(&cpus) != 0) {
        printf("tmc_cpus_get_my_affinity() failed.\n");
        tmc_task_die("tmc_cpus_get_my_affinity() failed.");
    }
    /* NOTE(review): the original compared against an undefined MAX_THREAD;
     * the requested worker count looks like the intended bound - confirm. */
    if (tmc_cpus_count(&cpus) < THREAD_NUMBER) {
        printf("\nInsufficient cpus available.\n");
        tmc_task_die("Insufficient cpus available.");
    }
#endif

    for (ind = 0; ind < MAX_THREAD_NUMBER; ind++) {
        fdpool[ind] = -1;
    }
    for (ind = 0; ind < THREAD_NUMBER; ind++) {
        fdpool[ind] = epoll_create(MAXEPOLLSIZE);
        if (fdpool[ind] == -1) {
            perror("epoll_create");
            exit(1);
        }
    }

    for (ind = 0; ind < THREAD_NUMBER; ind++) {
        rc = pthread_create(&handle_receive_thrdid[ind], NULL, &handl_receive_msg,
                            (void *)(intptr_t)ind);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            exit(1);
        }
    }

    rc = pthread_create(&thread_count, NULL, &handle_count, NULL);
    if (rc != 0) {
#ifdef DEBUG_TILERA
        tmc_task_die("pthread_create() failed.");
#endif
        /* The statistics thread is not needed to serve traffic. */
        fprintf(stderr, "statistics thread not started: %s\n", strerror(rc));
    }

    /* Raise the per-process limit on open files. */
    rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
    if (setrlimit(RLIMIT_NOFILE, &rt) == -1) {
        perror("setrlimit");
        exit(1);
    } else {
        printf("set the system resource success!\n");
    }

    /* Create the listening socket. */
    if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket create success!\n");
    }

    if (setnonblocking(listener) == -1) {
        perror("fcntl");
        exit(1);
    }

    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons((uint16_t)myport);
    my_addr.sin_addr.s_addr = inet_addr(argv[4]);
    if (my_addr.sin_addr.s_addr == INADDR_NONE) {
        fprintf(stderr, "invalid IP address '%s'\n", argv[4]);
        exit(1);
    }

    if (bind(listener, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP address and port bing success!\n");
    }

    if (listen(listener, lisnum) == -1) {
        perror("listen");
        exit(1);
    } else {
        printf("start to work!\n");
    }

    /* Create the acceptor's epoll instance and register the listener. */
    kdpfd = epoll_create(MAXEPOLLSIZE);
    if (kdpfd == -1) {
        perror("epoll_create");
        exit(1);
    }
    memset(&ev, 0, sizeof ev);
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d\n", listener);
        return -1;
    } else {
        printf("listen socket add to epoll success\n");
    }
    curfds = 1;

    while (1) {
        /* Wait for activity on the listener. */
        nfds = epoll_wait(kdpfd, events, MAX_EVENTS_PER_WAIT, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }

        /* Handle every reported event. */
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd != listener) {
                printf("other event \n");
                continue;
            }

            /* accept() overwrites len with the size it actually stored, so
             * it has to be reset before every call. */
            len = sizeof(their_addr);
            new_fd = accept(listener, (struct sockaddr *)&their_addr, &len);
            if (new_fd < 0) {
                /* The listener is non-blocking: EAGAIN only means the
                 * pending connection is already gone. */
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("accept");
                }
                continue;
            }

            if (setnonblocking(new_fd) == -1) {
                /* A blocking socket would stall its worker inside recv(). */
                perror("fcntl");
                close(new_fd);
                continue;
            }

            /* Count before registering: once the socket is in a worker's
             * epoll set, that worker may close it and decrement at any time. */
            adjust_connect_count(1);

            memset(&ev, 0, sizeof ev);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = new_fd;
            fdind = fd_index;
            if (epoll_ctl(fdpool[fdind], EPOLL_CTL_ADD, new_fd, &ev) < 0) {
                fprintf(stderr, "add socket '%d' to epoll fail %s\n", new_fd, strerror(errno));
                close(new_fd);
                close(listener);
                return -1;
            }
            /* Wrap explicitly so the index can never overflow. */
            fd_index = (fd_index + 1) % THREAD_NUMBER;
        }
    }

    close(listener);
    return 0;
}