執行緒池基礎
為什麼要有執行緒池
沒有執行緒池會出現的問題。
大多數的網路伺服器,包括Web伺服器都具有一個特點,就是單位時間內必須處理數目巨大的連線請求,但是處理時間卻是比較短的。在傳統的多執行緒伺服器模型中是這樣實現的:一旦有個請求到達,就建立一個新的執行緒,由該執行緒執行任務,任務執行完畢之後,執行緒就退出。這就是”即時建立,即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數非常頻繁,那麼伺服器就將處於一個不停的建立執行緒和銷燬執行緒的狀態。這筆開銷是不可忽略的,尤其是執行緒執行的時間非常非常短的情況。
執行緒池的思路
執行緒池就是為了解決上述問題的,它的實現原理是這樣的:在應用程式啟動之後,就馬上建立一定數量的執行緒,放入空閒的佇列中。這些執行緒都是處於阻塞狀態,這些執行緒只佔一點記憶體,不佔用CPU。當任務到來後,執行緒池將選擇一個空閒的執行緒,將任務傳入此執行緒中執行。當所有的執行緒都處在處理任務的時候,執行緒池將自動建立一定的數量的新執行緒,用於處理更多的任務。執行任務完成之後執行緒並不退出,而是繼續線上程池中等待下一次任務。當大部分執行緒處於阻塞狀態時,執行緒池將自動銷燬一部分的執行緒,回收系統資源。
執行緒池的設計思路
1、現實生活中的例子
以銀行服務大廳為例。可以把銀行的服務大廳看作一個執行緒池。 在大廳椅子上等待視窗有空位的儲戶們視為(任務佇列), 櫃檯窗口裡的櫃員視為執行任務的執行緒(執行佇列)。銀行服務大廳還有一個非常重要的組成就是提示燈牌,這個燈牌會協調櫃員和儲戶的配合,保證每個儲戶只會同時被一個櫃員服務,保證只要有一個櫃員空閒且有儲戶正在等待的時候這個櫃員一定會服務這個儲戶。銀行服務廳的燈牌就相當於執行緒池中協調執行緒和任務的池管理元件(互斥鎖,條件變數,訊號量)等等。
2、組成執行緒池的幾部分
1、 就以上例子我們知道,要實現一個執行緒池要在程式開始時先建立一批執行緒,這一批建立好了的執行緒要然他們在沒有任務可以執行的時候阻塞住,有一個任務需要執行的時候其中一個執行緒能夠獲取到任務,其他執行緒繼續阻塞。獲取到任務的執行緒在將任務函式執行完畢之後重新阻塞住等待下一個任務,直到執行緒池要被銷燬的時候建立的所有執行緒被銷燬。
2、 每當要執行一個任務的時候,該任務被加入到任務佇列中去,同時通知阻塞等待任務的某個執行緒有任務要執行了,這時候收到有任務需要執行條件的執行緒從任務佇列中拿走一個任務去執行。這裡實際上是生產者消費者模型,生產者是將任務放進任務佇列的執行緒,消費者是執行任務的執行緒,任務佇列是交易場所。要維護好這個生產者消費者模型就必須用到互斥鎖,條件變數,
訊號量等等。這些就是執行緒池的管理元件。
C語言中實現執行緒池的資料結構設計:
1. 任務佇列(要執行的任務函式的函式指標,函式的引數)
2. 執行佇列(執行緒id, 執行緒終止標識, 池管理元件)
3. 池管理元件(互斥鎖,條件變數, 任務佇列, 執行佇列)
執行緒池的程式碼實現:C版本
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//在佇列中增加一個結點的巨集函式 (頭插)
#define LL_ADD(item, list) do { \
item->prev = NULL; \
item->next = list; \
list = item; \
} while(0)
//在佇列中刪除一個節點的巨集函式
#define LL_REMOVE(item, list) do { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
typedef struct NWORKER {//這裡是執行佇列,佇列中的結點有執行任務的執行緒id,表示執行緒是否執行完畢任務的標誌terminate
//指向池管理元件的指標,指向任務佇列中前後結點的指標。
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
typedef struct NJOB {//這裡是任務佇列,佇列中每個結點有執行任務的函式指標,執行任務需要的資料
//還有指向任務佇列前後結點的指標。
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
typedef struct NWORKQUEUE {//池管理元件,有一個指向任務佇列的指標和一個指向執行佇列的指標
struct NWORKER *workers;//還有管理執行緒池需要的互斥量和條件變數。
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr) {//建立執行緒的執行緒入口函式
nWorker *worker = (nWorker*)ptr;/輸入的引數是執行佇列的結點
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);//加鎖
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;//執行緒任務結束 不等待條件變數並釋放鎖
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}//條件變數控制當有任務時 獲取到鎖 如果該執行緒沒有從任務佇列取到任務則阻塞在這裡。
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);//解鎖
break;//如果執行緒需要結束了就跳出while迴圈 然後釋放
}
nJob *job = worker->workqueue->waiting_jobs;w出任務佇列中取出一個任務
if (job != NULL) {//如果任務佇列中有任務
LL_REMOVE(job, worker->workqueue->waiting_jobs);//將任務佇列中被取出放入執行佇列的任務清除
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);//呼叫真正執行任務的函式。執行完畢後繼續while迴圈等待新的任務
}
free(worker);
pthread_exit(NULL);
}
//建立一個執行緒池 //池管理元件的指標 //初始化執行緒池中執行緒的數量
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, ezeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;//初始化池管理元件中的條件變數
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;//初始化池管理元件中的鎖
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++) {//迴圈建立執行緒
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));//建立一個執行佇列的結點
if (worker == NULL) {
perror("malloc");
return 1;i
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;//執行佇列結點指向池管理元件
//建立一個以執行佇列結點中的thread為執行緒id,執行佇列結點的指標為執行緒入口函式的引數
int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);//建立執行緒失敗的話,釋放執行佇列的結點
return 1;
}
LL_ADD(worker, worker->workqueue->workers);//將執行任務的結點加入執行佇列
}
return 0;
}
//關閉執行緒池
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;
e for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
wiorker->terminate = 1;//將每個執行緒的狀態設定為已退出。
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);//當有新的任務的時候 ,新增到任務佇列中去
w
pthread_cond_signal(&workqueue->jobs_cond);x//告訴池控制元件已經有新的任務了
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/************************** debug thread pool **************************/
#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000
void king_counter(nJob *job) {//執行緒池中執行緒執行的任務
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lu\n", index, pthread_self());//打印出執行緒處理函式的引數,並且打印出執行緒id
free(job->user_data);
free(job);
}
int main(int argc, char *argv[]) {
nThreadPool pool;
ntyThreadPoolCreate(&pool, KING_MAX_THREAD);//建立執行緒池
int i = 0;
for (i = 0;i < KING_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = king_counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;//構造任務佇列的任務
ntyThreadPoolQueue(&pool, job);//向任務佇列中插入一個任務並通過條件變數告訴執行佇列有任務到來了
}
getchar();
printf("\n");
}
C++語言中執行緒池資料結構的設計:
執行緒池被封裝為一個類,類中的成員有vector 或 list型別的任務佇列,創建出來的所以執行緒的id,所有執行緒的退出標誌。池管理控制元件(訊號量,互斥量,條件變數等)。執行緒入口函式的函式指標等。
執行緒池的實現 C++:
執行緒池控制元件檔案: 封裝互斥鎖與訊號量
locker.h
#ifndef __LOCKER_H__
#define __LOCKER_H__
#include<exception>
#include<pthread.h>
#include<semaphore.h>
class sem//訊號量的類
{
public:
sem();//建立訊號量
~sem();//刪除訊號量
bool wait();//訊號量P操作 減一
bool post();//訊號量V操作 加一
private:
sem_t m_sem;
};
class locker
{
public:
locker();//建立鎖
~locker();//釋放鎖
bool lock();//加鎖
bool unlock();//解鎖
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond();//建立條件變數
~cond();//釋放條件變數
bool wait();//等待條件變數
bool signal();//通知條件變數就緒
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
locker.cpp
#include"locker.h"
sem::sem()
{
if(sem_init(&m_sem,0,0)!=0)
{
throw std::exception();
}
}
sem::~sem()
{
sem_destroy(&m_sem);
}
bool sem::wait()
{
return sem_wait(&m_sem)==0;
}
bool sem::post()
{
return sem_post(&m_sem)==0;
}
locker::locker()
{
if(pthread_mutex_init(&m_mutex,NULL)!=0)
{
throw std::exception();
}
}
locker::~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool locker::lock()
{
return pthread_mutex_lock(&m_mutex);
}
bool locker::unlock()
{
return pthread_mutex_unlock(&m_mutex);
}
cond::cond()
{
if(pthread_mutex_init(&m_mutex,NULL)!=0)
{
throw std::exception();
}
if(pthread_cond_init(&m_cond,NULL)!=0)
{
throw std::exception();
}
}
cond::~cond()
{
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
bool cond::wait()
{
int ret=0;
pthread_mutex_lock(&m_mutex);
ret=pthread_cond_wait(&m_cond,&m_mutex); //等待條件滿足在執行下一步
pthread_mutex_unlock(&m_mutex);
return ret==0;
}
bool cond::signal()
{
return pthread_cond_signal(&m_cond)==0;
}
執行緒池:
#include<iostream>
#include<pthread.h>
#include<cstdio>
#include<list>
#include<exception>
#include"locker.h"
//為了提高複用性,我們用模板實現執行緒池,模板型別T是任務類
template<typename T>
class threadpool
{
public:
threadpool(int thread_number=8,int max_requests=1000);
~threadpool();
bool append(T* request); //向請求佇列中新增請求
private:
static void* worker(void *arg); //執行緒執行函式
void run(); //實際執行執行緒任務的函式
private:
int m_thread_number; //執行緒的數量
int m_max_requests; //請求佇列的最大容量
pthread_t *m_threads; //指向管理執行緒tid的陣列
std::list<T*> m_workqueue; //請求佇列
locker m_queuelocker; //保護請求佇列的互斥鎖
sem m_queuestat; //請求佇列中是否有任務要處理
bool m_stop; //是否結束執行緒
};
template<typename T>
threadpool<T>::threadpool(int thread_number,int max_requests)//創造一個執行緒池
:m_thread_number(thread_number)
,m_max_requests(max_requests)
,m_stop(false)
,m_threads(NULL)
{
if((thread_number<=0)||(max_requests<=0))
{
throw std::exception();
}
m_threads=new pthread_t[thread_number]; //定義執行緒id陣列
if(!m_threads)
{
throw std::exception();
}
for(int i=0;i<thread_number;i++) //建立thread_number個執行緒,並且將其設定為分離狀態
{
if(pthread_create(m_threads+i,NULL,worker,(void*)this)!=0)
{//因為threadpool類中有請求佇列,池的控制元件(鎖,訊號量等),還有執行佇列(執行緒tid陣列)
//所以這裡作為建立執行緒時執行緒入口函式的引數
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i]))//執行緒建立成功後自動分離
{
delete [] m_threads;
throw std::exception();
}
}
}
template<typename T>
threadpool<T>::~threadpool()
{
delete [] m_threads;
m_stop=true;
}
template<typename T>//該函式是實際使用執行緒池的時候呼叫的函式
bool threadpool<T>::append(T* request) //向請求佇列中新增請求任務
{
m_queuelocker.lock();//請求佇列是臨界資源所以必須要加鎖
if(m_workqueue.size()>m_max_requests) //確保請求佇列中沒有被任務堆積滿
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); //每新增一個任務,訊號量增加 通知
//這裡相當於條件變數singal() 通知執行佇列有任務可以取到
return true;
}
template<typename T>
void* threadpool<T>::worker(void *arg)
{
threadpool *pool=(threadpool*)arg;
pool->run(); //呼叫run函式處理請求佇列中的請求任務
return pool;
}
template<typename T>
void threadpool<T>::run() //處理請求佇列中的請求任務
{
while(!m_stop)
{
m_queuestat.wait();//等待請求佇列中有任務供執行緒去執行 沒有任務的話該執行緒阻塞在這裡。
m_queuelocker.lock();
if(m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T* request=m_workqueue.front();
m_workqueue.pop_front();//從請求佇列中取出一個任務。
m_queuelocker.unlock();
request->process(); //process是任務類裡面的一個方法 這是執行緒真正在執行的任務。
}
}