1. 程式人生 > >C++執行緒池實現

C++執行緒池實現

最近讀了muduo的原始碼,看了一下其中執行緒池的是實現。其中互斥量、條件變數都是庫裡面自己封裝的,正好現在C++標準庫裡面有對應的類,所以就改造了一下,補充了部分註釋。同時總結了一下條件變數和鎖的使用。程式碼如下:
ThreadPool.h

#pragma once

#include <deque>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>
namespace muduo { using namespace std; class CThreadPool { public: // 入參為空的任務型別 typedef function<void()> CTask; // 構造及解構函式 explicit CThreadPool(const string& nameArg = string("ThreadPool")); ~CThreadPool(); // 設定任務佇列的數量上限 void
setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; } // 設定工作執行緒初次執行時的訊息回撥 void setThreadInitCallback(const CTask& cb) { threadInitCallback_ = cb; } // 開啟指定數量的工作執行緒 void start(int numThreads); // 退出執行緒池執行,退出所有的工作執行緒 void
stop(); // 執行緒池名稱 const string& name() const { return name_; } // 任務佇列多少 size_t queueSize() const; // 新任務入隊,若佇列已滿,將堵塞介面呼叫執行緒 void run(const CTask& f); private: // 禁止拷貝構造 CThreadPool(const CThreadPool&) = delete; CThreadPool& operator=(const CThreadPool&) = delete; // 判斷佇列是否已滿,可採用lambda表示式替代 bool isFull() const; // 完成任務到執行緒的分發 // 執行緒池物件維護了任務佇列,工作執行緒在執行時需要從任務佇列中獲取工作任務, // 所以只能將執行緒的執行體與執行緒池物件的介面關聯,完成工作任務獲取; void runInThread(); // 從任務佇列中取任務執行 CThreadPool::CTask take(); private: mutable mutex mutex_; // 任務佇列的互斥鎖 condition_variable notEmpty_; // 當前佇列為空的條件變數,工作執行緒取任務時空則等待; condition_variable notFull_; // 佇列已滿的條件變數,任務如隊時,佇列滿則等待; string name_; // 執行緒池名稱 CTask threadInitCallback_; // 工作執行緒初次執行的回撥 vector<thread*> threads_; // 工作執行緒集 deque<CTask> queue_; // 任務佇列 size_t maxQueueSize_; //任務量上限 bool running_; // 執行緒池是否在執行 }; }

ThreadPool.cpp

#include "MyThreadPool.h"
#include <assert.h>
#include <sstream>
#include <iostream>

using namespace muduo;

CThreadPool::CThreadPool(const string& nameArg)
    :name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

CThreadPool::~CThreadPool()
{
    if (running_)
    {
        stop();
    }
}

void CThreadPool::start(int numThreads)
{
    assert(threads_.empty());
    running_ = true;
    threads_.reserve(numThreads);

    for (int i = 0; i < numThreads; ++i)
    {
        // 將執行緒的執行過程繫結到執行緒池的runInThread
        threads_.push_back(new thread(&CThreadPool::runInThread, this));
    }
    if (numThreads == 0)
    {
        threadInitCallback_();
    }
}

void CThreadPool::stop()
{
    {
        lock_guard<mutex> lock(mutex_);
        running_ = false;
        // 喚醒所有的工作執行緒
        notEmpty_.notify_all();
    }

    //等待所有執行執行緒退出
    for each (auto ptr_thread in threads_)
    {
        ptr_thread->join();
    }
}

size_t CThreadPool::queueSize() const
{
    lock_guard<mutex> lck(mutex_);
    return queue_.size();
}

void CThreadPool::run(const CTask& task)
{
    if (!task)
    {
        return;
    }

    if (threads_.empty())
    {
        // 當前可執行執行緒數為0,當前主執行緒子集執行這個任務;
        task();
    }
    else
    {
        unique_lock<mutex> lck(mutex_);
        notFull_.wait(lck, [&]() { return !isFull(); });

        // 將task入隊,喚醒執行執行緒
        queue_.push_back(task);
        lck.unlock();
        notEmpty_.notify_one();
    }
}

CThreadPool::CTask CThreadPool::take()
{
    unique_lock<mutex> lock(mutex_);
    // 若執行緒池需要退出,也不需要再等待
    notEmpty_.wait(lock, [&](){ return !queue_.empty() || !running_; });

    CTask task;
    if (!queue_.empty())
    {
        task = queue_.front();
        queue_.pop_front();
        if (maxQueueSize_ > 0)
        {
            //條件變數的分發不需要對互斥量加鎖,以免喚醒的執行緒再次進入wait狀態
            lock.unlock();
            notFull_.notify_one();
        }
    }
    return task;
}

bool CThreadPool::isFull() const
{
    return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

void CThreadPool::runInThread()
{
    try
    {
        if (threadInitCallback_)
        {
            threadInitCallback_();
        }

        while (running_)
        {
            CTask task(take());
            if (task)
            {
                task();
            }
        }
    }
    catch (...)
    {
        fprintf(stderr, "unknown exception caught in CThreadPool %s\n", name_.c_str());
        throw; // rethrow
    }
}

// 測試程式碼
// 1. 執行函式定義
#define Func(index) \
void func_##index() \
{\
    ostringstream ss;\
    ss << "=========== thread: " << std::this_thread::get_id() << std::endl;\
    std::cout << ss.str() << std::endl;\
    std::cout.flush();\
}\

Func(1)
Func(2)

//2. 執行任務入佇列
#define RUN_IN_POOL(index) \
do{ tp.run(std::function<void()>(&func_##index)); }while(0)

void main()
{
    CThreadPool tp;
    tp.start(2);
    tp.setMaxQueueSize(3);

    RUN_IN_POOL(1);
    RUN_IN_POOL(2);

    // 睡眠一下,否則立即呼叫stop,部分任務還未執行完
    Sleep(1000);
    tp.stop();
    system("pause");
}

上述實現存在幾個問題:
1. task類為無參的function型別,與其他可執行物件相比,無法儲存狀態;不能適配需要入參的場景;
2. 所有的成員變數的讀寫都用同一個互斥量保護,在工作執行緒較多情況下,執行效率不高;
3. run函式呼叫,插入待執行任務時,若當前任務佇列已滿,會將呼叫執行緒掛起;此時若呼叫該執行緒GUI執行緒,將導致介面卡死。

幾點總結:
1. 條件變數的使用
a. 對共享變數修改的執行順序
1) 通過lock_guard對互斥量mutex加鎖;
2) 加鎖完成後對變數進行修改;
3) 呼叫notify_one或notify_all介面喚醒在std::condition_variable上等待的執行緒(對外發送訊息通知時,不需要持有鎖)。
為了能夠正確通知條件變數,即便共享變數為原子變數,也必須在對mutex互斥量加鎖情況下進行修改。
b. 在條件變數上等待的執行緒
1) 通過unique_lock對同一個mutex上進行加鎖,已保護共享變數;
2) 執行wait、wait_for或者wait_until,此時自動釋放mutex,將執行緒掛起;
3) 某條件變數發出通知後,將當前執行緒喚醒,並自動獲取mutex。同時,必須對條件變數進行判斷,以避免為虛假喚醒。
2. lock_guard和unique_lock的差別
lock_guard和unique_lock都能實現互斥量的加鎖操作。但unique_lock在guard_lock上做了擴充套件,支援鎖狀態的轉移,鎖狀態的自動釋放和獲取。wait介面呼叫時,需要傳入unique_lock型別的入參。