執行緒池淺析及C++程式碼實現
阿新 • • 發佈:2019-01-28
(1)什麼是執行緒池 執行緒池是一種多執行緒處理技術。執行緒池先建立好若干執行緒,並管理這些執行緒。當有新的任務到來時,將任務新增到一個已建立的空閒執行緒中執行。執行緒池所建立的執行緒優先順序都是一樣的,所以需要使用特定執行緒優先順序的任務不宜使用執行緒池。
(2)執行緒池的優點和應用 執行緒池統一管理執行緒的方式減少了頻繁建立和銷燬執行緒的系統排程開銷,很大程度上提高了伺服器處理併發任務的效能。 執行緒池適用於頻繁的任務排程,如處理HTTP請求,任務多,並且任務週期小
(3)C++程式碼實現
#include "stdafx.h" #include "stdafx.h" #include <assert.h> #include <windows.h> #include <map> #include <iostream> using namespace std; class ITask { public: virtual void ProcessTask(void* pUser)=0; }; //執行緒池 class CThreadPool { public: class ThreadInfo { public: ThreadInfo() { m_hThread=0;m_bBusyWorking=false;} ThreadInfo(HANDLEhandle, boolbBusy) { m_hThread=handle; m_bBusyWorking=bBusy; } ThreadInfo(const ThreadInfo& info){ m_hThread=info.m_hThread; m_bBusyWorking=info.m_bBusyWorking;} HANDLE m_hThread; bool m_bBusyWorking; }; typedef map<DWORD,ThreadInfo>ThreadInfoMap; typedef ThreadInfoMap::iterator Iterator_ThreadInfoMap; enum ThreadPoolStatus{ STATUS_BUSY, STATUS_IDLE,STATUS_NORMAL }; public: CThreadPool() { InitializeCriticalSection(&m_CS); } virtual ~CThreadPool() { DeleteCriticalSection(&m_CS); } bool Start(unsigned short nStatic, unsigned short nMax) { if(nMax<nStatic) { assert(0); return false; } HANDLE hThread; DWORD nThreadId; m_nNumberOfStaticThreads=nStatic; m_nNumberOfTotalThreads=nMax; //lock the resource EnterCriticalSection(&m_CS); //create an IO port m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE,NULL, 0, 0); hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)ManagerProc, // threadfunction (LPVOID)this, // thread argument 0, // creationoption &nThreadId ); // thread identifier m_hMgrThread = hThread; //now we start these worker threads m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE,NULL, 0, 0); for(long n = 0; n < nStatic; n++) { hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)WorkerProc, // threadfunction (LPVOID)this, //thread argument 0, //creation option &nThreadId ); m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false))); } LeaveCriticalSection(&m_CS); return true; } void Stop(bool bHash = false) { EnterCriticalSection(&m_CS); ::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF); WaitForSingleObject(m_hMgrThread,INFINITE); CloseHandle(m_hMgrThread); CloseHandle(m_hMgrIoPort); //shut down all the worker threads UINT nCount=m_threadMap.size(); HANDLE* pThread= new HANDLE[nCount]; long n=0; ThreadInfo info; Iterator_ThreadInfoMap i=m_threadMap.begin(); while(i!=m_threadMap.end()) { ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF); info=i->second; pThread[n++]=info.m_hThread; i++; } DWORD rc=WaitForMultipleObjects(nCount,pThread, TRUE,30000);//wait for 0.5 minutes, then start to killthreads CloseHandle(m_hWorkerIoPort); if(rc>=WAIT_OBJECT_0 && rc<WAIT_OBJECT_0+nCount) { for(unsigned int n=0;n<nCount;n++) { CloseHandle(pThread[n]); } } else if(rc==WAIT_TIMEOUT&&bHash) { //some threadsnot terminated, we have to stop them. DWORD exitCode; for(unsigned int i=0; i<nCount; i++) { if (::GetExitCodeThread(pThread[i],&exitCode)==STILL_ACTIVE) { TerminateThread(pThread[i], 99); } CloseHandle(pThread[i]); } } delete[] pThread; LeaveCriticalSection(&m_CS); } void AddTask(void* pUser, ITask* pWorker)const { ::PostQueuedCompletionStatus(m_hWorkerIoPort, \ reinterpret_cast<DWORD>(pWorker), \ reinterpret_cast<DWORD>(pUser),\ NULL); } protected: HANDLE GetMgrIoPort()const { return m_hMgrIoPort; } UINT GetMgrWaitTime()const { return1000; } HANDLE GetWorkerIoPort()const { return m_hWorkerIoPort; } private: static DWORD WINAPI WorkerProc(void* p) { //convert the parameter to the server pointer. CThreadPool* pServer=(CThreadPool*)p; HANDLE IoPort = pServer->GetWorkerIoPort(); unsigned long pN1,pN2; OVERLAPPED* pOverLapped; DWORD threadId=::GetCurrentThreadId(); while(::GetQueuedCompletionStatus(IoPort, &pN1,&pN2, &pOverLapped, INFINITE)) { if(pOverLapped ==(OVERLAPPED*)0xFFFFFFFE) { pServer->RemoveThread(threadId); break; } else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF) { break; } else { pServer->SetStatus(threadId, true); //retrieve the job description and agent pointer ITask* pTask = reinterpret_cast<ITask*>(pN1); void* pUser= reinterpret_cast<void*>(pN2); pTask->ProcessTask(pUser); pServer->SetStatus(threadId, false); } } return 0; } static DWORD WINAPI ManagerProc(void* p) { //convert the parameter to the server pointer. CThreadPool* pServer=(CThreadPool*)p; HANDLE IoPort = pServer->GetMgrIoPort(); unsigned long pN1,pN2; OVERLAPPED* pOverLapped; LABEL_MANAGER_PROCESSING: while(::GetQueuedCompletionStatus(IoPort, &pN1,&pN2, &pOverLapped, pServer->GetMgrWaitTime() )) { if(pOverLapped ==(OVERLAPPED*)0xFFFFFFFF) { return 0; } } //time out processing if (::GetLastError()==WAIT_TIMEOUT) { //the manager will take a look at all the worker's status.The if (pServer->GetStatus()==STATUS_BUSY) pServer->AddThreads(); if (pServer->GetStatus()==STATUS_IDLE) pServer->RemoveThreads(); goto LABEL_MANAGER_PROCESSING; } return 0; } protected: //manager thread HANDLE m_hMgrThread; HANDLE m_hMgrIoPort; protected: //configuration parameters mutable unsigned short m_nNumberOfStaticThreads; mutable unsigned short m_nNumberOfTotalThreads; protected: //helper functions void CThreadPool::AddThreads() { HANDLE hThread; DWORD nThreadId; unsigned int nCount=m_threadMap.size(); unsigned int nTotal=min(nCount+2, m_nNumberOfTotalThreads); for(unsigned int i=0; i<nTotal-nCount; i++) { hThread = CreateThread( NULL, // SD 0, // initial stack size (LPTHREAD_START_ROUTINE)WorkerProc, // threadfunction (LPVOID)this, //thread argument 0, //creation option &nThreadId ); m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false))); } } void RemoveThreads() { unsigned int nCount=m_threadMap.size(); unsigned int nTotal=max(nCount-2, m_nNumberOfStaticThreads); for(unsigned int i=0; i<nCount-nTotal; i++) { ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFE); } } ThreadPoolStatus GetStatus() { int nTotal = m_threadMap.size(); ThreadInfo info; int nCount=0; Iterator_ThreadInfoMap i=m_threadMap.begin(); while(i!=m_threadMap.end()) { info=i->second; if (info.m_bBusyWorking==true)nCount++; i++; } if ( nCount/(1.0*nTotal) > 0.8 ) return STATUS_BUSY; if ( nCount/ (1.0*nTotal) < 0.2 ) return STATUS_IDLE; return STATUS_NORMAL; } void SetStatus(DWORD threadId,bool status) { EnterCriticalSection(&m_CS); Iterator_ThreadInfoMap i; ThreadInfo info; i=m_threadMap.find(threadId); info=i->second; info.m_bBusyWorking=status; m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(threadId, info)); LeaveCriticalSection(&m_CS); } void CThreadPool::RemoveThread(DWORDthreadId) { EnterCriticalSection(&m_CS); m_threadMap.erase(threadId); LeaveCriticalSection(&m_CS); } protected: //all the work threads ThreadInfoMap m_threadMap; CRITICAL_SECTION m_CS; HANDLE m_hWorkerIoPort; }; ////////////////////測試程式碼//////////////////////////////////// class CTest:public ITask { public: CTest() { static int ii = 0; m_ii = ii ++; } void ProcessTask(void* pUser) { for(int i = 0; i <3; i ++) { cout<<"TaskID: "<<((CTest*)pUser)->m_ii<<endl; Sleep(100); } } ~CTest() { } private: int m_ii; }; int _tmain(int argc, _TCHAR* argv[]) { CThreadPool *pPool= new CThreadPool(); const int TEST_COUNT = 8; CTest test[TEST_COUNT]; //啟動執行緒池,5個處理執行緒,最多接受10個任務 pPool->Start(5,10); //新增任務到執行緒池中 for(int i = 0; i < TEST_COUNT; i++) pPool->AddTask(&test[i], &test[i]); cin.get(); //停止執行緒池 pPool->Stop(); return 0; }