1. 程式人生 > >多個消費者多個生產者

多個消費者多個生產者

之前的是單生產消費者 使用一個mutex解決生產者消費者問題

其中第2個例子使用了事件物件, 用於單執行緒的情況. 若在linux 上用訊號量每次解鎖一次即可;

如果要修改成多個的情況,就要使用訊號量.

 

先解決多個生產 1個消費的問題:

const int BUFF_SIZE = 10;  //緩衝區大小
const int NTIMES = 50;     //總共生產的個數
static int producer_gcount = 0;   //僅僅用於顯示死鎖問題
struct
{
    HANDLE mutex;
    HANDLE sem_empty;   //修改成訊號量, 可以用於多個執行緒同時訪問
    HANDLE sem_stored;        //同上
    int arr[BUFF_SIZE];
    int index;
}   share_object;
unsigned int WINAPI producer_thread( void* lpParameter)
{
    while(1)
    {
        WaitForSingleObject(share_object.sem_empty,-1) ;  //等待至少一個訊號量
        WaitForSingleObject(share_object.mutex,-1);          //多個生產者必須互斥
        if(share_object.index >= NTIMES){
            //到這裡說明全部生產完畢 .
            //注意先釋放訊號量,否則如果執行緒數大於sem_empty的數量,將發生死鎖
            if(producer_gcount == 0){
                ++producer_gcount;
                cout << "producer all done" << endl;
            } else {
                cout << " ******* other producer threads *****"  << endl;
            }

            ReleaseSemaphore(share_object.sem_empty,1,0);
            ReleaseMutex(share_object.mutex);

            break;
        }
        //到這裡說明繼續生產
        share_object.arr[share_object.index++ % BUFF_SIZE] = 1;
        ReleaseMutex(share_object.mutex);
        ReleaseSemaphore(share_object.sem_stored,1,0);  //通知消費
    }
     return 0;
}
unsigned int WINAPI consumer_thread( void* lpParameter)
{

    for(int i = 0; i < NTIMES; ++i){
            WaitForSingleObject(share_object.sem_stored,-1);
            WaitForSingleObject(share_object.mutex,-1);
            share_object.arr[i%BUFF_SIZE] = 0;
            cout << "consumer modified : " << i << endl;
            ReleaseMutex(share_object.mutex);
            ReleaseSemaphore(share_object.sem_empty,1,0);
    }
    cout << "consumer done" << endl;
    return 0;
}



int main(int argc, char *argv[])
{

    share_object.mutex = CreateMutex(0,FALSE,0);
    
    //注意第3個引數, 最多的數量儘量填寫>=NTIMES;
    // 生產者 . 初始10個
    share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL);

    // 消費者 . 初始 0 個. 等待生產者通知 . 第三個引數別填寫 最多1個.有可能消費者有問題
    //具體情況參考msdn. 畢竟不像linux上的 sem_init ,可以一直漲到很大的數量
    share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL);

    //20個生產者執行緒 
    HANDLE producer_handles[20];
    for(int i = 0; i < 20;  ++i)
        producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0);

    HANDLE consumer_handle =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0);

    WaitForMultipleObjects(5,producer_handles,TRUE,-1);
    WaitForSingleObject(consumer_handle,-1);


    return 0;
}

 

 

多個消費者的情況:

 

const int BUFF_SIZE = 10;  //緩衝區大小
const int NTIMES = 50;     //總共生產的個數
struct
{
    HANDLE mutex;
    HANDLE sem_empty;   //修改成訊號量, 可以用於多個執行緒同時訪問
    HANDLE sem_stored;        //同上
    int arr[BUFF_SIZE];
    int index;                     //生產者處理的索引
    int index_consumer;    //消費者處理的索引
    bool bEnd;                  //生產者全部處理完後,通知一下消費們,全自我了斷吧.
}   share_object;
unsigned int WINAPI producer_thread( void* lpParameter)
{
    while(1)
    {
        WaitForSingleObject(share_object.sem_empty,-1) ;  //等待至少一個訊號量
        WaitForSingleObject(share_object.mutex,-1);          //多個生產者必須互斥
        if(share_object.index >= NTIMES){
            if(!share_object.bEnd){
                share_object.bEnd = true;
                //這行千萬別忘了,否則消費者們將一直等待 sem_stored
                ReleaseSemaphore(share_object.sem_stored,1,0);
            }

            ReleaseSemaphore(share_object.sem_empty,1,0);
            ReleaseMutex(share_object.mutex);
            break;
        }
        //到這裡說明繼續生產
        share_object.arr[share_object.index++ % BUFF_SIZE] = 1;
        ReleaseMutex(share_object.mutex);
        ReleaseSemaphore(share_object.sem_stored,1,0);  //通知消費
    }
     return 0;
}
unsigned int WINAPI consumer_thread( void* lpParameter)
{
    while(1){
            WaitForSingleObject(share_object.sem_stored,-1);
            WaitForSingleObject(share_object.mutex,-1);

            if(share_object.index_consumer >= NTIMES){
                //到了這裡說明消費者已經完成任務了
                ReleaseSemaphore(share_object.sem_stored,1,0);
                ReleaseMutex(share_object.mutex);
                break;
            }
            share_object.arr[share_object.index_consumer++%BUFF_SIZE] = 0;
            cout << "consumer modified : " << share_object.index_consumer << endl;
            ReleaseMutex(share_object.mutex);
            ReleaseSemaphore(share_object.sem_empty,1,0);
    }
    cout << "consumer done" << endl;
    return 0;
}



int main(int argc, char *argv[])
{

    share_object.mutex = CreateMutex(0,FALSE,0);
    // 生產者 . 初始10個
    share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL);
    // 消費者 . 初始 0 個. 等待生產者通知
    share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL);
    share_object.bEnd = false;

    HANDLE producer_handles[20] , consumer_handles[10];
    for(int i = 0; i < 20;  ++i)
        producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0);
    for(int i =0 ; i < 10; ++i)
        consumer_handles[i] =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0);

    WaitForMultipleObjects(20,producer_handles,TRUE,-1);
    WaitForMultipleObjects(10,consumer_handles,TRUE,-1);



    return 0;
}