多個消費者多個生產者
阿新 • • 發佈:2018-11-10
之前的是單生產消費者 使用一個mutex解決生產者消費者問題
其中第2個例子使用了事件物件, 用於單執行緒的情況. 若在linux 上用訊號量每次解鎖一次即可;
如果要修改成多個的情況,就要使用訊號量.
先解決多個生產 1個消費的問題:
const int BUFF_SIZE = 10; //緩衝區大小 const int NTIMES = 50; //總共生產的個數 static int producer_gcount = 0; //僅僅用於顯示死鎖問題 struct { HANDLE mutex; HANDLE sem_empty; //修改成訊號量, 可以用於多個執行緒同時訪問 HANDLE sem_stored; //同上 int arr[BUFF_SIZE]; int index; } share_object; unsigned int WINAPI producer_thread( void* lpParameter) { while(1) { WaitForSingleObject(share_object.sem_empty,-1) ; //等待至少一個訊號量 WaitForSingleObject(share_object.mutex,-1); //多個生產者必須互斥 if(share_object.index >= NTIMES){ //到這裡說明全部生產完畢 . //注意先釋放訊號量,否則如果執行緒數大於sem_empty的數量,將發生死鎖 if(producer_gcount == 0){ ++producer_gcount; cout << "producer all done" << endl; } else { cout << " ******* other producer threads *****" << endl; } ReleaseSemaphore(share_object.sem_empty,1,0); ReleaseMutex(share_object.mutex); break; } //到這裡說明繼續生產 share_object.arr[share_object.index++ % BUFF_SIZE] = 1; ReleaseMutex(share_object.mutex); ReleaseSemaphore(share_object.sem_stored,1,0); //通知消費 } return 0; } unsigned int WINAPI consumer_thread( void* lpParameter) { for(int i = 0; i < NTIMES; ++i){ WaitForSingleObject(share_object.sem_stored,-1); WaitForSingleObject(share_object.mutex,-1); share_object.arr[i%BUFF_SIZE] = 0; cout << "consumer modified : " << i << endl; ReleaseMutex(share_object.mutex); ReleaseSemaphore(share_object.sem_empty,1,0); } cout << "consumer done" << endl; return 0; } int main(int argc, char *argv[]) { share_object.mutex = CreateMutex(0,FALSE,0); //注意第3個引數, 最多的數量儘量填寫>=NTIMES; // 生產者 . 初始10個 share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL); // 消費者 . 初始 0 個. 等待生產者通知 . 第三個引數別填寫 最多1個.有可能消費者有問題 //具體情況參考msdn. 畢竟不像linux上的 sem_init ,可以一直漲到很大的數量 share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL); //20個生產者執行緒 HANDLE producer_handles[20]; for(int i = 0; i < 20; ++i) producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0); HANDLE consumer_handle =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0); WaitForMultipleObjects(5,producer_handles,TRUE,-1); WaitForSingleObject(consumer_handle,-1); return 0; }
多個消費者的情況:
const int BUFF_SIZE = 10; //緩衝區大小 const int NTIMES = 50; //總共生產的個數 struct { HANDLE mutex; HANDLE sem_empty; //修改成訊號量, 可以用於多個執行緒同時訪問 HANDLE sem_stored; //同上 int arr[BUFF_SIZE]; int index; //生產者處理的索引 int index_consumer; //消費者處理的索引 bool bEnd; //生產者全部處理完後,通知一下消費們,全自我了斷吧. } share_object; unsigned int WINAPI producer_thread( void* lpParameter) { while(1) { WaitForSingleObject(share_object.sem_empty,-1) ; //等待至少一個訊號量 WaitForSingleObject(share_object.mutex,-1); //多個生產者必須互斥 if(share_object.index >= NTIMES){ if(!share_object.bEnd){ share_object.bEnd = true; //這行千萬別忘了,否則消費者們將一直等待 sem_stored ReleaseSemaphore(share_object.sem_stored,1,0); } ReleaseSemaphore(share_object.sem_empty,1,0); ReleaseMutex(share_object.mutex); break; } //到這裡說明繼續生產 share_object.arr[share_object.index++ % BUFF_SIZE] = 1; ReleaseMutex(share_object.mutex); ReleaseSemaphore(share_object.sem_stored,1,0); //通知消費 } return 0; } unsigned int WINAPI consumer_thread( void* lpParameter) { while(1){ WaitForSingleObject(share_object.sem_stored,-1); WaitForSingleObject(share_object.mutex,-1); if(share_object.index_consumer >= NTIMES){ //到了這裡說明消費者已經完成任務了 ReleaseSemaphore(share_object.sem_stored,1,0); ReleaseMutex(share_object.mutex); break; } share_object.arr[share_object.index_consumer++%BUFF_SIZE] = 0; cout << "consumer modified : " << share_object.index_consumer << endl; ReleaseMutex(share_object.mutex); ReleaseSemaphore(share_object.sem_empty,1,0); } cout << "consumer done" << endl; return 0; } int main(int argc, char *argv[]) { share_object.mutex = CreateMutex(0,FALSE,0); // 生產者 . 初始10個 share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL); // 消費者 . 初始 0 個. 等待生產者通知 share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL); share_object.bEnd = false; HANDLE producer_handles[20] , consumer_handles[10]; for(int i = 0; i < 20; ++i) producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0); for(int i =0 ; i < 10; ++i) consumer_handles[i] =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0); WaitForMultipleObjects(20,producer_handles,TRUE,-1); WaitForMultipleObjects(10,consumer_handles,TRUE,-1); return 0; }