muduo原始碼解析16-threadpool類
阿新 • • 發佈:2020-08-26
threadpool類:
class threadpool:noncopyable { };
作用:
利用mymuduo::thread 完成對於執行緒池的封裝
執行緒池內部成員:
執行緒集合m_threads: 用於儲存執行緒池內的所有執行緒
執行緒池任務佇列m_queue 表示待執行的任務佇列
條件變數:m_notFull,m_notEmpty, 條件變數,用於對任務佇列同步操作,感覺用一個也可以,就是判斷一下m_queue.size()即可
互斥鎖:m_mutex 用於和條件變數一起使用
其他成員:執行緒池的名字,執行狀態,最大任務佇列長度
執行緒池提供的操作:
設定任務佇列的最大長度
啟動執行緒池,
關閉執行緒池
執行一個任務,
獲取執行緒池的基本資訊,名字,執行狀態...
成員變數:
private: mutable mutexlock m_mutex; //配合條件變數使用的互斥鎖 condition m_notEmpty; //條件變數,任務列隊不空 condition m_notFull; //條件變數,任務列隊不滿 string m_name; //執行緒池名字 Task m_threadInitCallback;//任務函式,std::function<void()> //執行緒池內執行緒,用智慧指標,在vector中儲存 std::vector<std::unique_ptr<mymuduo::thread>> m_threads; std::deque<Task> m_queue; //執行緒池內的任務佇列 size_t m_maxQueueSize; //任務佇列最大長度 bool m_running; //執行緒池正在執行?
成員函式:
public: typedef std::function<void()> Task; //C++中,有時可以將建構函式用作自動型別轉換函式。但這種自動特性並非總是合乎要求的,有時會導致意外的型別轉換 //被explicit關鍵字修飾的類建構函式,不能進行自動地隱式型別轉換,只能顯式地進行型別轉換。 explicit threadpool(const string& nameArg=string("ThreadPool")); ~threadpool(); //設定最大任務佇列長度 void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;} //設定回撥函式 void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;} //執行緒池啟動,建立numThreads個執行緒跑起來 void start(int numThreads); //回收子執行緒,並關閉執行緒池 void stop(); //獲得一些執行緒池相關資訊,名字,任務佇列大小 const string& name() const{return m_name;} size_t queueSize() const { mutexlockguard mlg(m_mutex); return m_queue.size(); } // Could block if maxQueueSize > 0 // Call after stop() will return immediately. // There is no move-only version of std::function in C++ as of C++14. // So we don't need to overload a const& and an && versions // as we do in (Bounded)BlockingQueue. // https://stackoverflow.com/a/25408989 //執行一個任務,實質是向任務佇列中新增一個任務 void run(Task f); private: //判斷佇列是否滿了 bool isFull() const; //執行緒池執行緒執行的函式 void runInThread(); //從任務佇列中取出一個任務,實質是取出一個函式在runInThread函式裡執行 Task take();
threadpool.h:
#ifndef THREADPOOL_H #define THREADPOOL_H #include"base/condition.h" #include"base/mutex.h" #include"base/thread.h" #include"base/types.h" #include<deque> #include<vector> namespace mymuduo{ class threadpool:noncopyable { public: typedef std::function<void()> Task; //C++中,有時可以將建構函式用作自動型別轉換函式。但這種自動特性並非總是合乎要求的,有時會導致意外的型別轉換 //被explicit關鍵字修飾的類建構函式,不能進行自動地隱式型別轉換,只能顯式地進行型別轉換。 explicit threadpool(const string& nameArg=string("ThreadPool")); ~threadpool(); //設定最大任務佇列長度 void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;} //設定回撥函式 void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;} //執行緒池啟動,建立numThreads個執行緒跑起來 void start(int numThreads); //回收子執行緒,並關閉執行緒池 void stop(); //獲得一些執行緒池相關資訊,名字,任務佇列大小 const string& name() const{return m_name;} size_t queueSize() const { mutexlockguard mlg(m_mutex); return m_queue.size(); } // Could block if maxQueueSize > 0 // Call after stop() will return immediately. // There is no move-only version of std::function in C++ as of C++14. // So we don't need to overload a const& and an && versions // as we do in (Bounded)BlockingQueue. // https://stackoverflow.com/a/25408989 //執行一個任務,實質是向任務佇列中新增一個任務 void run(Task f); private: //判斷佇列是否滿了 bool isFull() const; //執行緒池執行緒執行的函式 void runInThread(); //從任務佇列中取出一個任務,實質是取出一個函式在runInThread函式裡執行 Task take(); mutable mutexlock m_mutex; //配合條件變數使用的互斥鎖 condition m_notEmpty; //條件變數,任務列隊不空 condition m_notFull; //條件變數,任務列隊不滿 string m_name; //執行緒池名字 Task m_threadInitCallback; //任務函式,std::function<void()> //執行緒池內執行緒,用智慧指標,在vector中儲存 std::vector<std::unique_ptr<mymuduo::thread>> m_threads; std::deque<Task> m_queue; //執行緒池內的任務佇列 size_t m_maxQueueSize; //任務佇列最大長度 bool m_running; //執行緒池正在執行? };//namespace mymuduo } #endif // THREADPOOL_H
threadpool.cpp:
#include "threadpool.h" #include"base/logging.h" #include"base/exception.h" #include<assert.h> #include<stdio.h> namespace mymuduo{ //建構函式僅用於初始化各種成員變數,其他什麼也不做 threadpool::threadpool(const string& nameArg) :m_mutex(),m_notEmpty(m_mutex),m_notFull(m_mutex),m_name(nameArg), m_maxQueueSize(0),m_running(false) { } threadpool::~threadpool() { if(m_running) //只有執行緒池處於執行狀態才可以關閉 this->stop(); } //啟動執行緒池,建立numThreads個執行緒 void threadpool::start(int numThreads) { assert(m_threads.empty()); m_running=true; m_threads.reserve(numThreads); //建立numThreads個執行緒 for(int i=0;i<numThreads;i++) { char id[32]; snprintf(id,sizeof(id),"%d",i+1); //push_back和emplace_back新增臨時變數的區別,假設a是個臨時變數 //push_back(a),會在vector/dequeue內部把a拷貝過來,然後臨時變數a被析構 //emplace_back(a),直接新增臨時變數,沒有拷貝操作,不會析構a m_threads.emplace_back(new thread(std::bind(&threadpool::runInThread,this),m_name+id)); m_threads[i]->start(); //執行執行緒,實際上是執行runInThread函式 } //numThreads為0,即不建立任務執行緒時,預設呼叫m_threadInitCallback()函式 if(numThreads==0 && m_threadInitCallback) m_threadInitCallback(); } void threadpool::stop() { //這裡踩一個坑一定要注意,mutexlockguard一定要析構把鎖釋放掉,也就是寫在區域性作用域中{} //下分來分析一下之前踩得坑,沒寫到區域性作用於會發生什麼.(死鎖) //當前執行緒執行join()時,子執行緒拿不到鎖一直不會結束,因此,join()得不到返回,mlg也不會析構, //m_mutex也不會得到釋放,形成死鎖 //當前執行緒拿著鎖等回收子執行緒,之後才能釋放鎖,而子執行緒需要鎖才能結束,因此形成死鎖 { mutexlockguard mlg(m_mutex); m_running=false; //指定執行緒池狀態為false,之後喚醒時所有執行緒都會直接退出 m_notFull.notifyAll(); m_notEmpty.notifyAll(); } //回收每個執行緒 for(auto& i:m_threads) i->join(); } //執行緒池任務佇列不為空就往裡面新增一個任務 void threadpool::run(Task task) { //執行緒池為空,直接執行即可,也不用同步啥的 if(m_threads.empty()) { task(); return; } //執行緒池不為空,所有執行緒爭搶這一任務 mutexlockguard mlg(m_mutex); while(m_running && isFull()) //執行緒池任務佇列滿了必須要等待 m_notFull.wait(); if(!m_running) //執行緒池狀態變成false,直接退出 { return; } assert(!isFull()); //把當前任務新增到任務佇列中去 m_queue.push_back(std::move(task)); m_notEmpty.notify(); //喚醒一個執行緒 } //從執行緒池任務佇列中拿出一個任務 threadpool::Task threadpool::take() { mutexlockguard mlg(m_mutex); //當執行緒池任務佇列為空時一直等待 //LOG_WARN<<currentthread::tid()<<"wait for task..."; while(m_running && m_queue.empty()) m_notEmpty.wait(); //LOG_WARN<<currentthread::tid()<<"got task..."; //無任務可取了, if(m_queue.empty())return NULL; Task task; task=m_queue.front(); m_queue.pop_front(); if(m_maxQueueSize>0) //喚醒一個執行緒 m_notFull.notify(); return task; } bool threadpool::isFull() const { m_mutex.assertLocked(); //保證m_mutex被當前執行緒所持有 return m_maxQueueSize>0 && m_queue.size()>=m_maxQueueSize; } void threadpool::runInThread() { try { if(m_threadInitCallback) m_threadInitCallback(); while(m_running) { Task task=this->take();//在任務佇列中拿出一個任務並執行它 //LOG_WARN<<"get one task"; if(task!=NULL) task(); //LOG_WARN<<"get one task done"; } } catch (const mymuduo::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTree()); abort(); }catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); }catch(...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", m_name.c_str()); throw; // rethrow } } }//namespace mymuduo
測試:
#include"base/threadpool.h" #include<iostream> void workerThread() { //sleep for 1 sec mymuduo::currentthread::sleepUsec(1000*1000); std::cout<<mymuduo::currentthread::name()<<" done\n"; } int main() { mymuduo::threadpool tp("testThreadPool"); tp.setMaxQueueSize(10); tp.start(5); for(int i=0;i<10;i++) tp.run(workerThread); mymuduo::currentthread::sleepUsec(5000*1000); //tp.stop(); }
列印結果:
testThreadPool1 done
testThreadPool3testThreadPool2 done
testThreadPool5 done
done
testThreadPool4 done
testThreadPool5 done
testThreadPool1 done
testThreadPool2 done
testThreadPool3 done
testThreadPool4 done