C++簡易執行緒池的實現
文章目錄
1.執行緒池基本瞭解
一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價
。執行緒池不僅能夠保證核心的充分利用
,還能防止過分排程
。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。
2.執行緒池的應用場景
1)需要大量的執行緒來完成任務,且完成任務的時間比較短。
WEB伺服器完成網頁請求這樣的任務,使用執行緒池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點選次數。 但對於長時間的任務,比如一個Telnet連線請求,執行緒池的優點就不明顯了。因為Telnet會話時間比執行緒的建立時間大多了。
2)對效能要求苛刻的應用,比如要求伺服器迅速響應客戶請求。
3) 接受突發性的大量請求,但不至於使伺服器因此產生大量執行緒的應用。
突發性大量客戶請求,在沒有執行緒池情況下,將產生大量執行緒,雖然理論上大部分作業系統執行緒數目最大值不是問題,短時間內產生大量執行緒可能使記憶體到達極限,出現錯誤.
3.實現一個簡單的加法任務的執行緒池
1)首先我們建立一個任務類。
//回撥函式 typedef int(*cal_t)(int,int); class Task{ private: int x;//運算元1 int y;//運算元2 int z;//運算結果 cal_t handler_task;//運算操作 public: Task(int a,int b,cal_t handler_task_)//建構函式 :x(a),y(b),handler_task(handler_task_) { } //執行操作 void Run() { z = handler_task(x,y); } //將結果輸出 void Show() { cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl; } ~Task() { } };
2)執行緒池類
《1》執行緒池的成員
class ThreadPool{
private:
queue<Task> Task_Queue;//將任務新增在一個佇列中
bool IsStop;//執行緒池的工作狀態
int ThreadNum;//執行緒池內的執行緒數量
//為了執行緒安全所以需要互斥鎖與條件變數保護臨界資源
pthread_mutex_t lock;
pthread_cond_t cond;
};
《2》根據測試實現執行緒池內的成員函式
#include"ThreadPool.hpp"
#define NUM 5 //設定執行緒池的執行緒數量
//任務操作
int Add(int x,int y)
{
return x + y;
}
int main()
{
//建立一個執行緒池類
ThreadPool *tp =new ThreadPool(NUM);
//執行緒池的初始化
tp -> InitThreadPool();
//為了便於理解,我們使用while迴圈先一直往任務佇列新增任務。
int count =1;
while(1){
sleep(1);
Task t(count,count-1,Add);
tp->AddTask(t);
count++;
}
return 0;
}
a.執行緒池類的建構函式與解構函式
//建構函式初始化時,將IsStop狀態設定為fasle,否則執行緒池無法工作。
ThreadPool(int num):ThreadNum(num),IsStop(false)
{}
b.執行緒池的初始化
void InitThreadPool()
{
//對互斥鎖以及條件變臉初始化。
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
//設定變數控制執行緒的建立
int i = 0;
for(;i<ThreadNum;i++)
{
pthread_t tid;
pthread_create(&tid,NULL,route,(void *)this);
}
}
//解構函式完成互斥鎖以及條件變數的銷燬
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
c.初始化完後實現執行緒建立中的route函式
//C++中static函式內無this指標,C++類對執行緒操作要用static修飾
static void *route(void *arg)
{
ThreadPool *tp = (ThreadPool*)arg;
//將執行緒自己分離出來
pthread_detach(pthread_self());
while(1){
//為了保證執行緒安全,將任務佇列加鎖
tp->LockQueue();
//當任務佇列為空時,執行緒先進入等待狀態
if(tp->IsEmpty()){
tp->IdleThread();
}
//當佇列有任務時,建立一個任務類並得到該任務,也由此得出需要一個AddTask();成員函式。
Task t = tp->GetTask();
//取到會解鎖
tp->UnlockQueue();
//執行任務操作
t.Run();
//將任務結果顯示
t.Show();
}
}
d.依次實LockQueue();IsEmpty();IDleThread();GetTask();UnlockQueue();
//任務佇列加鎖
void LockQueue()
{
pthread_mutex_lock(&lock);
}
//任務解鎖解鎖
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
//判斷任務是否為空
bool IsEmpty()
{
return Task_Queue.size() == 0 ? true : false;
}
//加任務
void AddTask(Task &t)
{
Task_Queue.push(t);
//當得到一個任務時便傳送訊號給一個程序
NoticOneThread();
UnlockQueue();
}
//傳送訊號給一個程序
void NoticOneThread()
{
pthread_cond_signal(&cond);
}
//等待佇列
void IdleThread()
{
pthread_cond_wait(&cond,&lock);
}
//得到任務
Task GetTask()
{
Task t = Task_Queue.front();
Task_Queue.pop();
return t;
}
e.暫停分析
貌似我們已經實現了一個加法的簡易執行緒池,首先建立一個執行緒池物件,然後進行初始化,建立設定數的執行緒,並且它們都處於等待狀態,當有任務時,它們依次被喚醒,執行任務。但是我們測試用的是while(1)假如沒有了任務,執行緒便一直處於了等待狀態不會退出,於是我們還要實現一個Stop()成員函式,若停止了廣播NoticAllThread()中的執行緒讓他們依次退出,並修改IdleThread()以及AddTask()函式進行IsStop條件判斷。
void Stop()
{
LockQueue();
IsStop = true;
UnlockQueue();
while(ThreadNum > 0){
NoticAllThread();
}
}
void IdleThread()
{
if(IsStop){
UnlockQueue();
ThreadNum--;
pthread_exit((void *)0);
cout<<"pthread "<<pthread_self() <<"quit"<<endl;
return ;
}
pthread_cond_wait(&cond,&lock);
}
void AddTask(Task &t)
{
if(IsStop)
{
UnlockQueue();
return ;
}
Task_Queue.push(t);
NoticOneThread();
UnlockQueue();
}
void NoticAllThread()
{
pthread_cond_broadcast(&cond);
}
3) 完整程式碼以及測試
標頭檔案
#ifndef __THREADPOLL_HPP__
#define __THREADPOLL_HPP__
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
using namespace std;
typedef int(*cal_t)(int,int);
class Task{
private:
int x;
int y;
int z;
cal_t handler_task;
public:
Task(int a,int b,cal_t handler_task_)
:x(a),y(b),handler_task(handler_task_)
{
}
void Run()
{
z = handler_task(x,y);
}
void Show()
{
cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl;
}
~Task()
{
}
};
class ThreadPool{
private:
queue<Task> Task_Queue;
bool IsStop;
int ThreadNum;
pthread_mutex_t lock;
pthread_cond_t cond;
private:
static void *route(void *arg)
{
ThreadPool *tp = (ThreadPool*)arg;
pthread_detach(pthread_self());
while(1){
tp->LockQueue();
if(tp->IsEmpty()){
tp->IdleThread();
}
Task t = tp->GetTask();
tp->UnlockQueue();
t.Run();
t.Show();
}
}
void NoticOneThread()
{
pthread_cond_signal(&cond);
}
void NoticAllThread()
{
pthread_cond_broadcast(&cond);
}
public:
ThreadPool(int num):ThreadNum(num),IsStop(false)
{}
void InitThreadPool()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
int i = 0;
for(;i<ThreadNum;i++)
{
pthread_t tid;
pthread_create(&tid,NULL,route,(void *)this);
}
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
bool IsEmpty()
{
return Task_Queue.size() == 0 ? true : false;
}
void AddTask(Task &t)
{
if(IsStop)
{
UnlockQueue();
return ;
}
Task_Queue.push(t);
NoticOneThread();
UnlockQueue();
}
void IdleThread()
{
if(IsStop){
UnlockQueue();
ThreadNum--;
pthread_exit((void *)0);
cout<<"pthread "<<pthread_self() <<"quit"<<endl;
return ;
}
pthread_cond_wait(&cond,&lock);
}
Task GetTask()
{
Task t = Task_Queue.front();
Task_Queue.pop();
return t;
}
void Stop()
{
LockQueue();
IsStop = true;
UnlockQueue();
while(ThreadNum > 0){
NoticAllThread();
}
}
};
#endif
測試檔案
我們只設置九個任務。
#include"ThreadPool.hpp"
#define NUM 5
int Add(int x,int y)
{
return x + y;
}
int main()
{
ThreadPool *tp =new ThreadPool(NUM);
tp -> InitThreadPool();
int count =1;
int u = 0;
for(;u<1;u++){
sleep(1);
Task t(count,count-1,Add);
tp->AddTask(t);
count++;
}
return 0;
}
結果
在gdb下觀察:
1)首先建立5個執行緒。
2)5個執行緒桉序執行任務。
3)沒有任務了執行緒退出。