1. 程式人生 > >執行緒池和程序池

執行緒池和程序池

動態建立子程序(函式執行緒)實現併發伺服器的缺點

在前面的文章中我們是通過動態建立子程序(函式執行緒)來實現併發伺服器的,這樣做的缺點如下:

  1. 動態建立程序(或執行緒)是比較耗費時間的,這樣導致較慢的客戶響應
  2. 動態建立的子程序(子執行緒)通常只用來為一個客戶服務,這將導致系統上產生大量的細微程序(或執行緒)。程序間的切換將消耗大量的CPU時間
  3. 動態建立的子程序是當前程序的完整映像,當前程序必須謹慎地管理其分配的檔案描述符和堆記憶體等系統資源,否則子程序可能複製這些資源,從而使系統的可用資源急劇下降,進而影響伺服器的效能

為了解決這些問題,我們使用了程序池和執行緒池這兩種技術。

程序池和執行緒池概述

  • 設計思想:程序池是由伺服器預先建立的一組子程序,在伺服器程式啟動時就建立,當有客戶端連線時,就從池中分配程序或者執行緒為客戶端進行服務。這些子程序的典型數目在3~10個之間。httpd守護程序就是使用包含8個子程序的程序池來實現併發的。執行緒池中的執行緒數量應該和CPU數量差不多

httpd的程序池:

我們可以清楚地看到,當我們開啟httpd服務後,檢視程序時,有8個父程序都是9863的子程序,這就是httpd建立的程序池。

  • 優點:
  1. 程序池中的所有子程序都執行著相同的程式碼,並具有相同的屬性,比如優先順序,PGID等
    。因為程序池在伺服器啟動之初就建立好了,所以每個子程序都相對乾淨,即他們沒有開啟不必要的檔案描述符(從父程序繼承而來),也不會錯誤地使用大塊的堆記憶體。
  2. 當有新的任務到來時,主程序通過某種程序間通訊的方式選擇程序池中的某個子程序來為之服務。相比於動態建立,選擇一個已有的子程序(函式執行緒)的代價顯然要小得多

執行緒池的實現難點

  1. 主執行緒需要將檔案描述符,傳遞給函式執行緒
  2. 函式執行緒啟動後必須阻塞在獲取檔案描述符之前
  3. 訊號量來控制主執行緒向函式執行緒通知獲取檔案描述符事件
  4. 主執行緒在陣列中插入資料,以及函式執行緒獲取陣列中的資料必須是一個互斥的過程

程序池的實現難點

  1. 父程序需要將檔案描述符傳遞給子程序
  2. 由於檔案描述符是在fork之後建立,因此父子程序無法共享
  3. 程序池中的程序必須阻塞在獲取到客戶端的檔案描述符之前
  4. 傳遞檔案描述符時,不能僅僅傳遞一個整型值,而是檔案描述符

程序池原始碼

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<assert.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

static const int CONTROL_LEN = CMSG_LEN(sizeof(int));

/*檔案描述符傳送函式*/
void send_fd(int fd, int fd_to_send)
{
	struct iovec iov[1];
	struct msghdr msg;
	char buf[0];

	iov[0].iov_base = buf;
	iov[0].iov_len = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;

	struct cmsghdr cm;
	cm.cmsg_len = CONTROL_LEN;
	cm.cmsg_level = SOL_SOCKET;
	cm.cmsg_type = SCM_RIGHTS;
	*(int *)CMSG_DATA(&cm) = fd_to_send;
	msg.msg_control = &cm;
	msg.msg_controllen = CONTROL_LEN;

	sendmsg(fd, &msg, 0);
}


/*檔案描述符接收函式*/
int recv_fd(int fd)
{
	struct iovec iov[1];
	struct msghdr msg;
	char buf[0];

	iov[0].iov_base = buf;
	iov[0].iov_len = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;

	struct cmsghdr cm;
	msg.msg_control = &cm;
	msg.msg_controllen = CONTROL_LEN;

	recvmsg(fd, &msg, 0);

	int fd_to_read = *(int*)CMSG_DATA(&cm);

	return fd_to_read;
}

int main()
{
	int pipefd[2];
	int res = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd);//建立管道為傳送檔案描述符做準備
	int sockfd = socket(PF_INET, SOCK_STREAM, 0);
	assert(sockfd != -1);

	struct sockaddr_in ser, cli;
	ser.sin_family = AF_INET;
	ser.sin_port = htons(6500);
	ser.sin_addr.s_addr = inet_addr("127.0.0.1");

	res = bind(sockfd, (struct sockaddr*)&ser, sizeof(ser));
	assert(res != -1);

	listen(sockfd, 5);
	
	int ret;
	int i = 0;
/*接收連線前先建立程序池*/
	for(; i < 3; i++)
	{
		ret = fork();
		if(ret == 0)
		{
			break;
		}
	}
	
	if(ret != 0)
	{
		close(pipefd[0]);//父程序關閉讀通道
		while(1)
		{
			socklen_t len = sizeof(cli);
			int c = accept(sockfd, (struct sockaddr*)&cli, &len);
			if(c == -1)
			{
				printf("accept error\n");
				continue;
			}
			send_fd(pipefd[1], c);//將接收到的檔案描述符寫入管道,供子程序讀取
			close(c);
		}
	}
	if(ret == 0)
	{
		while(1)
		{
			close(pipefd[1]);//子程序關閉寫通道
			int c = recv_fd(pipefd[0]);//讀取管道中的檔案描述符
			while(1)
			{
				char recvbuff[128] = {0};
				res = recv(c, recvbuff, 127, 0);
				if(res <= 0)
				{
					printf("%ddisclient\n", c);
					close(c);
					break;
				}
				printf("%d: %s\n", c, recvbuff);
				send(c, "OK", 2, 0);
			}
		}
	}
}

執行緒池原始碼

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<assert.h>
#include<pthread.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<semaphore.h>

pthread_mutex_t mutex;
int clilink[8];//已經被接收但還未被處理的連線請求
sem_t sem;
/*初始化連線陣列*/
void InitCliLink()
{
	int i = 0;
	for(; i < 8; i++)
	{
		clilink[i] = -1;
	}
}

/*插入新接收的連線,等待處理*/
int Insert(int c)
{
	pthread_mutex_lock(&mutex);//插入的同時時不能獲取,用鎖控制
	int i = 0;
	for(; i < 8; i++)
	{
		if(clilink[i] == -1)
		{
			clilink[i] = c;
			break;
		}
	}
	pthread_mutex_unlock(&mutex);
	if(i >= 8)
		return -1;
	return 0;
}

/*獲取連線,進行處理*/
int GetCli()
{
	pthread_mutex_lock(&mutex);//獲取時不能插入
	int i = 0;
	int c = clilink[0];
	for(; i < 7; i++)
	{
		clilink[i] = clilink[i+1];
	}
	pthread_mutex_unlock(&mutex);
	return c;
}

void *pthread_fun(void *arg);

int main()
{
	int sockfd = socket(PF_INET, SOCK_STREAM, 0);
	assert(sockfd != -1);

	struct sockaddr_in ser, cli;
	memset(&ser, 0, sizeof(ser));
	ser.sin_family = AF_INET;
	ser.sin_port = htons(6500);
	ser.sin_addr.s_addr = inet_addr("127.0.0.1");

	int res = bind(sockfd, (struct sockaddr*)&ser, sizeof(ser));
	assert(res != -1);

	listen(sockfd, 5);


/*建立執行緒池*/
	int i = 0;
	for(; i < 3; i++)
	{
		pthread_t id;
		res = pthread_create(&id, NULL, pthread_fun, NULL);
	}
	
	InitCliLink();
	sem_init(&sem, 0, 0);//初始化訊號量值為0,使得函式執行緒阻塞

	while(1)
	{
		int len = sizeof(cli);
		int c = accept(sockfd, (struct sockaddr*)&cli, &len);
		if(c < 0)
		{
			continue;
		}
		
		if(Insert(c) == -1)
		{
			close(c);
			continue;
		}
		sem_post(&sem);//訊號量+1,函式執行緒可以開始獲取陣列中等待的連線進行處理
	}
}

/*執行緒函式*/
void *pthread_fun(void *arg)
{
	while(1)
	{
		sem_wait(&sem);
		int c = GetCli();
			while(1)
			{
				char buff[128] = {0};
				int n = recv(c, buff, 127, 0);
				if(n <= 0)
				{
					close(c);
					break;
				}
				printf("%d: %s\n",c, buff);
				send(c, "ok", 2, 0);
			}
	}
}