1. 程式人生 > >c++執行緒池

c++執行緒池

      本文給出了一個通用的執行緒池框架,該框架將與執行緒執行相關的任務進行了高層次的抽象,使之與具體的執行任務無關。另外該執行緒池具有動態伸縮性,它能根據執行任務的輕重自動調整執行緒池中執行緒的數量。文章的最後,我們給出一個簡單示例程式,通過該示例程式,我們會發現,通過該執行緒池框架執行多執行緒任務是多麼的簡單。

為什麼需要執行緒池
      目前的大多數網路伺服器,包括Web伺服器、Email伺服器以及資料庫伺服器等都具有一個共同點,就是單位時間內必須處理數目巨大的連線請求,但處理時間卻相對較短。
傳統多執行緒方案中我們採用的伺服器模型則是一旦接受到請求之後,即建立一個新的執行緒,由該執行緒執行任務。任務執行完畢後,執行緒退出,這就是是“即時建立,即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷燬執行緒的狀態。

我們將傳統方案中的執行緒執行過程分為三個過程:T1、T2、T3。

T1:執行緒建立時間
T2:執行緒執行時間,包括執行緒的同步等時間
T3:執行緒銷燬時間

那麼我們可以看出,執行緒本身的開銷所佔的比例為(T1+T3) / (T1+T2+T3)。如果執行緒執行的時間很短的話,這比開銷可能佔到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。

      除此之外,執行緒池能夠減少建立的執行緒個數。通常執行緒池所允許的併發執行緒是有上界的,如果同時需要併發的執行緒數超過上界,那麼一部分執行緒將會等待。而傳統方案中,如果同時請求數目為2000,那麼最壞情況下,系統可能需要產生2000個執行緒。儘管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。

      因此執行緒池的出現正是著眼於減少執行緒池本身帶來的開銷。執行緒池採用預建立的技術,在應用程式啟動之後,將立即建立一定數量的執行緒(N1),放入空閒佇列中。這些執行緒都是處於阻塞(Suspended)狀態,不消耗CPU,但佔用較小的記憶體空間。當任務到來後,緩衝池選擇一個空閒執行緒,把任務傳入此執行緒中執行。當N1個執行緒都在處理任務後,緩衝池自動建立一定數量的新執行緒,用於處理更多的任務。在任務執行完畢後執行緒也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閒時,大部分執行緒都一直處於暫停狀態,執行緒池自動銷燬一部分執行緒,回收系統資源。
      基於這種預建立技術,執行緒池將執行緒建立和銷燬本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的執行緒本身開銷則越小,不過我們另外可能需要考慮進去執行緒之間同步所帶來的開銷。

構建執行緒池框架

一般執行緒池都必須具備下面幾個組成部分:
執行緒池管理器:用於建立並管理執行緒池
工作執行緒: 執行緒池中實際執行的執行緒
任務介面: 儘管執行緒池大多數情況下是用來支援網路伺服器,但是我們將執行緒執行的任務抽象出來,形成任務介面,從而是的執行緒池與具體的任務無關。
任務佇列:執行緒池的概念具體到實現則可能是佇列,連結串列之類的資料結構,其中儲存執行執行緒。

我們實現的通用執行緒池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括執行緒同步使用的類CThreadMutex和CCondition。
CJob是所有的任務的基類,其提供一個介面Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
CThread是Linux中執行緒的包裝,其封裝了Linux執行緒最經常使用的屬性和方法,它也是一個抽象類,是所有執行緒類的基類,具有一個介面Run。
CWorkerThread是實際被排程和執行的執行緒類,其從CThread繼承而來,實現了CThread中的Run方法。
CThreadPool是執行緒池類,其負責儲存執行緒,釋放執行緒以及排程執行緒。
CThreadManage是執行緒池與使用者的直接介面,其遮蔽了內部的具體實現。
CThreadMutex用於執行緒之間的互斥。
CCondition則是條件變數的封裝,用於執行緒之間的同步。

執行緒池的時序很簡單。CThreadManage直接跟客戶端打交道,其接受需要建立的執行緒初始個數,並接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上呼叫的都是CThreadPool的相關操作。CThreadPool建立具體的執行緒,並把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。

理解系統元件

下面我們分開來了解系統中的各個元件。
CThreadManage
CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:

複製程式碼
 1 class CThreadManage 
 2 { 
 3 private: 
 4     CThreadPool*    m_Pool; 
 5     int          m_NumOfThread; 
 6 protected: 
 7 public: 
 8     void     SetParallelNum(int num); 
 9     CThreadManage(); 
10     CThreadManage(int num); 
11     virtual ~CThreadManage(); 
12 
13     void    Run(CJob* job,void* jobdata); 
14     void    TerminateAll(void); 
15 }; 
複製程式碼

其中m_Pool指向實際的執行緒池;m_NumOfThread是初始建立時候允許建立的併發的執行緒個數。另外Run和TerminateAll方法也非常簡單,只是簡單的呼叫CThreadPool的一些相關方法而已。其具體的實現如下: 

複製程式碼
 1 CThreadManage::CThreadManage(){ 
 2     m_NumOfThread = 10; 
 3     m_Pool = new CThreadPool(m_NumOfThread); 
 4 } 
 5 CThreadManage::CThreadManage(int num){ 
 6     m_NumOfThread = num; 
 7     m_Pool = new CThreadPool(m_NumOfThread); 
 8 } 
 9 CThreadManage::~CThreadManage(){ 
10     if(NULL != m_Pool) 
11     delete m_Pool; 
12 } 
13 void CThreadManage::SetParallelNum(int num){ 
14     m_NumOfThread = num; 
15 } 
16 void CThreadManage::Run(CJob* job,void* jobdata){ 
17     m_Pool->Run(job,jobdata); 
18 } 
19 void CThreadManage::TerminateAll(void){ 
20     m_Pool->TerminateAll(); 
21 } 
複製程式碼

CThread
CThread 類實現了對Linux中執行緒操作的封裝,它是所有執行緒的基類,也是一個抽象類,提供了一個抽象介面Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:

複製程式碼
 1 class CThread 
 2 { 
 3 private: 
 4     int          m_ErrCode; 
 5     Semaphore    m_ThreadSemaphore;  //the inner semaphore, which is used to realize 
 6     unsigned     long m_ThreadID;   
 7     bool         m_Detach;       //The thread is detached 
 8     bool         m_CreateSuspended;  //if suspend after creating 
 9     char*        m_ThreadName; 
10     ThreadState m_ThreadState;      //the state of the thread 
11 protected: 
12     void     SetErrcode(int errcode){m_ErrCode = errcode;} 
13     static void* ThreadFunction(void*); 
14 public: 
15     CThread(); 
16     CThread(bool createsuspended,bool detach); 
17     virtual ~CThread(); 
18     virtual void Run(void) = 0; 
19     void     SetThreadState(ThreadState state){m_ThreadState = state;} 
20 
21     bool     Terminate(void);    //Terminate the threa 
22     bool     Start(void);        //Start to execute the thread 
23     void     Exit(void); 
24     bool     Wakeup(void); 
25    
26     ThreadState  GetThreadState(void){return m_ThreadState;} 
27     int      GetLastError(void){return m_ErrCode;} 
28     void     SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} 
29     char*    GetThreadName(void){return m_ThreadName;} 
30     int      GetThreadID(void){return m_ThreadID;} 
31 
32     bool     SetPriority(int priority); 
33     int      GetPriority(void); 
34     int      GetConcurrency(void); 
35     void     SetConcurrency(int num); 
36     bool     Detach(void); 
37     bool     Join(void); 
38     bool     Yield(void); 
39     int      Self(void); 
40 }; 
複製程式碼

執行緒的狀態可以分為四種,空閒、忙碌、掛起、終止(包括正常退出和非正常退出)。由於目前Linux執行緒庫不支援掛起操作,因此,我們的此處的掛起操作類似於暫停。如果執行緒建立後不想立即執行任務,那麼我們可以將其“暫停”,如果需要執行,則喚醒。有一點必須注意的是,一旦執行緒開始執行任務,將不能被掛起,其將一直執行任務至完畢。
執行緒類的相關操作均十分簡單。執行緒的執行入口是從Start()函式開始,其將呼叫函式ThreadFunction,ThreadFunction再呼叫實際的Run函式,執行實際的任務。

CThreadPool
CThreadPool是執行緒的承載容器,一般可以將其實現為堆疊、單向佇列或者雙向佇列。在我們的系統中我們使用STL Vector對執行緒進行儲存。CThreadPool的實現程式碼如下:

複製程式碼
  1 class CThreadPool 
  2 { 
  3 friend class CWorkerThread; 
  4 private: 
  5     unsigned int m_MaxNum;   //the max thread num that can create at the same time 
  6     unsigned int m_AvailLow; //The min num of idle thread that shoule kept 
  7     unsigned int m_AvailHigh;    //The max num of idle thread that kept at the same time 
  8     unsigned int m_AvailNum; //the normal thread num of idle num; 
  9     unsigned int m_InitNum;  //Normal thread num; 
 10 protected: 
 11     CWorkerThread* GetIdleThread(void); 
 12 
 13     void    AppendToIdleList(CWorkerThread* jobthread); 
 14     void    MoveToBusyList(CWorkerThread* idlethread); 
 15     void    MoveToIdleList(CWorkerThread* busythread); 
 16 
 17     void    DeleteIdleThread(int num); 
 18     void    CreateIdleThread(int num); 
 19 public: 
 20     CThreadMutex m_BusyMutex;    //when visit busy list,use m_BusyMutex to lock and unlock 
 21     CThreadMutex m_IdleMutex;    //when visit idle list,use m_IdleMutex to lock and unlock 
 22     CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock 
 23     CThreadMutex m_VarMutex; 
 24 
 25     CCondition       m_BusyCond; //m_BusyCond is used to sync busy thread list 
 26     CCondition       m_IdleCond; //m_IdleCond is used to sync idle thread list 
 27     CCondition       m_IdleJobCond;  //m_JobCond is used to sync job list 
 28     CCondition       m_MaxNumCond; 
 29 
 30     vector<CWorkerThread*>   m_ThreadList; 
 31     vector<CWorkerThread*>   m_BusyList;     //Thread List 
 32     vector<CWorkerThread*>   m_IdleList; //Idle List 
 33 
 34     CThreadPool(); 
 35     CThreadPool(int initnum); 
 36     virtual ~CThreadPool(); 
 37 
 38     void    SetMaxNum(int maxnum){m_MaxNum = maxnum;} 
 39     int     GetMaxNum(void){return m_MaxNum;} 
 40     void    SetAvailLowNum(int minnum){m_AvailLow = minnum;} 
 41     int     GetAvailLowNum(void){return m_AvailLow;} 
 42     void    SetAvailHighNum(int highnum){m_AvailHigh = highnum;} 
 43     int     GetAvailHighNum(void){return m_AvailHigh;} 
 44     int     GetActualAvailNum(void){return m_AvailNum;} 
 45     int     GetAllNum(void){return m_ThreadList.size();} 
 46     int     GetBusyNum(void){return m_BusyList.size();} 
 47     void    SetInitNum(int initnum){m_InitNum = initnum;} 
 48     int     GetInitNum(void){return m_InitNum;} 
 49   
 50     void    TerminateAll(void); 
 51     void    Run(CJob* job,void* jobdata); 
 52 }; 
 53 CThreadPool::CThreadPool() 
 54 { 
 55     m_MaxNum = 50; 
 56     m_AvailLow = 5; 
 57     m_InitNum=m_AvailNum = 10 ;  
 58     m_AvailHigh = 20; 
 59 
 60     m_BusyList.clear(); 
 61     m_IdleList.clear(); 
 62     for(int i=0;i<m_InitNum;i++){ 
 63     CWorkerThread* thr = new CWorkerThread(); 
 64     thr->SetThreadPool(this); 
 65     AppendToIdleList(thr); 
 66     thr->Start(); 
 67     } 
 68 } 
 69 
 70 CThreadPool::CThreadPool(int initnum) 
 71 { 
 72     assert(initnum>0 && initnum<=30); 
 73     m_MaxNum   = 30; 
 74     m_AvailLow = initnum-10>0?initnum-10:3; 
 75     m_InitNum=m_AvailNum = initnum ;  
 76     m_AvailHigh = initnum+10; 
 77 
 78     m_BusyList.clear(); 
 79     m_IdleList.clear(); 
 80     for(int i=0;i<m_InitNum;i++){ 
 81     CWorkerThread* thr = new CWorkerThread(); 
 82     AppendToIdleList(thr); 
 83     thr->SetThreadPool(this); 
 84     thr->Start();       //begin the thread,the thread wait for job 
 85     } 
 86 } 
 87 
 88 CThreadPool::~CThreadPool() 
 89 { 
 90    TerminateAll(); 
 91 } 
 92 
 93 void CThreadPool::TerminateAll() 
 94 { 
 95     for(int i=0;i < m_ThreadList.size();i++) { 
 96     CWorkerThread* thr = m_ThreadList[i]; 
 97     thr->Join(); 
 98     } 
 99     return; 
100 } 
101 
102 CWorkerThread* CThreadPool::GetIdleThread(void) 
103 { 
104     while(m_IdleList.size() ==0 ) 
105     m_IdleCond.Wait(); 
106    
107     m_IdleMutex.Lock(); 
108     if(m_IdleList.size() > 0 ) 
109     { 
110     CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); 
111     printf("Get Idle thread %dn",thr->GetThreadID()); 
112     m_IdleMutex.Unlock(); 
113     return thr; 
114     } 
115     m_IdleMutex.Unlock(); 
116 
117     return NULL; 
118 } 
119 
120 //add an idle thread to idle list 
121 void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) 
122 { 
123     m_IdleMutex.Lock(); 
124     m_IdleList.push_back(jobthread); 
125     m_ThreadList.push_back(jobthread); 
126     m_IdleMutex.Unlock(); 
127 } 
128 
129 //move and idle thread to busy thread 
130 void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) 
131 { 
132     m_BusyMutex.Lock(); 
133     m_BusyList.push_back(idlethread); 
134     m_AvailNum--; 
135     m_BusyMutex.Unlock(); 
136   
137     m_IdleMutex.Lock(); 
138     vector<CWorkerThread*>::iterator pos; 
139     pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread); 
140     if(pos !=m_IdleList.end()) 
141     m_IdleList.erase(pos); 
142     m_IdleMutex.Unlock(); 
143 } 
144 
145 void CThreadPool::MoveToIdleList(CWorkerThread* busythread) 
146 { 
147     m_IdleMutex.Lock(); 
148     m_IdleList.push_back(busythread); 
149     m_AvailNum++; 
150     m_IdleMutex.Unlock(); 
151 
152     m_BusyMutex.Lock(); 
153     vector<CWorkerThread*>::iterator pos; 
154     pos = find(m_BusyList.begin(),m_BusyList.end(),busythread); 
155     if(pos!=m_BusyList.end()) 
156     m_BusyList.erase(pos); 
157     m_BusyMutex.Unlock(); 
158 
159     m_IdleCond.Signal(); 
160     m_MaxNumCond.Signal(); 
161 } 
162 
163 //create num idle thread and put them to idlelist 
164 void CThreadPool::CreateIdleThread(int num) 
165 { 
166     for(int i=0;i<num;i++){ 
167     CWorkerThread* thr = new CWorkerThread(); 
168     thr->SetThreadPool(this); 
169     AppendToIdleList(thr); 
170     m_VarMutex.Lock(); 
171     m_AvailNum++; 
172     m_VarMutex.Unlock(); 
173     thr->Start();       //begin the thread,the thread wait for job 
174     } 
175 } 
176 
177 void CThreadPool::DeleteIdleThread(int num) 
178 { 
179     printf("Enter into CThreadPool::DeleteIdleThreadn"); 
180     m_IdleMutex.Lock(); 
181     printf("Delete Num is %dn",num); 
182     for(int i=0;i<num;i++){ 
183     CWorkerThread* thr; 
184     if(m_IdleList.size() > 0 ){ 
185             thr = (CWorkerThread*)m_IdleList.front(); 
186             printf("Get Idle thread %dn",thr->GetThreadID()); 
187     } 
188 
189     vector<CWorkerThread*>::iterator pos; 
190     pos = find(m_IdleList.begin(),m_IdleList.end(),thr); 
191     if(pos!=m_IdleList.end()) 
192         m_IdleList.erase(pos); 
193     m_AvailNum--; 
194     printf("The idle thread available num:%d n",m_AvailNum); 
195     printf("The idlelist              num:%d n",m_IdleList.size()); 
196     } 
197     m_IdleMutex.Unlock(); 
198 } 
199 void CThreadPool::Run(CJob* job,void* jobdata) 
200 { 
201     assert(job!=NULL); 
202    
203     //if the busy thread num adds to m_MaxNum,so we should wait 
204     if(GetBusyNum() == m_MaxNum) 
205         m_MaxNumCond.Wait(); 
206 
207     if(m_IdleList.size()<m_AvailLow) 
208     { 
209     if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
210         CreateIdleThread(m_InitNum-m_IdleList.size()); 
211     else 
212         CreateIdleThread(m_MaxNum-GetAllNum()); 
213     } 
214 
215     CWorkerThread*  idlethr = GetIdleThread(); 
216     if(idlethr !=NULL) 
217     { 
218     idlethr->m_WorkMutex.Lock(); 
219     MoveToBusyList(idlethr); 
220     idlethr->SetThreadPool(this); 
221     job->SetWorkThread(idlethr); 
222     printf("Job is set to thread %d n",idlethr->GetThreadID()); 
223     idlethr->SetJob(job,jobdata); 
224     } 
225 } 
複製程式碼

在CThreadPool中存在兩個連結串列,一個是空閒連結串列,一個是忙碌連結串列。Idle連結串列中存放所有的空閒程序,當執行緒執行任務時候,其狀態變為忙碌狀態,同時從空閒連結串列中刪除,並移至忙碌連結串列中。在CThreadPool的建構函式中,我們將執行下面的程式碼: 

複製程式碼
1 for(int i=0;i<m_InitNum;i++) 
2     { 
3     CWorkerThread* thr = new CWorkerThread(); 
4     AppendToIdleList(thr); 
5     thr->SetThreadPool(this); 
6     thr->Start();       //begin the thread,the thread wait for job 
7     } 
複製程式碼

在該程式碼中,我們將建立m_InitNum個執行緒,建立之後即呼叫AppendToIdleList放入Idle連結串列中,由於目前沒有任務分發給這些執行緒,因此執行緒執行Start後將自己掛起。
事實上,執行緒池中容納的執行緒數目並不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變數:
m_InitNum:處世建立時執行緒池中的執行緒的個數。
m_MaxNum:當前執行緒池中所允許併發存在的執行緒的最大數目。
m_AvailLow:當前執行緒池中所允許存在的空閒執行緒的最小數目,如果空閒數目低於該值,表明負載可能過重,此時有必要增加空閒執行緒池的數目。實現中我們總是將執行緒調整為m_InitNum個。
m_AvailHigh:當前執行緒池中所允許的空閒的執行緒的最大數目,如果空閒數目高於該值,表明當前負載可能較輕,此時將刪除多餘的空閒執行緒,刪除後調整數也為m_InitNum個。
m_AvailNum:目前執行緒池中實際存在的執行緒的個數,其值介於m_AvailHigh和m_AvailLow之間。如果執行緒的個數始終維持在m_AvailLow和m_AvailHigh之間,則執行緒既不需要建立,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得執行緒池最大可能的保持平衡態,是執行緒池設計必須考慮的問題。
執行緒池在接受到新的任務之後,執行緒池首先要檢查是否有足夠的空閒池可用。檢查分為三個步驟:
      (1)檢查當前處於忙碌狀態的執行緒是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閒執行緒可用,而且也不能建立新的執行緒,因此必須等待直到有執行緒執行完畢返回到空閒佇列中。
      (2)如果當前的空閒執行緒數目小於我們設定的最小的空閒數目m_AvailLow,則我們必須建立新的執行緒,預設情況下,建立後的執行緒數目應該為m_InitNum,因此建立的執行緒數目應該為( 當前空閒執行緒數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的執行緒總數加上建立後的執行緒數可能超過m_MaxNum,因此我們必須對執行緒的建立區別對待。

1 if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 
2         CreateIdleThread(m_InitNum-m_IdleList.size()); 
3     else 
4         CreateIdleThread(m_MaxNum-GetAllNum());

如果建立後總數不超過m_MaxNum,則建立後的執行緒為m_InitNum;如果超過了,則只建立( m_MaxNum-當前執行緒總數 )個。
      (3)呼叫GetIdleThread方法查詢空閒執行緒。如果當前沒有空閒執行緒,則掛起;否則將任務指派給該執行緒,同時將其移入忙碌佇列。
當執行緒執行完畢後,其會呼叫MoveToIdleList方法移入空閒連結串列中,其中還呼叫m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的執行緒。

CWorkerThread
CWorkerThread是CThread的派生類,是事實上的工作執行緒。在CThreadPool的建構函式中,我們建立了一定數量的CWorkerThread。一旦這些執行緒建立完畢,我們將呼叫Start()啟動該執行緒。Start方法最終會呼叫Run方法。Run方法是個無限迴圈的過程。在沒有接受到實際的任務的時候,m_Job為NULL,此時執行緒將呼叫Wait方法進行等待,從而處於掛起狀態。一旦執行緒池將具體的任務分發給該執行緒,其將被喚醒,從而通知執行緒從掛起的地方繼續執行。CWorkerThread的完整定義如下:

複製程式碼
 1 class CWorkerThread:public CThread 
 2 { 
 3 private: 
 4     CThreadPool*  m_ThreadPool; 
 5     CJob*    m_Job; 
 6     void*    m_JobData; 
 7    
 8     CThreadMutex m_VarMutex; 
 9     bool      m_IsEnd; 
10 protected: 
11 public: 
12     CCondition   m_JobCond; 
13     CThreadMutex m_WorkMutex; 
14     CWorkerThread(); 
15     virtual ~CWorkerThread(); 
16     void Run(); 
17     void    SetJob(CJob* job,void* jobdata); 
18     CJob*   GetJob(void){return m_Job;} 
19     void    SetThreadPool(CThreadPool* thrpool); 
20     CThreadPool* GetThreadPool(void){return m_ThreadPool;} 
21 }; 
22 CWorkerThread::CWorkerThread() 
23 { 
24     m_Job = NULL; 
25     m_JobData = NULL; 
26     m_ThreadPool = NULL; 
27     m_IsEnd = false; 
28 } 
29 CWorkerThread::~CWorkerThread() 
30 { 
31     if(NULL != m_Job) 
32     delete m_Job; 
33     if(m_ThreadPool != NULL) 
34     delete m_ThreadPool; 
35 } 
36 
37 void CWorkerThread::Run() 
38 { 
39     SetThreadState(THREAD_RUNNING); 
40     for(;;) 
41     { 
42     while(m_Job == NULL) 
43         m_JobCond.Wait(); 
44 
45     m_Job->Run(m_JobData); 
46     m_Job->SetWorkThread(NULL); 
47     m_Job = NULL; 
48     m_ThreadPool->MoveToIdleList(this); 
49     if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum()) 
50     { 
51 m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T 
52 hreadPool->GetInitNum()); 
53     } 
54     m_WorkMutex.Unlock(); 
55     } 
56 } 
57 void CWorkerThread::SetJob(CJob* job,void* jobdata) 
58 { 
59     m_VarMutex.Lock(); 
60     m_Job = job; 
61     m_JobData = jobdata; 
62     job->SetWorkThread(this); 
63     m_VarMutex.Unlock(); 
64     m_JobCond.Signal(); 
65 } 
66 void CWorkerThread::SetThreadPool(CThreadPool* thrpool) 
67 { 
68     m_VarMutex.Lock(); 
69     m_ThreadPool = thrpool; 
70     m_VarMutex.Unlock(); 
71 } 
複製程式碼

      當執行緒執行任務之前首先必須判斷空閒執行緒的數目是否低於m_AvailLow,如果低於,則必須建立足夠的空閒執行緒,使其數目達到m_InitNum個,然後將呼叫MoveToBusyList()移出空閒佇列,移入忙碌佇列。當任務執行完畢後,其又呼叫MoveToIdleList()移出忙碌佇列,移入空閒佇列,等待新的任務。
      除了Run方法之外,CWorkerThread中另外一個重要的方法就是SetJob,該方法將實際的任務賦值給執行緒。當沒有任何執行任務即m_Job為NULL的時候,執行緒將呼叫m_JobCond.Wait進行等待。一旦Job被賦值給執行緒,其將呼叫m_JobCond.Signal方法喚醒該執行緒。由於m_JobCond屬於執行緒內部的變數,每個執行緒都維持一個m_JobCond,只有得到任務的執行緒才被喚醒,沒有得到任務的將繼續等待。無論一個執行緒何時被喚醒,其都將從等待的地方繼續執行m_Job->Run(m_JobData),這是執行緒執行實際任務的地方。
      線上程執行給定Job期間,我們必須防止另外一個Job又賦給該執行緒,因此在賦值之前,通過m_VarMutex進行鎖定, Job執行期間,其於的Job將不能關聯到該執行緒;任務執行完畢,我們呼叫m_VarMutex.Unlock()進行解鎖,此時,執行緒又可以接受新的執行任務。
線上程執行任務結束後返回空閒佇列前,我們還需要判斷當前空閒佇列中的執行緒是否高於m_AvailHigh個。如果超過m_AvailHigh,則必須從其中刪除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())個執行緒,使執行緒數目保持在m_InitNum個。

CJob
CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,程式碼如下:

複製程式碼
 1 class CJob 
 2 { 
 3 private: 
 4     int      m_JobNo;        //The num was assigned to the job 
 5     char*    m_JobName;      //The job name 
 6     CThread  *m_pWorkThread;     //The thread associated with the job 
 7 public: 
 8     CJob( void ); 
 9     virtual ~CJob(); 
10        
11     int      GetJobNo(void) const { return m_JobNo; } 
12     void     SetJobNo(int jobno){ m_JobNo = jobno;} 
13     char*    GetJobName(void) const { return m_JobName; } 
14     void     SetJobName(char* jobname); 
15     CThread *GetWorkThread(void){ return m_pWorkThread; } 
16     void     SetWorkThread ( CThread *pWorkThread ){ 
17         m_pWorkThread = pWorkThread; 
18     } 
19     virtual void Run ( void *ptr ) = 0; 
20 }; 
21 CJob::CJob(void) 
22 :m_pWorkThread(NULL) 
23 ,m_JobNo(0) 
24 ,m_JobName(NULL) 
25 { 
26 } 
27 CJob::~CJob(){ 
28     if(NULL != m_JobName) 
29     free(m_JobName); 
30 } 
31 void CJob::SetJobName(char* jobname) 
32 { 
33     if(NULL !=m_JobName)    { 
34         free(m_JobName); 
35         m_JobName = NULL; 
36     } 
37     if(NULL !=jobname)    { 
38         m_JobName = (char*)malloc(strlen(jobname)+1); 
39         strcpy(m_JobName,jobname); 
40     } 
41 } 
複製程式碼

執行緒池使用示例
至此我們給出了一個簡單的與具體任務無關的執行緒池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然後將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程式

複製程式碼
 1 class CXJob:public CJob 
 2 { 
 3 public: 
 4     CXJob(){i=0;} 
 5     ~CXJob(){} 
 6     void Run(void* jobdata)    { 
 7         printf("The Job comes from CXJOB\n"); 
 8         sleep(2); 
 9     } 
10 }; 
11 
12 class CYJob:public CJob 
13 { 
14 public: 
15     CYJob(){i=0;} 
16     ~CYJob(){} 
17     void Run(void* jobdata)    { 
18         printf("The Job comes from CYJob\n"); 
19     } 
20 }; 
21 
22 main() 
23 { 
24     CThreadManage* manage = new CThreadManage(10); 
25     for(int i=0;i<40;i++) 
26     { 
27         CXJob*   job = new CXJob(); 
28         manage->Run(job,NULL); 
29     } 
30     sleep(2); 
31     CYJob* job = new CYJob(); 
32     manage->Run(job,NULL); 
33     manage->TerminateAll(); 
34 } 
複製程式碼

CXJob和CYJob都是從Job類繼承而來,其都實現了Run介面。CXJob只是簡單的列印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然後均休眠2秒鐘。在主程式中我們初始建立10個工作執行緒。然後分別執行40次CXJob和一次CYJob。

執行緒池使用後記

執行緒池適合場合:
事實上,執行緒池並不是萬能的。它有其特定的使用場合。執行緒池致力於減少執行緒本身的開銷對應用所產生的影響,這是有前提的,前提就是執行緒本身開銷與執行緒執行任務相比不可忽略。如果執行緒本身的開銷相對於執行緒任務執行開銷而言是可以忽略不計的,那麼此時執行緒池所帶來的好處是不明顯的,比如對於FTP伺服器以及Telnet伺服器,通常傳送檔案的時間較長,開銷較大,那麼此時,我們採用執行緒池未必是理想的方法,我們可以選擇“即時建立,即時銷燬”的策略。
總之執行緒池通常適合下面的幾個場合:
(1) 單位時間內處理任務頻繁而且任務處理時間短
(2) 對實時性要求較高。如果接受到任務後在建立執行緒,可能滿足不了實時要求,因此必須採用執行緒池進行預建立。
(3) 必須經常面對高突發性事件,比如Web伺服器,如果有足球轉播,則伺服器將產生巨大的衝擊。此時如果採取傳統方法,則必須不停的大量產生執行緒,銷燬執行緒。此時採用動態執行緒池可以避免這種情況的發生。

結束語
本文給出了一個簡單的通用的與任務無關的執行緒池的實現,通過該執行緒池能夠極大的簡化Linux下多執行緒的開發工作。該執行緒池的進一步完善開發工作還在進行中,希望能夠得到你的建議和支援。
參考資料
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml
POSIX多執行緒程式設計,David R.Butenhof 譯者:於磊 曾剛,中國電力出版社
C++面向物件多執行緒程式設計,CAMERON HUGHES等著 周良忠譯,人民郵電出版社
Java Pro,結合線程和分析器池,Edy Yu