1. 程式人生 > >線程池代碼(通用版)

線程池代碼(通用版)

line cor 使用場景 鏈表 http sde 依次 線程 HR

一、適用場景

首先,必須明確一點,線程池不是萬能的,它有其特定的使用場景。使用線程池是為了減小線程本身的開銷對應用性能所產生的影響,但是其 前提是線程本身創建、銷毀的開銷和線程執行任務的開銷相比是不可忽略的 。如果線程本身創建、銷毀的開銷對應用程序的性能可以忽略不計,那麽使用/不使用線程池對程序的性能並不會有太大的影響。

線程池通常適合以下幾種場景:

①、單位時間內處理的任務頻繁,且任務時間較短

②、對實時性要求較高。如果接收到任務之後再創建線程,可能無法滿足實時性的要求,此時必須使用線程池。

③、必須經常面對高突發性事件。比如Web服務器。如果有足球轉播,則服務器將產生巨大沖擊,此時使用傳統方法,則必須不停的大量創建、銷毀線程。此時采用動態線程池可以避免這種情況的發生。

二、代碼實現

2.1 頭文件

#if !defined(__THREAD_POOL_H__)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <memory.h>
#include <pthread.h>
#include <sys/types.h>

// 布爾類型
typedef int bool;
#define false (0)
#define true  (1)

/* 線程任務鏈表 */
typedef struct _thread_worker_t
{
	void *(*process)(void *arg);  /* 線程處理的任務 */
	void *arg;                    /* 任務接口參數 */
	struct _thread_worker_t *next;/* 下一個節點 */
}thread_worker_t;

/* 線程池對象 */
typedef struct
{
	pthread_mutex_t queue_lock;   /* 隊列互斥鎖 */
	pthread_cond_t queue_ready;   /* 隊列條件鎖 */

	thread_worker_t *head;        /* 任務隊列頭指針 */
	bool isdestroy;               /* 是否已銷毀線程 */
	pthread_t *threadid;          /* 線程ID數組 —動態分配空間 */
	int num;                      /* 線程個數 */
	int queue_size;               /* 工作隊列當前大小 */
}thread_pool_t;

/* 函數聲明 */
extern int thread_pool_init(thread_pool_t **pool, int num);
extern int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg);
extern int thread_pool_destroy(thread_pool_t *pool);

#endif /*__THREAD_POOL_H__*/

  

2.2 函數實現

/*************************************************************
 **功  能:線程池的初始化
 **參  數:
 **    pool:線程池對象
 **    num :線程池中線程個數
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_init(thread_pool_t **pool, int num)
{
	int idx = 0;

        /* 為線程池分配空間 */
	*pool = (thread_pool_t*)calloc(1, sizeof(thread_pool_t));
	if(NULL == *pool)
	{
		return -1;
	}

        /* 初始化線程池 */
	pthread_mutex_init(&((*pool)->queue_lock), NULL);
	pthread_cond_init(&((*pool)->queue_ready), NULL);
	(*pool)->head = NULL;
	(*pool)->num = num;
	(*pool)->queue_size = 0;
	(*pool)->isdestroy = false;
	(*pool)->threadid = (pthread_t*)calloc(1, num*sizeof(pthread_t));
	if(NULL == (*pool)->threadid)
	{
		free(*pool);
		(*pool) = NULL;

		return -1;
	}

        /* 依次創建線程 */
	for(idx=0; idx<num; idx++)
	{
		pthread_create(&((*pool)->threadid[idx]), NULL, thread_routine, *pool);
	}

	return 0;
}

  

/*************************************************************
 **功  能:將任務加入線程池處理隊列
 **參  數:
 **    pool:線程池對象
 **    process:需處理的任務
 **    arg: process函數的參數
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg)
{
	thread_worker_t *worker=NULL, *member=NULL;
	
	worker = (thread_worker_t*)calloc(1, sizeof(thread_worker_t));
	if(NULL == worker)
	{
		return -1;
	}

	worker->process = process;
	worker->arg = arg;
	worker->next = NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	member = pool->head;
	if(NULL != member)
	{
		while(NULL != member->next) member = member->next;
		member->next = worker;
	}
	else
	{
		pool->head = worker;
	}

	pool->queue_size++;

	pthread_mutex_unlock(&(pool->queue_lock));
	pthread_cond_signal(&(pool->queue_ready));

	return 0;
}

  

/*************************************************************
 **功  能:線程池的銷毀
 **參  數:
 **    pool:線程池對象
 **返回值:0:成功 !0: 失敗
 *************************************************************/
int thread_pool_destroy(thread_pool_t *pool)
{
	int idx = 0;
	thread_worker_t *member = NULL;

	if(false != pool->isdestroy)
	{
		return -1;
	}

	pool->isdestroy = true;

	pthread_cond_broadcast(&(pool->queue_ready));
	for(idx=0; idx<pool->num; idx++)
	{
		pthread_join(pool->threadid[idx], NULL);
	}

	free(pool->threadid);
	pool->threadid = NULL;

	while(NULL != pool->head)
	{
		member = pool->head;
		pool->head = member->next;
		free(member);
	}

	pthread_mutex_destroy(&(pool->queue_lock));
	pthread_cond_destroy(&(pool->queue_ready));
	free(pool);
	
	return 0;
}

  

/*************************************************************
 **功  能:線程池各個線程入口函數
 **參  數:
 **    arg:線程池對象
 **返回值:0:成功 !0: 失敗
 *************************************************************/
static void *thread_routine(void *arg)
{
	thread_worker_t *worker = NULL;
	thread_pool_t *pool = (thread_pool_t*)arg;

	while(1)
	{
		pthread_mutex_lock(&(pool->queue_lock));
		while((false == pool->isdestroy) && (0 == pool->queue_size))
		{
			pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
		}

		if(false != pool->isdestroy)
		{
			pthread_mutex_unlock(&(pool->queue_lock));
			pthread_exit(NULL);
		}

		pool->queue_size--;
		worker = pool->head;
		pool->head = worker->next;
		pthread_mutex_unlock(&(pool->queue_lock));

                /* 執行隊列中的任務 */
		(*(worker->process))(worker->arg);

		free(worker);
		worker = NULL;
	}
}

通用版代碼:https://www.cnblogs.com/cthon/p/9097007.html  

難度升級版代碼:https://www.cnblogs.com/cthon/p/9085623.html

線程池代碼(通用版)