簡單Linux C執行緒池
阿新 • • 發佈:2018-12-11
#include "threadpool.h"struct threadpool* threadpool_init(int thread_num, int queue_max_num){struct threadpool *pool = NULL;do { pool = malloc(sizeof(struct threadpool));if (NULL == pool) { printf("failed to malloc threadpool!\n");break; } pool->thread_num = thread_num; pool->queue_max_num = queue_max_num; pool->queue_cur_num = 0; pool->head = NULL; pool->tail = NULL;if (pthread_mutex_init(&(pool->mutex), NULL)) { printf("failed to init mutex!\n");break; }if (pthread_cond_init(&(pool->queue_empty), NULL)) { printf("failed to init queue_empty!\n");break; }if (pthread_cond_init(&(pool->queue_not_empty), NULL)) { printf("failed to init queue_not_empty!\n");break; }if (pthread_cond_init(&(pool->queue_not_full), NULL)) { printf("failed to init queue_not_full!\n");break; } pool->pthreads = malloc(sizeof(pthread_t) * thread_num);if (NULL == pool->pthreads) { printf("failed to malloc pthreads!\n");break; } pool->queue_close = 0; pool->pool_close = 0;int i;for (i = 0; i < pool->thread_num; ++i) { pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool); }return pool; } while (0);return NULL;}int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg){ assert(pool != NULL); assert(callback_function != NULL); assert(arg != NULL); pthread_mutex_lock(&(pool->mutex));while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close)) { pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex)); //佇列滿的時候就等待 }if (pool->queue_close || pool->pool_close) //佇列關閉或者執行緒池關閉就退出 { pthread_mutex_unlock(&(pool->mutex));return -1; }struct job *pjob =(struct job*) malloc(sizeof(struct job));if (NULL == pjob) { pthread_mutex_unlock(&(pool->mutex));return -1; } pjob->callback_function = callback_function; pjob->arg = arg; pjob->next = NULL;if (pool->head == NULL) { pool->head = pool->tail = pjob; pthread_cond_broadcast(&(pool->queue_not_empty)); //佇列空的時候,有任務來時就通知執行緒池中的執行緒:佇列非空 }else { pool->tail->next = pjob; pool->tail = pjob; } pool->queue_cur_num++; pthread_mutex_unlock(&(pool->mutex));return 0;}void* threadpool_function(void* arg){struct threadpool *pool = (struct threadpool*)arg;struct job *pjob = NULL;while (1) //死迴圈 { pthread_mutex_lock(&(pool->mutex));while ((pool->queue_cur_num == 0) && !pool->pool_close) //佇列為空時,就等待佇列非空 { pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex)); }if (pool->pool_close) //執行緒池關閉,執行緒就退出 { pthread_mutex_unlock(&(pool->mutex)); pthread_exit(NULL); } pool->queue_cur_num--; pjob = pool->head;if (pool->queue_cur_num == 0) { pool->head = pool->tail = NULL; }else { pool->head = pjob->next; }if (pool->queue_cur_num == 0) { pthread_cond_signal(&(pool->queue_empty)); //佇列為空,就可以通知threadpool_destroy函式,銷燬執行緒函式 }if (pool->queue_cur_num == pool->queue_max_num - 1) { pthread_cond_broadcast(&(pool->queue_not_full)); //佇列非滿,就可以通知threadpool_add_job函式,新增新任務 } pthread_mutex_unlock(&(pool->mutex)); (*(pjob->callback_function))(pjob->arg); //執行緒真正要做的工作,回撥函式的呼叫 free(pjob); pjob = NULL; }}int threadpool_destroy(struct threadpool *pool){ assert(pool != NULL); pthread_mutex_lock(&(pool->mutex));if (pool->queue_close || pool->pool_close) //執行緒池已經退出了,就直接返回 { pthread_mutex_unlock(&(pool->mutex));return -1; } pool->queue_close = 1; //置佇列關閉標誌while (pool->queue_cur_num != 0) { pthread_cond_wait(&(pool->queue_empty), &(pool->mutex)); //等待佇列為空 } pool->pool_close = 1; //置執行緒池關閉標誌 pthread_mutex_unlock(&(pool->mutex)); pthread_cond_broadcast(&(pool->queue_not_empty)); //喚醒執行緒池中正在阻塞的執行緒 pthread_cond_broadcast(&(pool->queue_not_full)); //喚醒新增任務的threadpool_add_job函式int i;for (i = 0; i < pool->thread_num; ++i) { pthread_join(pool->pthreads[i], NULL); //等待執行緒池的所有執行緒執行完畢 } pthread_mutex_destroy(&(pool->mutex)); //清理資源 pthread_cond_destroy(&(pool->queue_empty)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); free(pool->pthreads);struct job *p;while (pool->head != NULL) { p = pool->head; pool->head = p->next; free(p); } free(pool);return 0;}