C++執行緒池實現
最近讀了muduo的原始碼,看了一下其中執行緒池的是實現。其中互斥量、條件變數都是庫裡面自己封裝的,正好現在C++標準庫裡面有對應的類,所以就改造了一下,補充了部分註釋。同時總結了一下條件變數和鎖的使用。程式碼如下:
ThreadPool.h
#pragma once
#include <deque>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>
namespace muduo
{
using namespace std;
class CThreadPool
{
public:
// 入參為空的任務型別
typedef function<void()> CTask;
// 構造及解構函式
explicit CThreadPool(const string& nameArg = string("ThreadPool"));
~CThreadPool();
// 設定任務佇列的數量上限
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
// 設定工作執行緒初次執行時的訊息回撥
void setThreadInitCallback(const CTask& cb)
{
threadInitCallback_ = cb;
}
// 開啟指定數量的工作執行緒
void start(int numThreads);
// 退出執行緒池執行,退出所有的工作執行緒
void stop();
// 執行緒池名稱
const string& name() const
{
return name_;
}
// 任務佇列多少
size_t queueSize() const;
// 新任務入隊,若佇列已滿,將堵塞介面呼叫執行緒
void run(const CTask& f);
private:
// 禁止拷貝構造
CThreadPool(const CThreadPool&) = delete;
CThreadPool& operator=(const CThreadPool&) = delete;
// 判斷佇列是否已滿,可採用lambda表示式替代
bool isFull() const;
// 完成任務到執行緒的分發
// 執行緒池物件維護了任務佇列,工作執行緒在執行時需要從任務佇列中獲取工作任務,
// 所以只能將執行緒的執行體與執行緒池物件的介面關聯,完成工作任務獲取;
void runInThread();
// 從任務佇列中取任務執行
CThreadPool::CTask take();
private:
mutable mutex mutex_; // 任務佇列的互斥鎖
condition_variable notEmpty_; // 當前佇列為空的條件變數,工作執行緒取任務時空則等待;
condition_variable notFull_; // 佇列已滿的條件變數,任務如隊時,佇列滿則等待;
string name_; // 執行緒池名稱
CTask threadInitCallback_; // 工作執行緒初次執行的回撥
vector<thread*> threads_; // 工作執行緒集
deque<CTask> queue_; // 任務佇列
size_t maxQueueSize_; //任務量上限
bool running_; // 執行緒池是否在執行
};
}
ThreadPool.cpp
#include "MyThreadPool.h"
#include <assert.h>
#include <sstream>
#include <iostream>
using namespace muduo;
CThreadPool::CThreadPool(const string& nameArg)
:name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}
CThreadPool::~CThreadPool()
{
if (running_)
{
stop();
}
}
void CThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
// 將執行緒的執行過程繫結到執行緒池的runInThread
threads_.push_back(new thread(&CThreadPool::runInThread, this));
}
if (numThreads == 0)
{
threadInitCallback_();
}
}
void CThreadPool::stop()
{
{
lock_guard<mutex> lock(mutex_);
running_ = false;
// 喚醒所有的工作執行緒
notEmpty_.notify_all();
}
//等待所有執行執行緒退出
for each (auto ptr_thread in threads_)
{
ptr_thread->join();
}
}
size_t CThreadPool::queueSize() const
{
lock_guard<mutex> lck(mutex_);
return queue_.size();
}
void CThreadPool::run(const CTask& task)
{
if (!task)
{
return;
}
if (threads_.empty())
{
// 當前可執行執行緒數為0,當前主執行緒子集執行這個任務;
task();
}
else
{
unique_lock<mutex> lck(mutex_);
notFull_.wait(lck, [&]() { return !isFull(); });
// 將task入隊,喚醒執行執行緒
queue_.push_back(task);
lck.unlock();
notEmpty_.notify_one();
}
}
CThreadPool::CTask CThreadPool::take()
{
unique_lock<mutex> lock(mutex_);
// 若執行緒池需要退出,也不需要再等待
notEmpty_.wait(lock, [&](){ return !queue_.empty() || !running_; });
CTask task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
//條件變數的分發不需要對互斥量加鎖,以免喚醒的執行緒再次進入wait狀態
lock.unlock();
notFull_.notify_one();
}
}
return task;
}
bool CThreadPool::isFull() const
{
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
void CThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
CTask task(take());
if (task)
{
task();
}
}
}
catch (...)
{
fprintf(stderr, "unknown exception caught in CThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
// 測試程式碼
// 1. 執行函式定義
#define Func(index) \
void func_##index() \
{\
ostringstream ss;\
ss << "=========== thread: " << std::this_thread::get_id() << std::endl;\
std::cout << ss.str() << std::endl;\
std::cout.flush();\
}\
Func(1)
Func(2)
//2. 執行任務入佇列
#define RUN_IN_POOL(index) \
do{ tp.run(std::function<void()>(&func_##index)); }while(0)
void main()
{
CThreadPool tp;
tp.start(2);
tp.setMaxQueueSize(3);
RUN_IN_POOL(1);
RUN_IN_POOL(2);
// 睡眠一下,否則立即呼叫stop,部分任務還未執行完
Sleep(1000);
tp.stop();
system("pause");
}
上述實現存在幾個問題:
1. task類為無參的function型別,與其他可執行物件相比,無法儲存狀態;不能適配需要入參的場景;
2. 所有的成員變數的讀寫都用同一個互斥量保護,在工作執行緒較多情況下,執行效率不高;
3. run函式呼叫,插入待執行任務時,若當前任務佇列已滿,會將呼叫執行緒掛起;此時若呼叫該執行緒GUI執行緒,將導致介面卡死。
幾點總結:
1. 條件變數的使用
a. 對共享變數修改的執行順序
1) 通過lock_guard對互斥量mutex加鎖;
2) 加鎖完成後對變數進行修改;
3) 呼叫notify_one或notify_all介面喚醒在std::condition_variable上等待的執行緒(對外發送訊息通知時,不需要持有鎖)。
為了能夠正確通知條件變數,即便共享變數為原子變數,也必須在對mutex互斥量加鎖情況下進行修改。
b. 在條件變數上等待的執行緒
1) 通過unique_lock對同一個mutex上進行加鎖,已保護共享變數;
2) 執行wait、wait_for或者wait_until,此時自動釋放mutex,將執行緒掛起;
3) 某條件變數發出通知後,將當前執行緒喚醒,並自動獲取mutex。同時,必須對條件變數進行判斷,以避免為虛假喚醒。
2. lock_guard和unique_lock的差別
lock_guard和unique_lock都能實現互斥量的加鎖操作。但unique_lock在guard_lock上做了擴充套件,支援鎖狀態的轉移,鎖狀態的自動釋放和獲取。wait介面呼叫時,需要傳入unique_lock型別的入參。