【Linux】生產者消費者程式設計實現-執行緒池+訊號量
阿新 • • 發佈:2019-02-09
生產者消費者程式設計實現,採用了執行緒池以及訊號量技術。
執行緒的概念就不多說,首先說一下多執行緒的好處:多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。
那麼為什麼又需要執行緒池呢?
我們知道應用程式建立一個物件,然後銷燬物件是很耗費資源的。建立執行緒,銷燬執行緒,也是如此。因此,我們就預先生成一些執行緒,等到我們使用的時候在進行排程,於是,一些"池化資源"技術就這樣的產生了。
一般一個簡單執行緒池至少包含下列組成部分。
1)執行緒池管理器(ThreadPoolManager):用於建立並管理執行緒池
2)工作執行緒( WorkThread): 執行緒池中執行緒
3)任務介面(Task):每個任務必須實現的介面,以供工作執行緒排程任務的執行。
4)任務佇列:用於存放沒有處理的任務。提供一種緩衝機制。
圖示:圖1 執行緒池圖解
生產者消費者模型C語言程式碼實現:
thread_pool_pv.h:
//執行緒池程式設計實現 #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <stdio.h> #include <stdlib.h> #include <semaphore.h>//訊號量sem_t #include <pthread.h> //任務介面,執行緒呼叫的函式 typedef void* (*FUNC)(void *arg); //任務資料結構 typedef struct thread_pool_job_s{ FUNC function;//執行緒呼叫的函式 void *arg;//函式引數 struct thread_pool_job_s *pre;//指向上一個節點 struct thread_pool_job_s *next;//指向下一個節點 }thread_pool_job; //工作佇列 typedef struct thread_pool_job_queue_s{ thread_pool_job *head;//佇列頭指標 thread_pool_job *tail;//佇列尾指標 int num;//任務數目 sem_t *quene_sem;//訊號量 }thread_pool_job_queue; //執行緒池(存放消費者程序) typedef struct thread_pool_s{ pthread_t *threads;//執行緒 int threads_num;//執行緒數目 thread_pool_job_queue *job_queue;//指向工作佇列的指標 }thread_pool; //typedef struct thread_data_s{ // pthread_mutex_t *mutex_t;//互斥量 // thread_pool *tp_p;//指向執行緒池的指標 //}thread_data; //初始化執行緒池 thread_pool* tp_init(int thread_num); //初始化工作佇列 int tp_job_quene_init(thread_pool *tp); //向工作佇列中新增一個元素 void tp_job_quene_add(thread_pool *tp,thread_pool_job *new_job); //向執行緒池中新增一個工作項 int tp_add_work(thread_pool *tp,void *(*func_p)(void *),void *arg); //取得工作佇列的最後個節點 thread_pool_job* tp_get_lastjob(thread_pool *tp); //刪除工作佇列的最後個節點 int tp_delete__lastjob(thread_pool *tp); //銷燬執行緒池 void tp_destroy(thread_pool *tp); //消費者執行緒函式 void* tp_thread_func(thread_pool *tp); //生產者執行緒執行函式 void* thread_func_producer(thread_pool *tp); #endif
thread_pool_pv.c:
//執行緒池程式設計實現 #include "thread_pool.h" //互斥量,用於對工作佇列的訪問 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //標記執行緒池是否處於可用狀態 static int tp_alive = 1; //初始化執行緒池 thread_pool* tp_init(int thread_num){ thread_pool *tp; int i; if(thread_num < 1) thread_num = 1; tp = (thread_pool *)malloc(sizeof(thread_pool)); //判斷記憶體分配是否成功 if(NULL == tp){ printf("ERROR:allocate memory for thread_pool failed\n"); return NULL; } tp->threads_num = thread_num; //分配執行緒所佔記憶體空間 tp->threads = (pthread_t*)malloc(thread_num * sizeof(pthread_t)); //判斷記憶體分配是否成功 if(NULL == tp->threads){ printf("ERROR:allocate memory for threads in thread pool failed\n"); return NULL; } if(tp_job_quene_init(tp)) return NULL; tp->job_queue->quene_sem = (sem_t *)malloc(sizeof(sem_t)); sem_init(tp->job_queue->quene_sem,0,0);//訊號量初始化 //初始化執行緒 for(i = 0;i < thread_num;++i){ pthread_create(&(tp->threads[i]),NULL,(void *)tp_thread_func,(void *)tp); } return tp; } //初始化工作佇列 int tp_job_quene_init(thread_pool *tp){ tp->job_queue = (thread_pool_job_queue *)malloc(sizeof(thread_pool_job_queue)); if(NULL == tp->job_queue){ return -1; } tp->job_queue->head = NULL; tp->job_queue->tail = NULL; tp->job_queue->num = 0; return 0; } //執行緒函式 void* tp_thread_func(thread_pool *tp){ FUNC function; void *arg_buf; thread_pool_job *job_p; while(tp_alive){ //執行緒阻塞,等待訊號量 if(sem_wait(tp->job_queue->quene_sem)){ printf("thread waiting for semaphore....\n"); exit(1); } if(tp_alive){ pthread_mutex_lock(&mutex); job_p = tp_get_lastjob(tp); if(NULL == job_p){ pthread_mutex_unlock(&mutex); continue; } function = job_p->function; arg_buf = job_p->arg; if(tp_delete__lastjob(tp)) return; pthread_mutex_unlock(&mutex); //執行指定的執行緒函式 printf("consumer...get a job from job quene and run it!\n"); function(arg_buf); free(job_p); } else return; } return; } //向工作佇列中新增一個元素 void tp_job_quene_add(thread_pool *tp,thread_pool_job *new_job){ new_job->pre = NULL; new_job->next = NULL; thread_pool_job *old_head_job = tp->job_queue->head; if(NULL == old_head_job){ tp->job_queue->head = new_job; tp->job_queue->tail = new_job; } else{ old_head_job->pre = new_job; new_job->next = old_head_job; tp->job_queue->head = new_job; } ++(tp->job_queue->num); sem_post(tp->job_queue->quene_sem); } //取得工作佇列的最後一個節點 thread_pool_job* tp_get_lastjob(thread_pool *tp){ return tp->job_queue->tail; } //刪除工作佇列的最後個節點 int tp_delete__lastjob(thread_pool *tp){ if(NULL == tp) return -1; thread_pool_job *last_job = tp->job_queue->tail; if(0 == tp->job_queue->num){ return -1; } else if(1 == tp->job_queue->num){ tp->job_queue->head = NULL; tp->job_queue->tail = NULL; } else{ last_job->pre->next = NULL; tp->job_queue->tail = last_job->pre; } //修改相關變數 --(tp->job_queue->num); return 0; } //向執行緒池中新增一個工作項 int tp_add_work(thread_pool *tp,void *(*func_p)(void *),void *arg){ thread_pool_job *new_job = (thread_pool_job *)malloc(sizeof(thread_pool_job)); if(NULL == new_job){ printf("ERROR:allocate memory for new job failed!\n"); exit(1); } new_job->function = func_p; new_job->arg = arg; pthread_mutex_lock(&mutex); tp_job_quene_add(tp,new_job); pthread_mutex_unlock(&mutex); } //銷燬執行緒池 void tp_destroy(thread_pool *tp){ int i; tp_alive = 0; //等待執行緒執行結束 //sleep(10); for(i = 0;i < tp->threads_num;++i){ pthread_join(tp->threads[i],NULL); } free(tp->threads); if(sem_destroy(tp->job_queue->quene_sem)){ printf("ERROR:destroy semaphore failed!\n"); } free(tp->job_queue->quene_sem); //刪除job佇列 thread_pool_job *current_job = tp->job_queue->tail; while(tp->job_queue->num){ tp->job_queue->tail = current_job->pre; free(current_job); current_job = tp->job_queue->tail; --(tp->job_queue->num); } tp->job_queue->head = NULL; tp->job_queue->tail = NULL; } //自定義執行緒執行函式 void* thread_func1(){ printf("Task1 running...by Thread :%u\n",(unsigned int)pthread_self()); } //自定義執行緒執行函式 void* thread_func2(){ printf("Task2 running...by Thread :%u\n",(unsigned int)pthread_self()); } //生產者執行緒執行函式 void* thread_func_producer(thread_pool *tp){ while(1){ printf("producer...add a job(job1) to job quene!\n"); tp_add_work(tp,(void*)thread_func1,NULL); sleep(1); printf("producer...add a job(job2) to job quene!\n"); tp_add_work(tp,(void*)thread_func2,NULL); } } int main(){ thread_pool *tp = tp_init(5); int i; int arg = 7; pthread_t producer_thread_id;//生產者執行緒ID pthread_create(&producer_thread_id,NULL,(void *)thread_func_producer,(void *)tp); pthread_join(producer_thread_id,NULL); tp_destroy(tp); return 0; }
執行結果: