c++11實現執行緒池
阿新 • • 發佈:2019-01-26
測試程式
//
// main.cpp
//
#include <iostream> // std::cout, std::endl
#include <vector> // std::vector
#include <string> // std::string
#include <future> // std::future
#include <thread> // std::this_thread::sleep_for
#include <chrono> // std::chrono::seconds
#include "ThreadPool.h"
int main()
{
// 建立一個能夠併發執行四個執行緒的執行緒池
ThreadPool pool(4);
// 建立併發執行執行緒的結果列表
std::vector< std::future<std::string> > results;
// 啟動八個需要執行的執行緒任務
for(int i = 0; i < 8; ++i) {
// 將併發執行任務的返回值新增到結果列表中
results.emplace_back(
// 將下面的列印任務新增到執行緒池中併發執行
pool.enqueue([i] {
std ::cout << "hello " << i << std::endl;
// 上一行輸出後, 該執行緒會等待1秒鐘
std::this_thread::sleep_for(std::chrono::seconds(1));
// 然後再繼續輸出並返回執行情況
std::cout << "world " << i << std::endl;
return std::string ("---thread ") + std::to_string(i) + std::string(" finished.---");
})
);
}
// 輸出執行緒任務的結果
for(auto && result: results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
執行緒池類
//
// ThreadPool.hpp
// ThreadPool
//
// Original Author: Jakob Progsch, Václav Zeman
// Modified By: https://www.shiyanlou.com
// Original Link: https://github.com/progschj/ThreadPool
//
#ifndef ThreadPool_hpp
#define ThreadPool_hpp
#include <vector> // std::vector
#include <queue> // std::queue
#include <memory> // std::make_shared
#include <stdexcept> // std::runtime_error
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <future> // std::future, std::packaged_task
#include <functional> // std::function, std::bind
#include <utility> // std::move, std::forward
class ThreadPool {
public:
inline ThreadPool(size_t threads) : stop(false) {
// 啟動 threads 數量的工作執行緒(worker)
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
// 此處的 lambda 表示式捕獲 this, 即執行緒池例項
[this]
{
// 迴圈避免虛假喚醒
for(;;)
{
// 定義函式物件的容器, 儲存任意的返回型別為 void 引數表為空的函式
std::function<void()> task;
// 臨界區
{
// 建立互斥鎖
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 阻塞當前執行緒, 直到 condition_variable 被喚醒
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
// 如果當前執行緒池已經結束且等待任務佇列為空, 則應該直接返回
if(this->stop && this->tasks.empty())
return;
// 否則就讓任務佇列的隊首任務作為需要執行的任務出隊
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 執行當前任務
task();
}
}
);
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
// 推導任務返回型別
using return_type = typename std::result_of<F(Args...)>::type;
// 獲得當前任務
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 獲得 std::future 物件以供實施執行緒同步
std::future<return_type> res = task->get_future();
// 臨界區
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 禁止線上程池停止後加入新的執行緒
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 將執行緒新增到執行任務佇列中
tasks.emplace([task]{ (*task)(); });
}
// 通知一個正在等待的執行緒
condition.notify_one();
return res;
}
inline ~ThreadPool() {
// 臨界區
{
// 建立互斥鎖
std::unique_lock<std::mutex> lock(queue_mutex);
// 設定執行緒池狀態
stop = true;
}
// 通知所有等待執行緒
condition.notify_all();
// 使所有非同步執行緒轉為同步執行, 此處迴圈為 c++11 新提供的迴圈語法 for(value:values)
for(std::thread &worker: workers)
worker.join();
}
private:
// 需要持續追蹤執行緒來保證可以使用 join
std::vector< std::thread > workers;
// 任務佇列
std::queue< std::function<void()> > tasks;
// 同步相關
std::mutex queue_mutex; // 互斥鎖
std::condition_variable condition; // 互斥條件變數
// 停止相關
bool stop;
};
#endif /* ThreadPool_hpp */