1. 程式人生 > >POSIX訊號量-------多個生產者&單個消費者

POSIX訊號量-------多個生產者&單個消費者

問題描述

有多個執行緒:多個生產者執行緒和一個消費者執行緒程共享一個初始為空、固定大小為maxbuf的快取(緩衝區)。多個生產者的工作是向緩衝區中存資料,只有緩衝區沒滿時,每次只能選擇一個生產者把訊息放入到緩衝區,否則必須等待,如此反覆; 同時,只有緩衝區不空時,消費者才能從中取出訊息,一次消費一個數據(即將其從快取中移出),否則必須等待。由於緩衝區是臨界資源,它只允許一個生產者放入訊息,或者一個消費者從中取出訊息。

問題分析

在前面的單個生產者&單個消費者問題中,我們使用了三個訊號量來保證生成者與消費者的互斥。

對於生產者:

         sem_wait(shared
.empty); //empty-- sem_wait(shared.mutex); //獲得互斥鎖 .... sem_post(shared.mutex); //釋放互斥鎖 sem_post(shared.full); //full++

上面的程式碼可以保證每次只有一個生產者進入臨界區,並且與消費者互斥訪問。

對於消費者:

         sem_wait(shared.full);  //full--
             sem_wait(shared.mutex); //獲得互斥鎖
                 ....
             sem_post(shared
.mutex); //釋放互斥鎖 sem_post(shared.empty); //empty--

上面的程式碼可以保證生產者與消費者之間的互斥。

通過上面的分析,我們發現之前的三個訊號量就可以解決該問題,我們可以嘗試用之前的程式碼框架來解決該問題。

問題解決(基於記憶體的訊號量)

#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<semaphore.h>
#define maxbuf 5
#define pthread_n 4 using namespace std; struct Shared { int buf[maxbuf]; int next_in; sem_t mutex,empty,full; }; Shared shared; void *produce(void *arg){ for(;;){ sem_wait(&shared.empty); sem_wait(&shared.mutex); if(shared.next_in >= maxbuf){ sem_post(&shared.mutex); return NULL; } if(shared.buf[shared.next_in] == 0){ shared.buf[shared.next_in] = shared.next_in; printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in); } shared.next_in++; sem_post(&shared.mutex); sem_post(&shared.full); } } void *consume(void *arg){ for(int x=0;x<maxbuf;x++){ sem_wait(&shared.full); sem_wait(&shared.mutex); if(shared.buf[x]!=0){ printf("Comsumer 0 Successfully consume item %d\n",shared.buf[x]); shared.buf[x] = 0; } sem_post(&shared.mutex); sem_post(&shared.empty); } } /*const char *m = "/mutex"; const char *e = "/empty"; const char *f = "/full";*/ int main(){ pthread_t p_tid[pthread_n+1]; int count[pthread_n]; shared.next_in = 0; /* shared.mutex = sem_open(m,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, 1); shared.full = sem_open(f,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, 0); shared.empty = sem_open(e,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, maxbuf);*/ sem_init(&shared.mutex,0, 1); sem_init(&shared.full,0, 0); sem_init(&shared.empty,0, maxbuf); for(int x=0;x<pthread_n;x++){ count[x] = x; pthread_create(&p_tid[x],NULL,&produce,&count[x] ); } pthread_create(&p_tid[pthread_n],NULL,&consume,NULL); for(int x=0;x<=pthread_n;x++){ pthread_join(p_tid[x],NULL); } sem_destroy(&shared.mutex); sem_destroy(&shared.full); sem_destroy(&shared.empty); }

執行結果:

這裡寫圖片描述

看起來好像執行正常,沒什麼問題。

我們修改一下建立的執行緒數量:4改為6(生產者數量大於緩衝區數量)

這裡寫圖片描述

執行之後發現,程式陷入了死鎖。所以原來的框架還是有問題。

修改

原來生產者執行緒中的程式碼:

         sem_wait(&shared.empty);
             sem_wait(&shared.mutex);
                 if(shared.next_in >= maxbuf){
                    sem_post(&shared.mutex);
                    return NULL;
                  }
                 if(shared.buf[shared.next_in] == 0){
                    shared.buf[shared.next_in] = shared.next_in;
                    printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in);
                 }
                 shared.next_in++;
             sem_post(&shared.mutex);
         sem_post(&shared.full);

新增一行程式碼:

      sem_wait(&shared.empty);
             sem_wait(&shared.mutex);
                 if(shared.next_in >= maxbuf){
                    sem_post(&shared.mutex);
                    sem_post(&shared.empty); //退出前應該將empty的值+1
                    return NULL;
                  }
                 if(shared.buf[shared.next_in] == 0){
                    shared.buf[shared.next_in] = shared.next_in;
                    printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in);
                 }
                 shared.next_in++;
             sem_post(&shared.mutex);
         sem_post(&shared.full);

執行結果:

這裡寫圖片描述

執行結果正常。

分析

我們來分析一下上面出現死鎖的結果。
這裡寫圖片描述

死鎖時,shared.next_in的值為5,此時每個生產者執行緒都執行下面的程式碼:

                if(shared.next_in >= maxbuf){
                    sem_post(&shared.mutex);
                 //   sem_post(&shared.empty);
                    return NULL;
                  }

此時由於生產者數量大於緩衝區個數,在進入臨界區之前就已經將empty的值減一,但是此時並沒有向快取區生產資料,就會使得出現empty的值小於0的情況,從而造成死鎖。因此返回時,得將empty的值加一。