用C++實現簡單執行緒池
阿新 • • 發佈:2018-12-27
執行緒池
什麼是執行緒池
- 在使用執行緒的CS模型中,伺服器端每接收到一個客戶端請求,都會為客戶端建立執行緒資源,當有大量突發性請求時,伺服器來不及為每個客戶端建立執行緒。執行緒每次的建立與銷燬都會耗費伺服器大量資源與時間,可以在伺服器一開始就建立好一堆執行緒,等到客戶端請求來臨直接讓這些執行緒進行處理,這就是執行緒池。
- 執行緒池是一種多執行緒的處理模式,線上程池啟動之後,向執行緒池中新增任務,執行緒池中的執行緒將會自動處理這些任務
適用場景
- 需要大量的執行緒來完成任務,並且每個任務的時間比較短。在WEB伺服器上適用執行緒池是非常有必要的,單個任務量小,任務量大。
- 對效能比較苛刻,需要立即做出應答。
- 接收突發性的大量請求,但是伺服器不至於產生大量執行緒。
簡單實現
任務模組:適用setData
為每個物件設定任務,使用Run
來執行任務佇列。
struct Task{
public:
void *_data;
public:
bool setData(void *data){
_data = data;
}
bool Run(){
srand(time(NULL));
int sec = rand() % 3;
printf("id:%p -- run data -- %s -- sleep:%d\n" , pthread_self(), _data, sec);
sleep(sec);
}
};
執行緒池模組:
class pthreadPool{
private:
int _max_thr; //max pthread
int _cur_thr; //now pthread
bool _stop_flag; //is stop
std::queue<Task*> _queue;
int _cap; //queue capacity
pthread_mutex_t _lock;
pthread_cond_t _full;
pthread_cond_t _empty;
};
- 使用STL中的佇列來模擬實現任務佇列,_max_thr表示當前執行緒池中最大的執行緒數,
- _cur_thr是當前的執行緒的數量,_stop_flag是否停止標誌,_cap是佇列的最大長度。
建構函式:對相關互斥鎖和條件變數進行初始化
pthreadPool(int max_thr = 5, int max_queue = 10)
: _max_thr(max_thr)
, _cur_thr(0)
, _stop_flag(false)
, _cap(max_queue)
{
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_full, NULL);
pthread_cond_init(&_empty, NULL);
}
初始化執行緒池:建立執行緒池支援的最大個數個執行緒
bool initPool(){
pthread_t tid;
int i, ret;
for(i = 0; i < _max_thr; i++){
ret = pthread_create(&tid, NULL, thrStart, (void*)this);
if(ret != 0){
std::cerr << "pthread_create error" << std::endl;
return false;
}
pthread_detach(tid);
}
}
向執行緒池中新增任務
bool addTask(Task *task){
pthread_mutex_lock(&_lock);
while(queueIsFull()){
pthread_cond_wait(&_full, &_lock);
}
_queue.push(task);
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_lock);
return true;
}
- 在每次向執行緒池中新增任務時,為了防止其他執行緒獲得CPU排程產生的資料安全問題,都需要先對其進行加鎖操作
- 如果任務佇列滿了就需要等待在
_full
條件變數上,此處要用while
迴圈來判斷,而不能用if
- 佇列不滿,向佇列中新增任務,新增完畢之後向等待在
_empty
上的執行緒做出通知,最後釋放鎖,新增完畢
執行緒處理函式
static void *thrStart(void *arg){
pthreadPool *pool = (pthreadPool*)arg;
while(1){
pthread_mutex_lock(&pool->_lock);
//queue is empty, wait
while(pool->queueIsEmpty() && !(pool->_stop_flag)){
pthread_cond_wait(&pool->_empty, &pool->_lock);
}
if(pool->queueIsEmpty() && pool->_stop_flag){
std::cout << "-pthread eixt-" << std::endl;
pool->_cur_thr--;
pthread_mutex_unlock(&pool->_lock);
pthread_cond_signal(&pool->_full);
pthread_exit(NULL);
}
Task* task;
pool->queuePop(&task);
pthread_mutex_unlock(&pool->_lock);
pthread_cond_signal(&pool->_full);
task->Run();
}
}
- 此處執行緒處理函式宣告為靜態函式,所以在建立執行緒時需要把隱含的
this
指標傳遞過來 - 在進行操作之前先進行加鎖操作,如果任務佇列為空並且不退出狀態就等待在
_empty
上 - 取出任務之後,因為不知道任務具體執行的時間,為了防止死鎖,所以先進行解鎖操作,通知等待在
_full
上的執行緒可以新增任務 - 最後執行任務。
判斷是否停止執行緒池
bool stop(){
pthread_mutex_lock(&_lock);
if(_stop_flag){
pthread_mutex_unlock(&_lock);
return false;
}
_stop_flag = true;
while(_cur_thr > 0){
pthread_cond_broadcast(&_empty);
pthread_cond_wait(&_full, &_lock);
}
pthread_mutex_unlock(&_lock);
return false;
}