POSIX訊號量-------多個生產者&單個消費者
阿新 • • 發佈:2019-02-07
問題描述
有多個執行緒:多個生產者執行緒和一個消費者執行緒程共享一個初始為空、固定大小為maxbuf的快取(緩衝區)。多個生產者的工作是向緩衝區中存資料,只有緩衝區沒滿時,每次只能選擇一個生產者把訊息放入到緩衝區,否則必須等待,如此反覆; 同時,只有緩衝區不空時,消費者才能從中取出訊息,一次消費一個數據(即將其從快取中移出),否則必須等待。由於緩衝區是臨界資源,它只允許一個生產者放入訊息,或者一個消費者從中取出訊息。
問題分析
在前面的單個生產者&單個消費者問題中,我們使用了三個訊號量來保證生成者與消費者的互斥。
對於生產者:
sem_wait(shared .empty); //empty--
sem_wait(shared.mutex); //獲得互斥鎖
....
sem_post(shared.mutex); //釋放互斥鎖
sem_post(shared.full); //full++
上面的程式碼可以保證每次只有一個生產者進入臨界區,並且與消費者互斥訪問。
對於消費者:
sem_wait(shared.full); //full--
sem_wait(shared.mutex); //獲得互斥鎖
....
sem_post(shared .mutex); //釋放互斥鎖
sem_post(shared.empty); //empty--
上面的程式碼可以保證生產者與消費者之間的互斥。
通過上面的分析,我們發現之前的三個訊號量就可以解決該問題,我們可以嘗試用之前的程式碼框架來解決該問題。
問題解決(基於記憶體的訊號量)
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<semaphore.h>
#define maxbuf 5
#define pthread_n 4
using namespace std;
struct Shared
{
int buf[maxbuf];
int next_in;
sem_t mutex,empty,full;
};
Shared shared;
void *produce(void *arg){
for(;;){
sem_wait(&shared.empty);
sem_wait(&shared.mutex);
if(shared.next_in >= maxbuf){
sem_post(&shared.mutex);
return NULL;
}
if(shared.buf[shared.next_in] == 0){
shared.buf[shared.next_in] = shared.next_in;
printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in);
}
shared.next_in++;
sem_post(&shared.mutex);
sem_post(&shared.full);
}
}
void *consume(void *arg){
for(int x=0;x<maxbuf;x++){
sem_wait(&shared.full);
sem_wait(&shared.mutex);
if(shared.buf[x]!=0){
printf("Comsumer 0 Successfully consume item %d\n",shared.buf[x]);
shared.buf[x] = 0;
}
sem_post(&shared.mutex);
sem_post(&shared.empty);
}
}
/*const char *m = "/mutex";
const char *e = "/empty";
const char *f = "/full";*/
int main(){
pthread_t p_tid[pthread_n+1];
int count[pthread_n];
shared.next_in = 0;
/* shared.mutex = sem_open(m,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, 1);
shared.full = sem_open(f,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, 0);
shared.empty = sem_open(e,O_CREAT | O_EXCL,S_IRUSR | S_IWUSR, maxbuf);*/
sem_init(&shared.mutex,0, 1);
sem_init(&shared.full,0, 0);
sem_init(&shared.empty,0, maxbuf);
for(int x=0;x<pthread_n;x++){
count[x] = x;
pthread_create(&p_tid[x],NULL,&produce,&count[x] );
}
pthread_create(&p_tid[pthread_n],NULL,&consume,NULL);
for(int x=0;x<=pthread_n;x++){
pthread_join(p_tid[x],NULL);
}
sem_destroy(&shared.mutex);
sem_destroy(&shared.full);
sem_destroy(&shared.empty);
}
執行結果:
看起來好像執行正常,沒什麼問題。
我們修改一下建立的執行緒數量:4改為6(生產者數量大於緩衝區數量)
執行之後發現,程式陷入了死鎖。所以原來的框架還是有問題。
修改
原來生產者執行緒中的程式碼:
sem_wait(&shared.empty);
sem_wait(&shared.mutex);
if(shared.next_in >= maxbuf){
sem_post(&shared.mutex);
return NULL;
}
if(shared.buf[shared.next_in] == 0){
shared.buf[shared.next_in] = shared.next_in;
printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in);
}
shared.next_in++;
sem_post(&shared.mutex);
sem_post(&shared.full);
新增一行程式碼:
sem_wait(&shared.empty);
sem_wait(&shared.mutex);
if(shared.next_in >= maxbuf){
sem_post(&shared.mutex);
sem_post(&shared.empty); //退出前應該將empty的值+1
return NULL;
}
if(shared.buf[shared.next_in] == 0){
shared.buf[shared.next_in] = shared.next_in;
printf("Producer %d Successfully produce item %d\n",*((int *)arg),shared.next_in);
}
shared.next_in++;
sem_post(&shared.mutex);
sem_post(&shared.full);
執行結果:
執行結果正常。
分析
我們來分析一下上面出現死鎖的結果。
死鎖時,shared.next_in的值為5,此時每個生產者執行緒都執行下面的程式碼:
if(shared.next_in >= maxbuf){
sem_post(&shared.mutex);
// sem_post(&shared.empty);
return NULL;
}
此時由於生產者數量大於緩衝區個數,在進入臨界區之前就已經將empty的值減一,但是此時並沒有向快取區生產資料,就會使得出現empty的值小於0的情況,從而造成死鎖。因此返回時,得將empty的值加一。