[C++]固定大小執行緒池
執行緒池模型
執行緒池是併發程式設計中常用的模型。
執行緒是一種非常寶貴的資源,建立、銷燬執行緒都是非常消耗時間的操作,所以我們的一個思路是在程式start up的時候,建立一個儲存有多個執行緒的快取,這樣程式執行時就不會頻繁的發生建立和銷燬執行緒的操作,從而提高了併發的效率。
儲存有多個執行緒的這個快取,我們一般稱其為執行緒池(ThreadPool)。如果執行緒池的中執行緒的數量可以動態變化,我們稱其為動態大小的執行緒池,這裡討論並實現的是固定大小的執行緒池
執行緒池中維護多個執行緒(thread)的同時,維護一個任務佇列。所謂的任務佇列,就是需要我們去併發執行的一個個的任務,說的通俗一點,就是等待執行的一個個函式。一旦任務佇列不空,取出一個任務給定某個空閒的執行緒去執行該任務。
這是一個典型的生產者和消費者的模型,所以我們要用到mutex和conditon variable原語。除此之外,我們還應該對“任務”有一個合理的抽象。
所以執行緒池這個類的資料成員應該有如下幾個:
1.mutex_t mutex
2.condition_t cond
3.vector_t<thread_t> threadVec
4.queue_t<task_t> taskQ
其中thread_t在C++11中可以使用std::thread來替代,可是這裡的task_t我們還沒有定義。
實際上,task_t是對某個執行過程的抽象,所以我們可以用C++11中的function語義來替代:
using task_t = function<void()>
資料成員定義完畢,那麼一個執行緒池類應該支援什麼樣的操作呢。
1.Start()
2.Run(Task)
3.Stop()
Start
Start操作是抽象了初始化的操作,在構造出一個執行緒池物件之後,我們需要對taskQ和threadVec做出初始化操作。taskQ的初始化實際上就是置空,threadVec的初始化有些複雜,我們需要建立一些std::thread的物件。
如果你瞭解thread的建立你可能會有一個困惑,建立thread時需要給定一個函式指標,意味著該thread將併發執行該函式指標指向的函式,那麼這裡的這個執行緒需要執行的函式是什麼呢?
我們再來明確一下執行緒池的作用,一旦任務佇列不空,我們就取出一個任務扔給某個空閒的執行緒,讓該執行緒去執行任務。
我們稱這些執行緒為worker,對應生產者消費者模型中的消費者,它消費生產佇列(taskQ)中的物品(某個具體的task),所以建立thread給定的函式指標指向的就是:消費者從生產佇列取出task並執行的這個過程。
Start()
{
for(i < nThreadCount)
create thread(fetch_and_consume)
threadVec.push(thread)
thread.run()
}
fetch_and_consume()
{
lock(mutex)
take one task
execute the task
}
Run(task)
這個函式抽象的是生產者向生產佇列中加入task的過程,暴露給使用執行緒池的使用者使用。
Run(task)
{
lock(mutex)
taskQ.Add(task)
cond.notify()
}
Stop()
Stop會停止執行緒池的運作,一般線上程池的解構函式中主動呼叫,為了防止執行緒池析構時各worker執行緒還沒完成他們的task,所以我們一般會在Stop中join各個worker執行緒。
Stop()
{
for all worker in threadVec
worker.join()
}
實現
#ifndef SIXDAY_THREAD_POOL_H
#define SIXDAY_THREAD_POOL_H
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <cstdint>
namespace sixday
{
class FixSizeThreadPool
{
public:
using Task = std::function<void()>;
public:
explicit FixSizeThreadPool(int32_t nFixedSize);
~FixSizeThreadPool();
FixSizeThreadPool() = delete;
FixSizeThreadPool(const FixSizeThreadPool&) = delete;
FixSizeThreadPool(const FixSizeThreadPool&&) = delete;
FixSizeThreadPool& operator=(const FixSizeThreadPool&) = delete;
FixSizeThreadPool& operator=(const FixSizeThreadPool&&) = delete;
void Start();
void Run(const Task& task);
void Stop();
private:
std::vector<std::thread> m_Worker;
std::queue<Task> m_TaskQ;
std::mutex m_Mutex;
std::condition_variable m_Cond;
bool m_bIsRunning;
int32_t m_nWorkerCount;
void RunInThread();
Task Take();
};
}
#endif // ! SIXDAY_THREAD_POOL_H
#include "ThreadPool.h"
#include <cassert>
namespace sixday
{
FixSizeThreadPool::FixSizeThreadPool(int32_t nFixedSize)
{
assert(nFixedSize > 0);
m_Worker.reserve(nFixedSize);
m_nWorkerCount = nFixedSize;
m_bIsRunning = false;
}
FixSizeThreadPool::~FixSizeThreadPool()
{
if (m_bIsRunning)
{
Stop();
}
}
void FixSizeThreadPool::Start()
{
assert(m_nWorkerCount > 0);
m_bIsRunning = true;
for (int32_t i = 0; i < m_nWorkerCount; ++i)
{
auto func = std::bind(&FixSizeThreadPool::RunInThread, this);
m_Worker.push_back(std::thread(func));
}
}
void FixSizeThreadPool::Run(const Task & task)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_TaskQ.push(task);
m_Cond.notify_one();
}
void FixSizeThreadPool::Stop()
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_bIsRunning = false;
m_Cond.notify_all();
}
for (auto& thread : m_Worker)
{
thread.join();
}
}
void FixSizeThreadPool::RunInThread()
{
while (m_bIsRunning)
{
Task task = Take();
if (task != nullptr)
{
task();
}
}
}
FixSizeThreadPool::Task FixSizeThreadPool::Take()
{
std::unique_lock<std::mutex> lock(m_Mutex);
while (m_TaskQ.empty() && !m_bIsRunning)
{
m_Cond.wait(lock);
}
Task task = nullptr;
if (!m_TaskQ.empty())
{
task = m_TaskQ.front();
m_TaskQ.pop();
}
return task;
}
}
//Main.cpp
#include "CountDownLatch.h"
#include "ThreadPool.h"
#include <cstdio>
using namespace sixday;
static const int32_t LoopMax = 1000000;
void PrintHello()
{
for(int32_t i = 0 ; i < LoopMax; ++i)
printf("hello\n");
}
void PrintWorld()
{
for (int32_t i = 0; i < LoopMax; ++i)
printf("world\n");
}
void PrintSay()
{
for (int32_t i = 0; i < LoopMax; ++i)
printf("say\n");
}
void PrintName()
{
for (int32_t i = 0; i < LoopMax; ++i)
printf("fancy\n");
}
int main()
{
CountDownLatch latch(1);
FixSizeThreadPool threadpool(5);
threadpool.Start();
threadpool.Run(PrintHello);
threadpool.Run(PrintWorld);
threadpool.Run(PrintName);
threadpool.Run(PrintSay);
latch.Wait();
return 0;
}