1. 程式人生 > >multi-threaded server, pthreads, sleep

multi-threaded server, pthreads, sleep

3 0 multi-threaded server, pthreads, sleep

multi-threaded server, pthreads, sleep


I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle 
requests of multiple clients.

But here I have a problem. I find a solution but it is not how 
it must be... i think. When threads working without sleep(1) 
I can't get response from server but when I put sleep(1) in 
thread function as you will see in code, everything works fine 
and server can make echo nearly for 40000 clients between 
1 or 2 seconds. What am I doing wrong here? 

thanks,
parahat


------------ server.c -------------
For Linux:
$ cc server.c -o server -lpthread 

For FreeBSD:
$ cc server.c -o server -pthread
-----------------------------------

Code:

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
	pthread_t tid;
	int client_count;
	int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}

void *thread_init_func(void *arg)
{
	int tid = (int) arg;
	
	int readsocks;
	int i;
	char buffer[MAX_CLIENT_BUFFER];
	char c;
	int n;
#ifdef DEBUG
	printf("thread %d created\n", tid);
	printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
	memset((int *) &threads[tid].clients, 0, sizeof(threads[tid].clients));
	memset((char *) &buffer, 0, sizeof(buffer));	
	while(1)
	{
#ifdef DEBUG
		printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
		sleep(3);
#endif
		sleep(1); /* <-- it works ??? :-| */

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[tid].clients[i] != 0)
			{
				n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
				if(n == 0)
				{
#ifdef DEBUG
					printf("client %d closed connection 0\n", threads[tid].clients[i]);
#endif
					threads[tid].clients[i] = 0;
					threads[tid].client_count--;
					memset((char *) &buffer, 0, strlen(buffer));
				}
				else if(n < 0)
				{
					if(errno == EAGAIN)
					{
#ifdef DEBUG
						printf("errno: EAGAIN\n");
#endif
					}
					else {
#ifdef DEBUG
						printf("errno: %d\n", errno);
#endif
						threads[tid].clients[i] = 0;
						threads[tid].client_count--;
						memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
						printf("client %d closed connection -1\n", threads[tid].clients[i]);
#endif
					}
				}
				else {
#ifdef DEBUG
					printf("%d bytes received from %d - %s\n", n, threads[tid].clients[i], buffer);
#endif
					
					send(threads[tid].clients[i], buffer, strlen(buffer), 0);
					memset((char *) &buffer, 0, strlen(buffer));
				}
			}
		}
	}
}

int choose_thread()
{
	int i=MAX_THREAD-1;
	int min = 0;
	while(i > -1)
	{
		if(threads[i].client_count < threads[i-1].client_count)
		{
			min = i;
			break;
		}
		i--;
	}		
	return min;
}

int main()
{
	char c;
	struct sockaddr_in srv, cli;
	int clifd;
	int tid;
	int i;
	int choosen;

	signal(SIGPIPE, SIG_IGN);
	
	if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror("sockfd\n");
		exit(1);
	}
	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_addr.s_addr = INADDR_ANY;
	srv.sin_port = htons(PORT);

	if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		perror("bind\n");
		exit(1);
	}
	
	listen(listenfd, 1024);

	
	/* create threads  */
	for(i = 0; i < MAX_THREAD; i++)
	{
		pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i);
		threads[i].client_count = 0;
	}
	

	for( ; ; )
	{
		clifd = accept(listenfd, NULL, NULL);
		nonblock(clifd);

		pthread_mutex_lock(&new_connection_mutex);
		
		choosen = choose_thread();

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[choosen].clients[i] == 0)
			{
#ifdef DEBUG
				printf("before threads clifd\n");
#endif
				threads[choosen].clients[i] = clifd;
#ifdef DEBUG
				printf("after threads clifd\n");
#endif
				threads[choosen].client_count++;
				break;
			}
		}

#ifdef DEBUG
		printf("choosen: %d\n", choosen);

		for(i = 0; i < MAX_THREAD; i++)
		{
			printf("threads[%d].client_count:%d\n", i, threads[i].client_count);
		}
#endif

		pthread_mutex_unlock(&new_connection_mutex);
	}

	if(errno)
	{
		printf("errno: %d", errno);
	}
	
	return 0;
}



===========================================================

--------------- test.c -------------------- 
$ cc test.c -o test
-------------------------------------------

Code:

#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>


#define MAX_CLIENT 1000 
/*#define DEBUG*/

int PORT = 3355;
char *IP = "192.168.222.22";

enum {
	U_INT_SIZE = sizeof(unsigned int)
};

int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}


void cli_con()
{	
	struct sockaddr_in srv;
	struct hostent *h_name;
	int clifd;
	int n;

	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_port = htons(PORT);
	inet_pton(AF_INET, IP, &srv.sin_addr);	
	
	if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		printf("ERROR clifd\n");
		exit(1);
	}

	if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		printf("ERROR connect\n");
		exit(1);
	}

	nonblock(clifd);
	for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
	{
		if(list[n] == 0) {
#ifdef DEBUG
			printf("connected %d\n", clifd);
#endif
			list[n] = clifd;
			clifd = -1;
		}
	}
	
	if(clifd != -1) {
		printf("list FULL");
	}
}

void set_list() {
	int n;
	FD_ZERO(&fdset);

	for(n = 0; n < MAX_CLIENT; n++)
	{
		if(list[n] != 0) {
			FD_SET(list[n], &fdset);
			if(list[n] > highfd)
				highfd = list[n];
		}
	}
}

void recv_data(int num)
{
	int n;
	char buffer[1024];
	n = recv(list[num], buffer, 1023, 0);
	if(n > 0)
	{
#ifdef DEBUG
		printf("%d bytes from %d\n", n, list[num]);
		printf("%s", buffer);
#endif
	}
	else if(n < 0)
	{
		if(errno != EAGAIN)
			printf("client %d disconnected\n", list[num]);
	}
	else if(n == 0)
	{
		printf("client %d disconnected\n", list[num]);
	}
}


void send_data(int num)
{
	int n;
	char buffer[1024];
	sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
	if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
	{
#ifdef DEBUG
		printf("%d bytes sent from %d\n", n, list[num]);
#endif
	}
}

void scan_clients()
{
#ifdef DEBUG
	printf("scan_clients\n");
#endif
	int num;
	
	for(num = 0; num < MAX_CLIENT; num++) {
		if(FD_ISSET(list[num], &fdset))
			recv_data(num);
	}

	for(num = 0; num < MAX_CLIENT; num++)
		send_data(num);
}

int main()
{
	int readsocks;
	int i=0;
	struct timeval tv;
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	
	while(i < MAX_CLIENT)
	{
		cli_con();
		i++;	
	}
	i=0;
	while(i < MAX_CLIENT)
	{
		printf("list[%d] = %d\n",i,list[i]);
		i++;
	}
	i = 0;
	while(1)
	{
		set_list();
		readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
		if(readsocks < 0) {
			perror("select\n");
			exit(1);
		}
		 
		/*printf("else scan_clients\n");*/
		scan_clients();
#ifdef DEBUG
		sleep(4);
#endif
	}
	return 0;
}