Windows下一個比較完美的執行緒池實現(更新)
1. 前言
執行緒池不是一個新鮮的東西,網上能找到很多原理、實現,甚至很多庫都提供了實現,比如微軟的 ATL::CThreadPool, Vista後提供的CreateThreadpoolWork, boost 中提供的 thread_pool, CSDN、CodeProject 等網站上很多人已經實現的類。但這些庫往往只支援啟動任務,而不能很好地停止任務(相信很多人都會和我一樣有這個需求),於是我在FTL中寫了一個個人認為還比較完美的執行緒池。
2. 功能
本執行緒池提供瞭如下功能:
1. 能根據任務個數和當前執行緒的多少在最小/最大執行緒個數之間自動調整(Vista後的系統有 SetThreadpoolThreadMaximum 等函式有類似功能),現已支援動態改變最小、最大值
2. 能方便的對任一任務進行取消操作,無論該任務是等待執行狀態還是正在執行狀態都支援(相比較而言,WaitForThreadpoolWorkCallbacks 函式只能取消尚未執行的任務);
3. 能對單個Job或整個執行緒池進行 暫停、繼續、停止 處理 -- 需要 JobBase 的子類根據 GetJobWaitType 的返回值進行處理,能對整個執行緒池進行安全的暫停、繼續、停止處理 。
4. 支援回撥方式的反饋通知
5. 使用模版方式實現,能方便的進行引數傳遞
6. 在加入任務時可以設定優先順序(目前尚不支援動態調整) ,現已支援動態調整優先順序
7. 使用的是微軟的基本API,能支援WinXP、Vista、Win7等各種作業系統(CreateThreadpoolWork 等只能在Vista後才能使用)
8. 可以設定等待Job的最大個數,並在加入Job時,佇列滿了的話,可以設定等待超時並根據返回值進一步處理
3. UML圖示和簡單說明
其UML圖比較簡單,主要的只有三個類:
CFThreadPool – 執行緒池的管理類,負責整個執行緒池的管理工作,直接使用即可。
CFJobBase – Job的基類,如果想實現自己的Job,必須從這個類繼承,並實現其中的Run/ OnCancelJob 等函式。
IFThreadPoolCallBack– 可選的回撥實現,可以通知呼叫段 Job 的啟動、停止、取消、進度通知、錯誤等各種狀態的改變。
以下部分簡單介紹了一些比較重要的程式碼實現,具體請參見示例程式碼和其中的註釋部分。
本執行緒池類中有兩種Job的容器,分別是等待執行的Jobs和當前正在執行的Jobs。因為有不同的需求,其定義分別如下(兩種容器型別的選擇,其理由已經寫得比較詳細,大家自行分析即可)
//! 儲存等待Job的資訊,由於有優先順序的問題,而且從最前面開始取任務,因此儲存成set
//! 保證優先順序高、JobIndex小(同優先順序時FIFO) 的Job在最前面
typedeftypenameUnreferenceLess<CFJobBase<T>* >JobBaseUnreferenceLess;
typedefstd::set<CFJobBase<T>*,JobBaseUnreferenceLess > WaitingJobContainer;
WaitingJobContainer m_WaitingJobs; //!等待執行的Job
//! 儲存執行Job的資訊,由於會頻繁加入、刪除,且需要按照JobIndex查詢刪除,因此儲存成map
typedefstd::map<LONG,CFJobBase<T>* > DoingJobContainer;
DoingJobContainer m_DoingJobs; //! 正在執行的Job
程式碼中通過operator < 方法比較CFJobBase<T>::m_nJobPriority和 JobIndex,在SubmitJob時保證其在 set 中的順序來保證 優先順序高的Job先執行,相同優先順序的Job採用FIFO方式。
為了支援暫停、停止,Job的子類必須在工作迴圈中呼叫父類提供 GetJobWaitType方法,並判斷其返回值。如果返回值為ftwtStop 表示使用者請求了停止,需要進行必要的清除工作。GetJobWaitType的實現請參見程式碼(本質是等待m_hEventJobStop, m_pThreadPool->m_hEventStop, m_pThreadPool->m_hEventContinue這三個手動重置事件之一)
執行緒池中主要有以下的一些函式,因為意義根據名字很容易猜出,就不再詳細介紹,詳見示例。
Start(LONGnMinNumThreads, LONG nMaxNumThreads);
StopAndWait(DWORDdwTimeOut = FTL_MAX_THREAD_DEADLINE_CHECK);
ClearUndoWork();
SubmitJob(CFJobBase<T>*pJob, LONG* pOutJobIndex); //加入Job,會返回Job在Pool中的唯一索引,可以通過CancelJob 取消。
Pause/Resume/Stop// 對整個執行緒池請求暫停、繼續、停止的操作。注意:需要Job子類的配合才能達成目標。
CFJobBase子類需要 Initialize、Run、Finalize、OnCancelJob 這幾個虛擬函式。
當Job執行的時候,其邏輯為 if(Initialize){ Run -> Finalize; },需要在 Finalize 中釋放資源;
當Job沒有執行的時候就被取消或Pool停止,則會呼叫 OnCancelJob,需要在其中釋放資源。
編寫了簡單的MFC示例程式,其介面如圖所示:
因為UI不是重點,而且本人也比較懶,所以沒有弄很好的UI出來,所有的執行資訊請參見VisualStudio中的“輸出”視窗。
Start 按鈕啟動執行緒池,示例中設定的最小、最大執行緒個數是 2-4(即會根據加入的Job自動在 2-4 個執行緒之間自動調整)。
三個AddJob按鈕分別是增加 高、普通、低優先順序的Job,加入後可以在日誌中檢視其執行的順序。
兩個CancelJob按鈕分別是從前面、後面的JobIndex取消Job(實際上支援任和有效的JobIndex)。
注意:新介面
Pool 呼叫 IFThreadPoolCallBack 介面的各個方法時,為了效能上的考慮,沒有加鎖進行同步。使用時如果有需要,最好自行同步。
在開發過程中,考慮過是提供單任務的暫停、繼續 還是 整個Pool 的暫停、繼續,考慮到目前的需求,暫時只支援整個Pool的暫停,免得過分複雜,如有需要,大家可以自行參考CancelJob實現。
在開發的過程中,對各個部分都進行了詳細的分析、考慮和測試,應該沒有較大的Bug。目前只想到一種在極端情況下,可能會造成的Bug,特提出以免各位踩雷。
描述:因為是多執行緒的程式碼,所以在極端情況下,可能出現SubmitJob函式尚未返回,對應的Job就執行完畢(或 Initialize失敗直接返回)的情況。此時如果呼叫端採用了將 *pOutJobIndex儲存起來,在 OnJobEnd 中清除的邏輯,可能會因為 OnJobEnd 找不到對應的JobIndex 而出現邏輯錯誤。
解決方法:在呼叫 SubmitJob 的程式碼和OnJobEnd 的回撥程式碼中,使用相同的鎖機制保證即可(這個也應該由呼叫者來保證)。
原始碼和示例程式的下載地址請參見本人的資源列表(可能需要等一段CSDN重新整理的時間)。
新的下載地址: http://download.csdn.net/detail/fishjam/9409236
舊的下載地址: http://download.csdn.net/detail/fishjam/5106672