1. 程式人生 > 實用技巧 >muduo原始碼解析16-threadpool類

muduo原始碼解析16-threadpool類

threadpool類:

class threadpool:noncopyable
{
};

作用:

利用mymuduo::thread 完成對於執行緒池的封裝
執行緒池內部成員:
執行緒集合m_threads: 用於儲存執行緒池內的所有執行緒
執行緒池任務佇列m_queue 表示待執行的任務佇列
條件變數:m_notFull,m_notEmpty, 條件變數,用於對任務佇列同步操作,感覺用一個也可以,就是判斷一下m_queue.size()即可
互斥鎖:m_mutex 用於和條件變數一起使用
其他成員:執行緒池的名字,執行狀態,最大任務佇列長度

執行緒池提供的操作:
設定任務佇列的最大長度

設定執行緒初始化回撥函式
啟動執行緒池,
關閉執行緒池
執行一個任務,
獲取執行緒池的基本資訊,名字,執行狀態...

成員變數:

private:
    mutable mutexlock m_mutex;          //配合條件變數使用的互斥鎖
    condition m_notEmpty;               //條件變數,任務列隊不空
    condition m_notFull;                //條件變數,任務列隊不滿
    string m_name;                      //執行緒池名字

    Task m_threadInitCallback;          
//任務函式,std::function<void()> //執行緒池內執行緒,用智慧指標,在vector中儲存 std::vector<std::unique_ptr<mymuduo::thread>> m_threads; std::deque<Task> m_queue; //執行緒池內的任務佇列 size_t m_maxQueueSize; //任務佇列最大長度 bool m_running; //執行緒池正在執行?

成員函式:

public
: typedef std::function<void()> Task; //C++中,有時可以將建構函式用作自動型別轉換函式。但這種自動特性並非總是合乎要求的,有時會導致意外的型別轉換 //被explicit關鍵字修飾的類建構函式,不能進行自動地隱式型別轉換,只能顯式地進行型別轉換。 explicit threadpool(const string& nameArg=string("ThreadPool")); ~threadpool(); //設定最大任務佇列長度 void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;} //設定回撥函式 void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;} //執行緒池啟動,建立numThreads個執行緒跑起來 void start(int numThreads); //回收子執行緒,並關閉執行緒池 void stop(); //獲得一些執行緒池相關資訊,名字,任務佇列大小 const string& name() const{return m_name;} size_t queueSize() const { mutexlockguard mlg(m_mutex); return m_queue.size(); } // Could block if maxQueueSize > 0 // Call after stop() will return immediately. // There is no move-only version of std::function in C++ as of C++14. // So we don't need to overload a const& and an && versions // as we do in (Bounded)BlockingQueue. // https://stackoverflow.com/a/25408989 //執行一個任務,實質是向任務佇列中新增一個任務 void run(Task f); private: //判斷佇列是否滿了 bool isFull() const; //執行緒池執行緒執行的函式 void runInThread(); //從任務佇列中取出一個任務,實質是取出一個函式在runInThread函式裡執行 Task take();

threadpool.h:

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include"base/condition.h"
#include"base/mutex.h"
#include"base/thread.h"
#include"base/types.h"

#include<deque>
#include<vector>

namespace mymuduo{

class threadpool:noncopyable
{
public:
    typedef std::function<void()> Task;
    //C++中,有時可以將建構函式用作自動型別轉換函式。但這種自動特性並非總是合乎要求的,有時會導致意外的型別轉換
    //被explicit關鍵字修飾的類建構函式,不能進行自動地隱式型別轉換,只能顯式地進行型別轉換。
    explicit threadpool(const string& nameArg=string("ThreadPool"));
    ~threadpool();
    //設定最大任務佇列長度
    void setMaxQueueSize(int maxSize){m_maxQueueSize=maxSize;}
    //設定回撥函式
    void setThreadInitCallback(const Task& cb){m_threadInitCallback=cb;}
    //執行緒池啟動,建立numThreads個執行緒跑起來
    void start(int numThreads);
    //回收子執行緒,並關閉執行緒池
    void stop();

    //獲得一些執行緒池相關資訊,名字,任務佇列大小
    const string& name() const{return m_name;}
    size_t queueSize() const
    {
        mutexlockguard mlg(m_mutex);
        return m_queue.size();
    }

    // Could block if maxQueueSize > 0
    // Call after stop() will return immediately.
    // There is no move-only version of std::function in C++ as of C++14.
    // So we don't need to overload a const& and an && versions
    // as we do in (Bounded)BlockingQueue.
    // https://stackoverflow.com/a/25408989
    //執行一個任務,實質是向任務佇列中新增一個任務
    void run(Task f);

private:
    //判斷佇列是否滿了
    bool isFull() const;
    //執行緒池執行緒執行的函式
    void runInThread();
    //從任務佇列中取出一個任務,實質是取出一個函式在runInThread函式裡執行
    Task take();

    mutable mutexlock m_mutex;          //配合條件變數使用的互斥鎖
    condition m_notEmpty;               //條件變數,任務列隊不空
    condition m_notFull;                //條件變數,任務列隊不滿
    string m_name;                      //執行緒池名字

    Task m_threadInitCallback;          //任務函式,std::function<void()>
    //執行緒池內執行緒,用智慧指標,在vector中儲存
    std::vector<std::unique_ptr<mymuduo::thread>> m_threads;
    std::deque<Task> m_queue;           //執行緒池內的任務佇列
    size_t m_maxQueueSize;              //任務佇列最大長度
    bool m_running;                     //執行緒池正在執行?

};//namespace mymuduo

}

#endif // THREADPOOL_H

threadpool.cpp:

#include "threadpool.h"
#include"base/logging.h"
#include"base/exception.h"

#include<assert.h>
#include<stdio.h>

namespace mymuduo{

//建構函式僅用於初始化各種成員變數,其他什麼也不做
threadpool::threadpool(const string& nameArg)
    :m_mutex(),m_notEmpty(m_mutex),m_notFull(m_mutex),m_name(nameArg),
      m_maxQueueSize(0),m_running(false)
{

}
threadpool::~threadpool()
{
    if(m_running)       //只有執行緒池處於執行狀態才可以關閉
        this->stop();
}

//啟動執行緒池,建立numThreads個執行緒
void threadpool::start(int numThreads)
{
    assert(m_threads.empty());
    m_running=true;
    m_threads.reserve(numThreads);

    //建立numThreads個執行緒
    for(int i=0;i<numThreads;i++)
    {
        char id[32];
        snprintf(id,sizeof(id),"%d",i+1);
        //push_back和emplace_back新增臨時變數的區別,假設a是個臨時變數
        //push_back(a),會在vector/dequeue內部把a拷貝過來,然後臨時變數a被析構
        //emplace_back(a),直接新增臨時變數,沒有拷貝操作,不會析構a
        m_threads.emplace_back(new thread(std::bind(&threadpool::runInThread,this),m_name+id));
        m_threads[i]->start();  //執行執行緒,實際上是執行runInThread函式
    }

    //numThreads為0,即不建立任務執行緒時,預設呼叫m_threadInitCallback()函式
    if(numThreads==0 && m_threadInitCallback)
        m_threadInitCallback();
}

void threadpool::stop()
{

    //這裡踩一個坑一定要注意,mutexlockguard一定要析構把鎖釋放掉,也就是寫在區域性作用域中{}
    //下分來分析一下之前踩得坑,沒寫到區域性作用於會發生什麼.(死鎖)

    //當前執行緒執行join()時,子執行緒拿不到鎖一直不會結束,因此,join()得不到返回,mlg也不會析構,
    //m_mutex也不會得到釋放,形成死鎖

    //當前執行緒拿著鎖等回收子執行緒,之後才能釋放鎖,而子執行緒需要鎖才能結束,因此形成死鎖
    {
    mutexlockguard mlg(m_mutex);
    m_running=false;            //指定執行緒池狀態為false,之後喚醒時所有執行緒都會直接退出
    m_notFull.notifyAll();
    m_notEmpty.notifyAll();
    }
    //回收每個執行緒
    for(auto& i:m_threads)
        i->join();
}

//執行緒池任務佇列不為空就往裡面新增一個任務
void threadpool::run(Task task)
{
    //執行緒池為空,直接執行即可,也不用同步啥的
    if(m_threads.empty())
    {
        task();
        return;
    }
    //執行緒池不為空,所有執行緒爭搶這一任務
    mutexlockguard mlg(m_mutex);
    while(m_running && isFull())    //執行緒池任務佇列滿了必須要等待
        m_notFull.wait();
    if(!m_running)                  //執行緒池狀態變成false,直接退出
    {
        return;
    }
    assert(!isFull());
    //把當前任務新增到任務佇列中去
    m_queue.push_back(std::move(task));
    m_notEmpty.notify();    //喚醒一個執行緒

}

//從執行緒池任務佇列中拿出一個任務
threadpool::Task threadpool::take()
{
    mutexlockguard mlg(m_mutex);
    //當執行緒池任務佇列為空時一直等待
    //LOG_WARN<<currentthread::tid()<<"wait for task...";
    while(m_running && m_queue.empty())
        m_notEmpty.wait();
    //LOG_WARN<<currentthread::tid()<<"got task...";
    //無任務可取了,
    if(m_queue.empty())return NULL;

    Task task;
    task=m_queue.front();
    m_queue.pop_front();
    if(m_maxQueueSize>0)    //喚醒一個執行緒
        m_notFull.notify();
    return task;
}

bool threadpool::isFull() const
{
    m_mutex.assertLocked(); //保證m_mutex被當前執行緒所持有
    return m_maxQueueSize>0 && m_queue.size()>=m_maxQueueSize;
}

void threadpool::runInThread()
{
    try {
        if(m_threadInitCallback)
            m_threadInitCallback();
        while(m_running)
        {
            Task task=this->take();//在任務佇列中拿出一個任務並執行它
            //LOG_WARN<<"get one task";
            if(task!=NULL)
                task();
            //LOG_WARN<<"get one task done";
        }

    } catch (const mymuduo::exception& ex) {
        fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str());
        fprintf(stderr, "reason: %s\n", ex.what());
        fprintf(stderr, "stack trace: %s\n", ex.stackTree());
        abort();
    }catch (const std::exception& ex)
    {
        fprintf(stderr, "exception caught in ThreadPool %s\n", m_name.c_str());
        fprintf(stderr, "reason: %s\n", ex.what());
        abort();
    }catch(...)
    {
        fprintf(stderr, "unknown exception caught in ThreadPool %s\n", m_name.c_str());
        throw; // rethrow
    }
}

}//namespace mymuduo

測試:

#include"base/threadpool.h"

#include<iostream>

void workerThread()
{
    //sleep for 1 sec
    mymuduo::currentthread::sleepUsec(1000*1000);
    std::cout<<mymuduo::currentthread::name()<<" done\n";
}

int main()
{
    mymuduo::threadpool tp("testThreadPool");
    tp.setMaxQueueSize(10);
    tp.start(5);

    for(int i=0;i<10;i++)
        tp.run(workerThread);

    mymuduo::currentthread::sleepUsec(5000*1000);
    //tp.stop();

}

列印結果:

testThreadPool1 done
testThreadPool3testThreadPool2 done
testThreadPool5 done
done
testThreadPool4 done
testThreadPool5 done
testThreadPool1 done
testThreadPool2 done
testThreadPool3 done
testThreadPool4 done