1. 程式人生 > >通用執行緒池的設計和實現[C語言]

通用執行緒池的設計和實現[C語言]


1 適用場景

    首先,必須明確一點,執行緒池不是萬能的,它有其特定的使用場景。使用執行緒池是為了減小執行緒本身的開銷對應用效能所產生的影響,但是其前提是執行緒本身建立、銷燬的開銷和執行緒執行任務的開銷相比是不可忽略的。如果執行緒本身建立、銷燬的開銷對應用程式的效能可以忽略不計,那麼使用/不使用執行緒池對程式的效能並不會有太大的影響。因此,執行緒池通常適合以下幾種場景:

    1)、單位時間內處理的任務頻繁,且任務時間較短

    2)、對實時性要求較高。如果接收到任務之後再建立執行緒,可能無法滿足實時性的要求,此時必須使用執行緒池。

    3)、必須經常面對高突發性事件。比如Web伺服器。如果有足球轉播,則伺服器將產生巨大沖擊,此時使用傳統方法,則必須不停的大量建立、銷燬執行緒。此時採用動態執行緒池可以避免這種情況的發生。

2 程式碼實現

注意事項:因非分離執行緒在異常退出時,作業系統無法及時回收其佔用的記憶體空間,因此,使用分離執行緒。但請注意:不要使用pthread_detach()來使執行緒成為分離執行緒,而應該通過執行緒屬性(pthread_attr_t)的引數來設定執行緒為分離執行緒。否則,當執行緒退出後,再呼叫pthread_kill()來判斷執行緒是否還存在時,很可能出現段錯誤

2.1 標頭檔案

#if !defined(__THREAD_POOL_H__)
#define __THREAD_POOL_H__

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <memory.h>
#include <pthread.h>
#include <sys/types.h>

// 布林型別
typedef int bool;
#define false (0)
#define true  (1)

/* 執行緒任務連結串列 */
typedef struct _thread_worker_t
{
	void *(*process)(void *arg);  /* 執行緒處理的任務 */
	void *arg;                    /* 任務介面引數 */
	struct _thread_worker_t *next;/* 下一個節點 */
}thread_worker_t;

/* 執行緒池物件 */
typedef struct
{
	pthread_mutex_t queue_lock;   /* 佇列互斥鎖 */
	pthread_cond_t queue_ready;   /* 佇列條件鎖 */

	thread_worker_t *head;        /* 任務佇列頭指標 */
	bool isdestroy;               /* 是否已銷燬執行緒 */
	pthread_t *threadid;          /* 執行緒ID陣列 —動態分配空間 */
	int reqnum;                   /* 請求建立的執行緒個數 */
	int num;                      /* 實際建立的執行緒個數 */
	int queue_size;               /* 工作隊列當前大小 */
}thread_pool_t;

/* 函式宣告 */
extern int thread_pool_init(thread_pool_t **pool, int num);
extern int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg);
extern int thread_pool_keepalive(thread_pool_t *pool);
extern int thread_pool_destroy(thread_pool_t *pool);

#endif /*__THREAD_POOL_H__*/

2.2 函式實現

/*************************************************************
 **功  能:執行緒池的初始化
 **參  數:
 **    pool:執行緒池物件
 **    num :執行緒池中執行緒個數
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_init(thread_pool_t **pool, int num)
{
	int idx = 0;

    /* 為執行緒池分配空間 */
	*pool = (thread_pool_t*)calloc(1, sizeof(thread_pool_t));
	if(NULL == *pool) {
		return -1;
	}

    /* 初始化執行緒池 */
	pthread_mutex_init(&((*pool)->queue_lock), NULL);
	pthread_cond_init(&((*pool)->queue_ready), NULL);
	(*pool)->head = NULL;
	(*pool)->reqnum = num;
	(*pool)->queue_size = 0;
	(*pool)->isdestroy = false;
	(*pool)->threadid = (pthread_t*)calloc(1, num*sizeof(pthread_t));
	if(NULL == (*pool)->threadid) {
		free(*pool);
		(*pool) = NULL;

		return -1;
	}

        /* 依次建立執行緒 */
	for(idx=0; idx<num; idx++) {
		ret = thread_create_detach(*pool, idx);
		if(0 != ret) {
			return -1;
		}
		(*pool)->num++;
	}

	return 0;
}
/*************************************************************
 **功  能:將任務加入執行緒池處理佇列
 **參  數:
 **    pool:執行緒池物件
 **    process:需處理的任務
 **    arg: process函式的引數
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg)
{
	thread_worker_t *worker=NULL, *member=NULL;
	
	worker = (thread_worker_t*)calloc(1, sizeof(thread_worker_t));
	if(NULL == worker) {
		return -1;
	}

	worker->process = process;
	worker->arg = arg;
	worker->next = NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	member = pool->head;
	if(NULL != member) {
		while(NULL != member->next) member = member->next;
		member->next = worker;
	}
	else {
pool->head = worker;}pool->queue_size++;pthread_mutex_unlock(&(pool->queue_lock));pthread_cond_signal(&(pool->queue_ready));return 0;}
/******************************************************************************
 **函式名稱: thread_pool_keepalive
 **功    能: 執行緒保活
 **輸入引數: 
 **       pool: 執行緒池
 **輸出引數: NONE
 **返    回: 0: success !0: failed
 **實現過程:
 **      1. 判斷執行緒是否存在
 **      2. 不存在,說明執行緒死亡,需重新建立
 ******************************************************************************/
int thread_pool_keepalive(thread_pool_t *pool)
{
	int idx=0, ret=0;

 	for(idx=0; idx<pool->num; idx++) {
		ret = pthread_kill(pool->thread[idx], 0);
		if(ESRCH == ret) {
			ret = thread_create_detach(pool, idx);
			if(ret < 0) {
				return -1;
			}
		}
	}

	return 0;
}
/*************************************************************
 **功  能:執行緒池的銷燬
 **參  數:
 **    pool:執行緒池物件
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_destroy(thread_pool_t *pool)
{
	int idx = 0;
	thread_worker_t *member = NULL;

	if(false != pool->isdestroy) {
		return -1;
	}

	pool->isdestroy = true;

	pthread_cond_broadcast(&(pool->queue_ready));
	for(idx=0; idx<pool->num; idx++) {
		ret = pthread_kill(pool->threadid[idx], 0);
		if(ESRCH == ret) {
			continue;
		}
		else {
			idx--;
			sleep(1);
		}
	}

	free(pool->threadid);
	pool->threadid = NULL;

	while(NULL != pool->head) {
		member = pool->head;
		pool->head = member->next;
		free(member);
	}

	pthread_mutex_destroy(&(pool->queue_lock));
	pthread_cond_destroy(&(pool->queue_ready));
	free(pool);
	
	return 0;
}
/******************************************************************************
 **函式名稱: thread_create_detach
 **功    能: 建立分離執行緒
 **輸入引數: 
 **       pool: 執行緒池
 **       idx: 執行緒索引號
 **輸出引數: NONE
 **返    回: 0: success !0: failed
 ******************************************************************************/
static int thread_create_detach(thread_pool_t *pool, int idx)
{
	int ret = 0;
	pthread_attr_t attr;

	do {
		ret = pthread_attr_init(&attr);
		if(0 != ret) {
			return -1;
		}
		
		ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
		if(0 != ret) {
			return -1;
		}
		
		ret = pthread_create(&((*pool)->threadid[idx]), &attr, thread_routine, *pool);
		if(0 != ret) {
			pthread_attr_destroy(&attr);
			
			if(EINTR == errno) {
				continue;
			}
			return -1;
		}
		pthread_attr_destroy(&attr);
	}while(0);

	return 0;
}
/*************************************************************
 **功  能:執行緒池各個執行緒入口函式
 **參  數:
 **    arg:執行緒池物件
 **返回值:0:成功 !0: 失敗
 *************************************************************/
static void *thread_routine(void *arg)
{
	thread_worker_t *worker = NULL;
	thread_pool_t *pool = (thread_pool_t*)arg;

	while(1) {
		pthread_mutex_lock(&(pool->queue_lock));
		while((false == pool->isdestroy) && (0 == pool->queue_size)) {
			pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
		}

		if(false != pool->isdestroy) {
			pthread_mutex_unlock(&(pool->queue_lock));
			pthread_exit(NULL);
		}

		pool->queue_size--;
		worker = pool->head;
		pool->head = worker->next;
		pthread_mutex_unlock(&(pool->queue_lock);
		/* 執行佇列中的任務 */
		(*(worker->process))(worker->arg);

		free(worker);
		worker = NULL;
	}
}

3 函式呼叫

#define THREAD_MAX_NUM (32)
#define SLEEP	(10)

int myprocess(void *arg)
{
	fprintf(stdout, "[%s][%d] threadid:%d arg:%d", __FILE__, __LINE__, pthread_self(), *(int*)arg);
	return 0;
}

int main(void)
{
	int ret=0, idx=0;
	thread_pool_t *pool = NULL;
	int array[THREAD_MAX_NUM] = {0};

	ret = thread_pool_init(&pool, THREAD_MAX_NUM);
	if(ret < 0) {
		return -1;
	}

	for(idx=0; idx<THREAD_MAX_NUM; idx++) {
		array[idx] = idx;
		thread_pool_add_worker(pool, myprocess, &array[idx]); /* 注意:地址各不相同 */
	}

	thread_pool_destroy(pool);
	pool = NULL;
	return 0;
}