執行緒池簡單程式碼
阿新 • • 發佈:2018-11-30
condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
//封裝一個互斥量和條件變數作為狀態
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;
//對狀態的操作函式
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif
condition.c
#include "condition.h"
//初始化
int condition_init(condition_t *cond)
{
int status;
if((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
//加鎖
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
//解鎖
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
//等待
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
//固定時間等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
//喚醒一個睡眠執行緒
int condition_signal(condition_t* cond)
{
return pthread_cond_signal(&cond->pcond);
}
//喚醒所有睡眠執行緒
int condition_broadcast(condition_t *cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
//釋放
int condition_destroy(condition_t *cond)
{
int status;
if((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
然後是執行緒池對應的threadpool.h和threadpool.c
#ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ //執行緒池標頭檔案 #include "condition.h" //封裝執行緒池中的物件需要執行的任務物件 typedef struct task { void *(*run)(void *args); //函式指標,需要執行的任務 void *arg; //引數 struct task *next; //任務佇列中下一個任務 }task_t; //下面是執行緒池結構體 typedef struct threadpool { condition_t ready; //狀態量 task_t *first; //任務佇列中第一個任務 task_t *last; //任務佇列中最後一個任務 int counter; //執行緒池中已有執行緒數 int idle; //執行緒池中kongxi執行緒數 idel = 0 則該執行緒在忙碌;>0 則表示該執行緒在等待喚醒 int max_threads; //執行緒池最大執行緒數 int quit; //是否退出標誌 }threadpool_t;//執行緒池初始化 void threadpool_init(threadpool_t *pool, int threads); //往執行緒池中加入任務 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg); //摧毀執行緒池 void threadpool_destroy(threadpool_t *pool); #endif
---------------------------------------------------------------------------
#include "threadpool.h" #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <time.h> //建立的執行緒執行 void *thread_routine(void *arg) { struct timespec abstime; int timeout; printf("thread %d is starting\n", (int)pthread_self()); threadpool_t *pool = (threadpool_t *)arg; while(1) { timeout = 0; //訪問執行緒池之前需要加鎖 condition_lock(&pool->ready); //空閒 pool->idle++; //等待佇列有任務到來 或者 收到執行緒池銷燬通知 while(pool->first == NULL && !pool->quit) { //否則執行緒阻塞等待 printf("thread %d is waiting\n", (int)pthread_self()); //獲取從當前時間,並加上等待時間, 設定程序的超時睡眠時間 clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2; int status; status = condition_timedwait(&pool->ready, &abstime); //該函式會解鎖,允許其他執行緒訪問,當被喚醒時,加鎖 if(status == ETIMEDOUT) { printf("thread %d wait timed out\n", (int)pthread_self()); timeout = 1; break; } } pool->idle--; if(pool->first != NULL) { //取出等待佇列最前的任務,移除任務,並執行任務 task_t *t = pool->first; pool->first = t->next; //由於任務執行需要消耗時間,先解鎖讓其他執行緒訪問執行緒池 condition_unlock(&pool->ready); //執行任務 t->run(t->arg); //執行完任務釋放記憶體 free(t); //重新加鎖 condition_lock(&pool->ready); } //退出執行緒池 if(pool->quit && pool->first == NULL) { pool->counter--;//當前工作的執行緒數-1 //若執行緒池中沒有執行緒,通知等待執行緒(主執行緒)全部任務已經完成 if(pool->counter == 0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; } //超時,跳出銷燬執行緒 if(timeout == 1) { pool->counter--;//當前工作的執行緒數-1 condition_unlock(&pool->ready); break; } condition_unlock(&pool->ready); } printf("thread %d is exiting\n", (int)pthread_self()); return NULL; } //執行緒池初始化 void threadpool_init(threadpool_t *pool, int threads) { condition_init(&pool->ready); pool->first = NULL; pool->last =NULL; pool->counter =0; pool->idle =0; pool->max_threads = threads; pool->quit =0; } //增加一個任務到執行緒池 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) { //產生一個新的任務 task_t *newtask = (task_t *)malloc(sizeof(task_t)); newtask->run = run; newtask->arg = arg; newtask->next=NULL;//新加的任務放在佇列尾端 //執行緒池的狀態被多個執行緒共享,操作前需要加鎖 condition_lock(&pool->ready); if(pool->first == NULL)//第一個任務加入 { pool->first = newtask; } else { pool->last->next = newtask; } pool->last = newtask; //佇列尾指向新加入的執行緒 //執行緒池中有執行緒空閒,喚醒 if(pool->idle > 0) { condition_signal(&pool->ready); } //當前執行緒池中執行緒個數沒有達到設定的最大值,建立一個新的線性 else if(pool->counter < pool->max_threads) { pthread_t tid; pthread_create(&tid, NULL, thread_routine, pool); pool->counter++; } //結束,訪問 condition_unlock(&pool->ready); } //執行緒池銷燬 void threadpool_destroy(threadpool_t *pool) { //如果已經呼叫銷燬,直接返回 if(pool->quit) { return; } //加鎖 condition_lock(&pool->ready); //設定銷燬標記為1 pool->quit = 1; //執行緒池中執行緒個數大於0 if(pool->counter > 0) { //對於等待的執行緒,傳送訊號喚醒 if(pool->idle > 0) { condition_broadcast(&pool->ready); } //正在執行任務的執行緒,等待他們結束任務 while(pool->counter) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destroy(&pool->ready); }
測試程式碼:
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
void* mytask(void *arg)
{
printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
//測試程式碼
int main(void)
{
threadpool_t pool;
//初始化執行緒池,最多三個執行緒
threadpool_init(&pool, 3);
int i;
//建立十個任務
for(i=0; i < 10; i++)
{
int *arg = malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
}
threadpool_destroy(&pool);
return 0;
}