隱藏一個元素的幾種方法
阿新 • • 發佈:2020-09-10
簡介
執行緒池(thread pool):一種執行緒的使用模式,執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。
執行緒池的組成
1、執行緒池管理器
建立一定數量的執行緒,啟動執行緒,調配任務,管理著執行緒池。
本篇執行緒池目前只需要啟動(start()),停止方法(stop()),及任務新增方法(addTask).
start()建立一定數量的執行緒池,進行執行緒迴圈.
addTask()新增任務.
2、工作執行緒
執行緒池中執行緒,線上程池中等待並執行分配的任務.
本篇選用條件變數實現等待與通知機制.
3、任務介面,
新增任務的介面,以供工作執行緒排程任務的執行。
4、任務佇列
用於存放沒有處理的任務。提供一種緩衝機制
同時任務佇列具有排程功能,高優先順序的任務放在任務佇列前面;本篇選用priority_queue 與pair的結合用作任務優先佇列的結構.
程式碼實現:
ThreadPool.hpp:
#ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #includeView Code<thread> #include <mutex> #include <atomic> #include <condition_variable> #include <functional> #include <vector> #include <queue> class ThreadPool { public: using Task = std::function<void()>; explicit ThreadPool(int num) : _thread_num(num), _is_running(false) {} ~ThreadPool() { if (_is_running) stop(); } void start() { _is_running = true; // start threads for (int i = 0; i < _thread_num; i++) _threads.emplace_back(std::thread(&ThreadPool::work, this)); } void stop() { { // stop thread pool, should notify all threads to wake std::unique_lock<std::mutex> lk(_mtx); _is_running = false; _cond.notify_all(); // must do this to avoid thread block } // terminate every thread job for (std::thread& t : _threads) { if (t.joinable()) t.join(); } } void appendTask(const Task& task) { if (_is_running) { std::unique_lock<std::mutex> lk(_mtx); _tasks.push(task); _cond.notify_one(); // wake a thread to to the task } } private: void work() { printf("begin work thread: %d\n", std::this_thread::get_id()); // every thread will compete to pick up task from the queue to do the task while (_is_running) { Task task; { std::unique_lock<std::mutex> lk(_mtx); if (!_tasks.empty()) { // if tasks not empty, // must finish the task whether thread pool is running or not task = _tasks.front(); _tasks.pop(); // remove the task } else if (_is_running && _tasks.empty()) _cond.wait(lk); } if (task) task(); // do the task } printf("end work thread: %d\n", std::this_thread::get_id()); } public: // disable copy and assign construct ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool& other) = delete; private: bool _is_running; // thread pool manager status std::mutex _mtx; std::condition_variable _cond; int _thread_num; std::vector<std::thread> _threads; std::queue<Task> _tasks; }; #endif // !_THREAD_POOL_H_
main.cpp
#include "stdafx.h" #include <iostream> #include <chrono> #include "ThreadPool.hpp" void fun1() { std::cout << "working in thread " << std::this_thread::get_id() << std::endl; } void fun2(int x) { std::cout << "task " << x << " working in thread " << std::this_thread::get_id() << std::endl; } int main(int argc, char* argv[]) { ThreadPool thread_pool(3); thread_pool.start(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (int i = 0; i < 6; i++) { //thread_pool.appendTask(fun1); thread_pool.appendTask(std::bind(fun2, i)); //std::this_thread::sleep_for(std::chrono::milliseconds(500)); } thread_pool.stop(); getchar(); return 0; }View Code