1. 程式人生 > >C++執行緒模型 one-loop-per-thread

C++執行緒模型 one-loop-per-thread

C++11引入了執行緒物件,使我們能夠在語言層面方便的執行執行緒操作,能夠將成員函式,函式物件,lambda函式都當作執行緒入口,從而不用再去在不同平臺上對系統API去做一些生硬的相容措施(比如將類指標當作執行緒引數傳入到執行緒函式中從而能夠訪問類成員)。在這裡,我對C++11的執行緒物件進行簡單的封裝,實現的簡單的執行緒操作,接著輔助於上一篇提到的訊息佇列,實現了支援訊息轉發的執行緒模型。首先看下簡單執行緒模型的封裝:

#ifndef HEADER_RUNNABLE
#define HEADER_RUNNABLE

#include <thread>

class CRunnable
{
public
: CRunnable() : _stop(false) {} virtual ~CRunnable() {} //執行緒基本操作 virtual void Start() { _stop = false; if (!_pthread) { _pthread = std::shared_ptr<std::thread>(new std::thread(std::bind(&CRunnable::Run, this))); } } virtual void Stop() { _stop = true
; } virtual void Join() { if (_pthread) { _pthread->join(); } } //執行緒主邏輯 virtual void Run() = 0; bool GetStop() { return _stop; } static void Sleep(int interval) { std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } protected
: CRunnable(const CRunnable&) = delete; CRunnable& operator=(const CRunnable&) = delete; protected: bool _stop; std::shared_ptr<std::thread> _pthread; }; #endif

在start 函式中建立執行緒物件,只要繼承這個類並在run中實現執行緒邏輯,就可在另一個執行緒中執行。在one-loop-per-thread中,我們的主邏輯類繼承之,在run函式中迴圈從訊息佇列中提取操作處理。下面我們繼承這個類新增一個訊息佇列:

#ifndef HEADER_RUNNABLEALONETASKLIST
#define HEADER_RUNNABLEALONETASKLIST

#include "Runnable.h"
#include "TaskQueue.h"

typedef std::function<void()> Task;
template<typename T = Task>
class CRunnableAloneTaskList : public CRunnable
{
public:
    CRunnableAloneTaskList() {}
    virtual ~CRunnableAloneTaskList() {}

    int GetTaskListSize() {
        return _task_list.Size();
    }

    //執行緒訊息投遞
    void Push(const T&& t) {
        _task_list.Push(t);
    }
    void Push(const T& t) {
        _task_list.Push(t);
    }

    //執行緒主邏輯
    virtual void Run() = 0;

protected:
    T _Pop() {
        return std::move(_task_list.Pop());
    }

    CRunnableAloneTaskList(const CRunnableAloneTaskList&) = delete;
    CRunnableAloneTaskList& operator=(const CRunnableAloneTaskList&) = delete;

private:
    CTaskQueue<T>           _task_list;         //每個執行緒都有自己的任務佇列
};
#endif

繼承這個類的派生類在run中迴圈的 從訊息佇列中Pop任務執行,其他執行緒往這個類中投遞訊息只需呼叫Push函式即可(訊息佇列是執行緒安全的),接著我們實現一個每個執行緒都單獨有訊息佇列並且支援往指定執行緒投遞訊息的的類:

#ifndef HEADER_RUNNABLEALONETASKLISTWITHPOST
#define HEADER_RUNNABLEALONETASKLISTWITHPOST

#include <mutex>
#include <map>

#include "Runnable.h"
#include "TaskQueue.h"

typedef std::function<void()> Task;

class CRunnableAloneTaskListWithPost : public CRunnable
{
public:
    CRunnableAloneTaskListWithPost() {}
    virtual ~CRunnableAloneTaskListWithPost() {}

    int GetTaskListSize() {
        return _task_list.Size();
    }

    virtual void Start();
    virtual void Stop();

    //執行緒訊息投遞
    void Push(const Task&& func) {
        _task_list.Push(func);
    }
    void Push(const Task& func) {
        _task_list.Push(func);
    }

    //執行緒主邏輯
    virtual void Run();

    std::thread::id GetId()const { return _id; }

    //向指定執行緒投遞任務
    static  bool PostTask(const std::thread::id& thread_id, const Task& func);

private:
    Task _Pop() {
        return std::move(_task_list.Pop());
    }

    CRunnableAloneTaskListWithPost(const CRunnableAloneTaskListWithPost&) = delete;
    CRunnableAloneTaskListWithPost& operator=(const CRunnableAloneTaskListWithPost&) = delete;

private:
    CTaskQueue<Task>        _task_list;         //每個執行緒都有自己的任務佇列
    std::thread::id         _id;

    static  std::mutex                                                  _map_mutex;     //_runnable_map訪問鎖
    static  std::map<std::thread::id, CRunnableAloneTaskListWithPost*>  _runnable_map;  //記錄執行緒物件,支援執行緒間訊息投遞
};

std::mutex CRunnableAloneTaskListWithPost::_map_mutex;
std::map<std::thread::id, CRunnableAloneTaskListWithPost*> CRunnableAloneTaskListWithPost::_runnable_map;

void CRunnableAloneTaskListWithPost::Start() {
    _stop = false;
    if (!_pthread) {
        _pthread = std::shared_ptr<std::thread>(new std::thread(std::bind(&CRunnableAloneTaskListWithPost::Run, this)));
    }

    Push([this]() {
        std::unique_lock<std::mutex> lock(_map_mutex);
        auto iter = _runnable_map.find(std::this_thread::get_id());
        if (iter == _runnable_map.end()) {
            _runnable_map[std::this_thread::get_id()] = this;
            _id = std::this_thread::get_id();
        }
    });
}

void CRunnableAloneTaskListWithPost::Stop() {
    Push([this]() {
        {
            std::unique_lock<std::mutex> lock(_map_mutex);
            auto iter = _runnable_map.find(std::this_thread::get_id());
            if (iter != _runnable_map.end()) {
                _runnable_map.erase(iter);
            }
        }
        Push(nullptr);
        _stop = true;
    });
}

bool CRunnableAloneTaskListWithPost::PostTask(const std::thread::id& thread_id, const Task& func) {
    std::unique_lock<std::mutex> lock(_map_mutex);
    auto iter = _runnable_map.find(thread_id);
    if (iter != _runnable_map.end()) {
        if (iter->second) {
            iter->second->Push(func);
            return true;
        }
    }
    return false;
}

void  CRunnableAloneTaskListWithPost::Run() {
    while (!_stop) {
        auto t = _Pop();
        if (t) {
            t();

        } else {
            break;
        }
    }
}
#endif

我們用一個靜態的map來儲存建立了的所有執行緒類,然後通過指定執行緒id來向指定的執行緒投放執行任務。這裡有點特殊的是儲存和釋放map中函式物件的時候需要建立個匿名函式投遞到執行緒中去操作,因為只有這樣才能使get_id函式獲取到的執行緒id是當前要操作的執行緒id。接下來我們再實現一個類公用一個訊息佇列,競爭訊息佇列中的執行任務:

#ifndef HEADER_RUNNABLESHARETASKLIST
#define HEADER_RUNNABLESHARETASKLIST

#include <mutex>
#include <map>

#include "TaskQueue.h"

typedef std::function<void()> Task;

template<typename T = Task>
class CRunnableShareTaskList : public CRunnable
{
public:
    explicit CRunnableShareTaskList(int channel);
    virtual ~CRunnableShareTaskList();

    int GetTaskListSize() {
        std::unique_lock<std::mutex> lock(_map_mutex);
        return _task_list_map[_channel].first->Size();
    }

    //執行緒訊息投遞
    void Push(const T&& func) {
        std::unique_lock<std::mutex> lock(_map_mutex);
        _task_list_map[_channel].first->Push(func);
    }

    void Push(const T& func) {
        std::unique_lock<std::mutex> lock(_map_mutex);
        _task_list_map[_channel].first->Push(func);
    }

    //執行緒主邏輯
    virtual void Run() = 0;

private:
    T _Pop() {
        return std::move(_task_list_map[_channel].first->Pop());
    }

    CRunnableShareTaskList(const CRunnableShareTaskList&) = delete;
    CRunnableShareTaskList& operator=(const CRunnableShareTaskList&) = delete;

private:
    int                             _channel;
    bool                            _stop;
    std::shared_ptr<std::thread>    _pthread;

    static  std::mutex                                                          _map_mutex;     //_runnable_map訪問鎖
    static  std::map<int, std::pair<std::shared_ptr<CTaskQueue<T>>, int>>       _task_list_map; //共享任務佇列 channel TaskQueue TaskQueue的共享個數
};

template<typename T>
std::mutex CRunnableShareTaskList<T>::_map_mutex;
template<typename T>
std::map<int, std::pair<std::shared_ptr<CTaskQueue<T>>, int>> CRunnableShareTaskList<T>::_task_list_map;

template<typename T>
CRunnableShareTaskList<T>::CRunnableShareTaskList(int channel) : _stop(false), _channel(channel) {
    std::unique_lock<std::mutex> lock(_map_mutex);
    auto iter = _task_list_map.find(_channel);
    if (iter != _task_list_map.end()) {
        iter->second.second++;

    } else {
        _task_list_map[_channel] = std::make_pair(std::shared_ptr<CTaskQueue<Task>>(new CTaskQueue<Task>), 1);
    }
}

template<typename T>
CRunnableShareTaskList<T>::~CRunnableShareTaskList() {
    std::unique_lock<std::mutex> lock(_map_mutex);
    auto iter = _task_list_map.find(_channel);
    if (iter != _task_list_map.end()) {
        iter->second.second--;
        if (iter->second.second == 0) {
            _task_list_map.erase(iter);
        }
    }
}
#endif

用一個map儲存公用的訊息佇列資訊,用channel來區分共享的訊息佇列。這兩個類可能有點想當然了,並沒有考慮到多少實際的使用場景。
老慣例,下邊是測試使用程式碼:

#include "RunnableShareTaskList.h"
#include "RunnableAloneTaskListWithPost.h"
int main() {
    CRunnableAloneTaskListWithPost run[2];
    run[0].Start();
    run[1].Start();

    CMemaryPool* pool = NULL;
    Task task = [&pool]() {
        if (pool) {
            delete pool;
        }
    };
    run[0].Push([&pool]() {
        pool = new CMemaryPool; 
        int a = 0;
        a++;
    });

    run[1].Push([&pool, task]() { 
        CRunnableAloneTaskListWithPost::PostTask(pool->GetCreateThreadId(), task);
    });

    run[0].Join();
    run[1].Join();
}

int main() {
    CRunnableShareTaskList<> run1(1);
    CRunnableShareTaskList<> run2(1);
    run1.Start();
    run2.Start();

    CMemaryPool* pool = NULL;
    Task task = [&pool]() {
        if (pool) {
            delete pool;
        }
    };
    run1.Push([&pool]() {
        pool = new CMemaryPool;
        int a = 0;
        a++;
    });

    run1.Push([&pool]() {
        CRunnableShareTaskList<>::Sleep(1000);
        if (pool) {
            delete pool;
        }
    });

    run1.Stop();
    run2.Join();
}

CRunnableShareTaskList 的物件是建立不出來的,得派生實現run函式~~.接下來我們通過繼承執行緒類來實現一個實際的例子 日誌執行緒,這個放到下一篇中說……
GitHub:https://github.com/caozhiyi/Base