經典程序同步問題--生產者消費者問題
問題描述:
一群生產者程序在生產產品,並將這些產品提供給消費者去消費。為了使生產者程序與消費者程序能夠併發進行,在兩者之間設定一個具有n個緩衝區的緩衝池,生產者程序將產品放入一個緩衝區中;消費者可以從一個緩衝區取走產品去消費。儘管所有的生產者程序和消費者程序是以異方式執行,但它們必須保持同步:當一個緩衝區為空時不允許消費者去取走產品,當一個緩衝區滿時也不允許生產者去存入產品。
解決方案:
我們這裡利用一個一個數組buffer來表示這個n個緩衝區的緩衝池
用輸入指標和輸出指標+1來表示在緩衝池中存入或取出一個產品
由於這裡的緩衝池是迴圈緩衝的,故應把in和out表示成:in = ( in +1 ) % n (或把out表示為 out = ( out +1 ) % n )
當( in +1) % n= out的時候說明緩衝池滿,in = out 則說明緩衝池空。
在這裡還要引入一個整型的變數counter(初始值0),每當在緩衝區存入或取走一個產品時,counter +1或-1。
那麼問題的關鍵就是,把這個counter作為臨界資源處理,即令生產者程序和消費者程序互斥的訪問它。
首先解釋一下訊號量(Semaphores)機制。這是荷蘭學者Dijkstra在1965年提出的一種有效的程序同步工具。Dijkstra定義的整型訊號量是一個用於表示資源數目的整型量,不同於一般的整型量,它除了初始化以外只能通過兩個標準的原子操作(Atomic Operation)waits(S)和signal(S)來訪問。很長時間以來這兩個操作分別被稱為P、V操作。wait和signal操作描述如下:
wait(S){
while (S <= 0); //無可用資源
S-- ;
}
signal(S){
S-- ;
}
waits(S)和signal(S)是原子操作,執行時不可中斷。即:當一個程序在修改某個訊號量時,沒有其他程序可以對其修改。
解決辦法:
1.利用記錄型訊號量解決問題
上述整型訊號量機制中,只要S<=0就會不斷的測試,沒有遵循“讓權等待”準則,可能導致“忙等”。記錄型訊號量機制採取“讓權等待”,但會出現多個程序等待訪問同一個臨界資源的情況,因此在此機制中,除了一個表示資源數量的整型量value以外,還需要增加一個程序連結串列指標list,用於連結所有等待程序。
記錄型訊號量的資料項描述如下:
1 typedef struct{
2
3 int value ;
4 struct processControlBlock *list ;
5
6 }semaphore;
相應的wait(S)和signa(S)為:
wait(semaphore *S){
S->value -- ;
if( S->value < 0 )
block( S->list ); //資源分配完畢,自我阻塞
}
signal(semaphore *S){
S->value ++ ;
if( S->value <= 0 )
weakup( S->list ); //仍有等待該資源的程序被阻塞,將list中的第一個程序喚醒
}
現在是對生產者-消費者問題解決方案的描述:
假定在生產者和消費者之間的公共緩衝池中具有n個緩衝區,利用互斥訊號量mutex來實現諸程序對緩衝池的互斥使用;利用訊號量empty和full分別表示緩衝池中的空槽數量和滿槽數量。
int in = 0 , out = 0 ;
item buffer[n];
semaphore mutex = 1 ; //訊號量,控制對臨界資源的互斥訪問
semaphore empty = n ; //訊號量,表示緩衝池當前的空槽數目
semaphore full = 0 ; //訊號量,表示緩衝池當前的滿槽數目
//producer
void producer {
do {
item = produce_item();
wait(empty); //空槽數目減一
wait(mutex); //進入臨界區
buffer[in] = insert_item(item); //存入產品
in = ( in +1 ) % n ;
signal(mutex); //退出臨界區
signal(full); //滿槽數目加一
} while(1)
}
//consumer
void consumer{
do {
wait(full); //滿槽數目減一
wait(mutex); //進入臨界區 ,
//注意:兩個P操作的順序不能顛倒
remove_item(item) = buffer[out] ; //取出產品
out = ( in +1 ) % n ;
signal(mutex); //退出臨界區
signal(empty); //空槽數目加一
//V的順序都可以
consumer_item(item);
} while(1)
}
void main() {
cobegin //併發執行
producer();consumer();
coend
}
注意:
1.用於實現互斥的wait(mutex) 和signal(mutex)必須成對出現,而對資源的full和empty的wait和signal也要成對出現,這裡是指分別出現在不同的程式中。
2.應該先進行對資源的wait操作再進行對互斥訊號量的wait操作。
2.利用AND訊號量解決問題
首先還是解釋一下AND型訊號量。上述是針對多個併發程式僅共享一個臨界資源的情況,AND型訊號量則是解決多個併發程式共享多個資源的問題。基本思想是:對若干個臨界資源的分配採取原子操作的方法,要麼把它請求的所有資源都分配到程序,要麼就一個也不分配。這裡用Swait()表示要麼全部分配,要麼全不分配,用Ssignal()表示全部釋放。
Swait() 和Ssignal的描述如下:
Swait(S1,S2,S3,...,Sn){
while (1) {
if( S1>=1 && S2>=1 && ... && Sn>=1){
for( i = 1 ; i <= n ; i++) Si --;
break ;
}else{
將這個程序放進等待佇列,第一個<1的Si作為等待佇列的開始
}
}
}
Ssignal(S1,S2,...,Sn) {
while(1) {
for( i = 1 ; i <= n ; i++) {
Si ++;
清空等待佇列並將其存入就緒佇列
}
}
}
具體實現:
consumer_item(item);
int in = 0 , out = 0 ;
item buffer[n];
semaphore mutex = 1 ; //訊號量,控制對臨界資源的互斥訪問
semaphore empty = n ; //訊號量,表示緩衝池當前的空槽數目
semaphore full = 0 ; //訊號量,表示緩衝池當前的滿槽數目
//producer
void producer {
do {
item = produce_item();
Swait(empty,mutex);
buffer[in] = insert_item(item); //存入產品
in = ( in +1 ) % n ;
signal(mutex,full);
} while(1)
}
//consumer
void consumer {
do {
wait(full,mutex);
remove_item(item) = buffer[out] ; //取出產品
out = ( in +1 ) % n ;
signal(mutex,empty);
consumer_item(item);
} while(1)
}
void main() {
cobegin //併發執行
producer();consumer();
coend
}
3.利用管程解決問題
首先介紹一下管程。代表共享資源的資料結構以及對該資料結構實施操作的一組過程所組成的資源管理程式共同構成了一個作業系統的資源管理模組,稱為管程。
這裡先定義一個管程PC,描述如下:
Monitor PC {
item buffer[N];
int in , out ;
condition notfull , notempty; //條件變數
int count ; //緩衝池中已有的產品數目
public:
//沒滿就往裡存,滿了就阻塞
void put( item x ){
if( count >= N)
cwait( notfull ); //緩衝池滿,阻塞,掛在條件condition的佇列上
buffer[in] = x;
in = ( in +1 ) % n ;
count ++ ;
csignal(notempty); // 喚醒一個阻塞在condition佇列中的程序,佇列空則無操作
}
void get( item x ){
if( count <= 0)
cwait( notempty ); //緩衝池空,阻塞,掛在條件condition的佇列上
x = buffer[out];
out = ( out +1 ) % n ;
count -- ;
csignal(notfull); // 喚醒一個阻塞在condition佇列中的程序,佇列空則無操作
}
{ in = 0, out = 0 , count = 0 ;}
}PC;
具體方案:
//producer
void producer(){
item x;
while(1){
x = produce_item();
PC.put(x);
}
}
//consumer
void consumer(){
item x;
while(1){
PC.get(x);
consumer_item() = x ;
}
}
void main() {
cobegin //併發執行
producer();consumer();
coend
}