程序間同步互斥經典問題(一)生產者-消費者問題
阿新 • • 發佈:2019-02-05
問題描述:
生產者-消費者問題,也叫做快取繫結問題(bounded-buffer),是一個多程序同步問題。
- 即有兩個程序:製造少和消費者,共享一個固定大小的快取
- 製造商的工作是製造一段資料,放進快取,如此重複。
- 消費者一次消費一段資料,從快取中取出。
- 要保證不讓製造商在快取還是滿的時候仍要向內寫資料,不讓消費者試圖從空的快取中取出資料。
問題分析:
- 要避免多個生產商競爭一個空位的情況。
- 要避免生產商和消費者同時睡覺,造成死鎖
- 要避免多個消費者競爭同一段資料的情況
基本思路:
- 首先我們需要定義三個訊號量:
- empty:用來表示空位的數量,初值為n(快取區的規模)
- full:用來表示剩餘商品的數量,初值為0
- mutex:用來表示互斥量,同一時間共享的記憶體只能由一個程序訪問。
- 首先考慮生產商,在多個生產商和多個消費者同時執行的情況下:
- 1.檢查共享記憶體是否滿
- 如果滿,說明無法急需生產,那麼睡覺。
- 否則,p(empty),程序繼續
- 2.每一個生產商在執行操作前申請互斥鎖
- 如果被其他程序上鎖,說明當前共享記憶體區域被其他程序使用,那麼進入等待佇列等待互斥鎖被釋放
- 否則,對互斥鎖上鎖,程序繼續
- 3.生產一個新的商品放入空位,並且釋放互斥鎖,v(full)
- 1.檢查共享記憶體是否滿
- 然後考慮消費者,在多個消費者和多個生產商同時存在的情況下:
- 1.檢查共享的記憶體中資料是否為空
- 如果空,說明無法消費,那麼睡覺
- 否則,p(full),程序繼續
- 2.在執行操作前申請互斥鎖
- 如果被其他程序上鎖,說明當前共享記憶體區域被其他程序使用,那麼進入等待佇列等待互斥鎖被釋放
- 否則,對互斥鎖上鎖,程序繼續
- 3.消費一個商品,並且釋放互斥鎖,v(empty)
- 1.檢查共享的記憶體中資料是否為空
註釋完整的實現方案(C語言)
- init.c : 對共享記憶體的資料進行初始化和清理工作
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include "ipc.h"
int main ( int argc , char * argv[] )
{
int i,producer_pid,consumer_pid,item,shmid;
semaphore mutex,empty,full;//定義互斥量mutex,定義訊號量empty,full;
union semun sem_union;
void* shared_memory = (void*)0;
struct shared_use_st * shared_stuff;
/*
int semget ( key_t key , int num_sems , int sem_flags );
1.第一個引數key是一個用來允許不相關的程序訪問相同訊號量的整數值,
所有的訊號量訊號量是為不同的程式提供一個key來簡介訪問的,
對於每一個訊號量系統生成一個訊號量識別符號。符號量鍵值只可以由semget
獲得,所有其他的訊號量函式所用的訊號量識別符號都是由semget所返回的。
2.num_sems引數是所需要的訊號量數目,這個值通常是1
3.sem_flags引數是一個標記集合
IPC_CREAT與IPC_EXCL的組合來保證我們可以的到一個新的唯一的訊號量
如果得不到會報錯
4.返回值如果是一個整數,這是用於其他訊號量函式的識別符號,
如果失敗,則會返回-1
*/
if ( ( mutex = semget ( (key_t) KEY_MUTEX , 1 , 0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if (( empty = semget( (key_t) KEY_EMPTY , 1 , 0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if (( full = semget((key_t) KEY_FULL , 1 , 0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shmid = shmget ((key_t)KEY_SHM,sizeof (struct shared_use_st),0666|IPC_CREAT))==-1)
{
fprintf ( stderr , "Failerd to create shared memory!");
exit(EXIT_FAILURE);
}
sem_union.val = 1;
if ( semctl(mutex,0,SETVAL,sem_union) == -1 )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
sem_union.val = 0;
if ( semctl(full,0,SETVAL,sem_union) == -1 )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
sem_union.val = BUFFER_SIZE;
if ( semctl(empty, 0 , SETVAL , sem_union) == -1 )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
/*
void* shmat ( int shmid , const void* shmaddr , int shmflg )
連結共享記憶體識別符號為shmid的共享記憶體,連結成功後把共享記憶體區物件
對映到呼叫程序的地址空間,隨後可像本地空間一樣訪問。
shmid 共享記憶體識別符號
shmaddr 指定共享記憶體出現在程序記憶體地址的什麼位置,直接指定為NULL讓核心自己決定
一個合適的地址位置.
SHM_RDONLY 為只讀模式,其他為讀寫模式
*/
if (( shared_memory = shmat( shmid ,(void*)0 , 0)) == (void*)-1 )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = ( struct shared_use_st * ) shared_memory;
for ( i = 0; i < BUFFER_SIZE ; i ++ )
{
shared_stuff->buffer[i] = 0;
}
shared_stuff->low = 0;
shared_stuff->high = 0;
shared_stuff->cur = 0;
exit(EXIT_SUCCESS);
}
/*
int shmdt ( const void* shmaddr )
與shmat相反,是用來斷開與共享記憶體附加點的地址,進位制本程序訪問瓷片共享記憶體
shmaddr:連結共享記憶體的起始地址
成功返回0,除錯返回-1
*/
- producer.c: 生產商的程序程式碼
#include "ipc.h"
int main ( int argc , char * argv[] )
{
int i,item,shmid;
semaphore mutex,empty,full;
union semun sem_union;
void* shared_memory = (void*)0;
struct shared_use_st *shared_stuff;
if ((mutex = semget((key_t)KEY_MUTEX,1,0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((empty = semget((key_t)KEY_EMPTY,1,0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((full = semget((key_t)KEY_FULL,1,0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shmid=shmget((key_t)KEY_SHM,sizeof(struct shared_use_st),0666|IPC_CREAT)) == -1 )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shared_memory = shmat(shmid,(void*)0,0)) == (void*)-1)
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = ( struct shared_use_st * ) shared_memory;
for ( i = 0 ; i < 30 ; i++ )
{
item = ++(shared_stuff->cur);
sleep(1);
printf ( "Producing item %d\n" , item );
sem_p ( empty );//減少一個空位
sem_p ( mutex );//鎖上互斥鎖
(shared_stuff->buffer)[(shared_stuff->high)] = item;
(shared_stuff->high) = ((shared_stuff->high)+1) % BUFFER_SIZE;
printf ( "Inserting item %d\n" , item );
sem_v ( mutex );
sem_v ( full );
}
if ( shmdt(shared_memory) == -1 )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
printf ( "Finish!\n" );
getchar();
exit(EXIT_SUCCESS);
}
- consumer.c: 消費者的程序程式碼
#include "ipc.h"
int main ( int argc , char* argv[] )
{
int i,item,shmid;
semaphore mutex,empty,full;
void* shared_memory = ( void* ) 0;
struct shared_use_st* shared_stuff;
if ((mutex=semget((key_t)KEY_MUTEX,1,0666|IPC_CREAT)) == -1 )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((empty=semget((key_t)KEY_EMPTY,1,0666|IPC_CREAT)) == -1 )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((full=semget((key_t)KEY_FULL,1,0666|IPC_CREAT)) == -1 )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shared_memory = shmat(shmid,(void*)0,0) ) ==(void*)-1 )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st*)shared_memory;
for(i=0;i<30;i++)
{
sem_p(full);
sem_p(mutex);
item = shared_stuff->buffer[shared_stuff->low];
(shared_stuff->buffer)[(shared_stuff->low)]=0;
shared_stuff->low = ((shared_stuff->low)+1)%BUFFER_SIZE;
printf ( "Removing item %d\n" , item );
sem_v(mutex);
sem_v(empty);
printf ( "Consuming item %d\n" ,item );
sleep(2);
}
if ( shmdt(shared_memory) == -1 )
{
fprintf ( stderr , "shmat failed\n" );
exit ( EXIT_FAILURE);
}
printf ( "Finish!\n" );
getchar();
exit(EXIT_SUCCESS);
}