libuv封裝任務執行緒池
阿新 • • 發佈:2018-11-24
Task.h
#ifndef __CTASK__H_
#define __CTASK__H_
#include "RcObject.h"
/**
 * Abstract unit of work that can be handed to the task thread pool.
 *
 * Derives from CRcObject (declared in RcObject.h, not visible here), so
 * instances are handled through the REF / UNREF macros by the worker code
 * in this file.
 *
 * The worker loop in CUvTaskThread::OnThreadRun() calls the three hooks in
 * the order TaskInit() -> TaskExcute() -> TaskQuit() and ignores their
 * return values. What a particular return value means is not defined in
 * this file.
 */
class CTask : public CRcObject
{
public:
    /// First hook invoked by the worker thread for this task.
    virtual int TaskInit() = 0;

    /// Second hook invoked by the worker thread; the name's spelling is part
    /// of the public interface and is kept as-is.
    virtual int TaskExcute() = 0;

    /// Last hook invoked by the worker thread before it drops its reference.
    virtual int TaskQuit() = 0;
};
#endif
#ifndef __CUVTASKTHREAD__H_ #define __CUVTASKTHREAD__H_ #include "UvThread.h" #include "Task.h" class CUvTaskThread : public CUvThread { public: CUvTaskThread(); ~CUvTaskThread(); public: int SetTask(CTask* pTask); protected: int OnThreadRun(); CTask* mpTask; }; #endif #include "UvTaskThread.h" #include "UvTaskPool.h" CUvTaskThread::CUvTaskThread(){ mpTask = nullptr; } CUvTaskThread::~CUvTaskThread(){ } int CUvTaskThread::OnThreadRun() { for (;;) { if (nullptr == mpTask) { sUvTaskPool->PushTaskThread(this); Wait(); continue; } mpTask->TaskInit(); mpTask->TaskExcute(); mpTask->TaskQuit(); UNREF(mpTask); mpTask = nullptr; } return 0; } int CUvTaskThread::SetTask(CTask* pTask) { ASSERT_RET_VALUE(nullptr != pTask && nullptr == mpTask, 1); mpTask = pTask; REF(mpTask); return 0; }
#ifndef __CUVTASKPOOL__H_ #define __CUVTASKPOOL__H_ #include "singleton.h" #include "UvTaskThread.h" #include "UvMutex.h" #include <set> #include <queue> class CUvTaskPool : public CSingleton<CUvTaskPool>, public CUvThread { SINGLE_CLASS_INITIAL(CUvTaskPool); public: ~CUvTaskPool(); public: int PushTask(CTask* pTask); int PushTaskThread(CUvTaskThread* pTaskThread); private: CTask* PopTask(); int DispatchTask(CTask* pTask); protected: int OnThreadRun(); private: std::queue<CTask*> mqueTasks; CUvMutex mcQueTasksMutex; std::set<CUvTaskThread*> msetTaskThreads; CUvMutex mcTaskThreadsMutex; }; #define sUvTaskPool CUvTaskPool::Instance() #endif #include "UvTaskPool.h" CUvTaskPool::CUvTaskPool(){ } CUvTaskPool::~CUvTaskPool(){ } int CUvTaskPool::OnThreadRun() { for (;;) { CTask* pTask = PopTask(); if (nullptr == pTask) { Wait(); continue; } DispatchTask(pTask); } return 0; } CTask* CUvTaskPool::PopTask() { CTask* pTask = nullptr; mcQueTasksMutex.Lock(); if (!mqueTasks.empty()) { pTask = mqueTasks.front(); mqueTasks.pop(); } mcQueTasksMutex.UnLock(); return pTask; } int CUvTaskPool::PushTask(CTask* pTask) { ASSERT_RET_VALUE(nullptr != pTask, 1); mcQueTasksMutex.Lock(); mqueTasks.push(pTask); mcQueTasksMutex.UnLock(); Activate(); return 0; } int CUvTaskPool::PushTaskThread(CUvTaskThread* pTaskThread) { ASSERT_RET_VALUE(nullptr != pTaskThread, 1); mcTaskThreadsMutex.Lock(); msetTaskThreads.insert(pTaskThread); mcTaskThreadsMutex.UnLock(); return 0; } int CUvTaskPool::DispatchTask(CTask* pTask) { ASSERT_RET_VALUE(nullptr != pTask, 1); CUvTaskThread* pTaskThread = nullptr; mcTaskThreadsMutex.Lock(); std::set<CUvTaskThread*>::iterator iter = msetTaskThreads.begin(); if (iter != msetTaskThreads.end()) { pTaskThread = (CUvTaskThread*)*iter; if (nullptr != pTaskThread) { pTaskThread->SetTask(pTask); pTaskThread->Activate(); } msetTaskThreads.erase(iter); } mcTaskThreadsMutex.UnLock(); if (nullptr == pTaskThread) { pTaskThread = new CUvTaskThread(); ASSERT_RET_VALUE(nullptr != 
pTaskThread, 1); pTaskThread->SetTask(pTask); pTaskThread->Start(); } return 0; }