1. 程式人生 > >程序間同步互斥經典問題(一)生產者-消費者問題

程序間同步互斥經典問題(一)生產者-消費者問題

問題描述:

生產者-消費者問題,也叫做快取繫結問題(bounded-buffer),是一個多程序同步問題。

  • 即有兩個程序:製造少和消費者,共享一個固定大小的快取
  • 製造商的工作是製造一段資料,放進快取,如此重複。
  • 消費者一次消費一段資料,從快取中取出。
  • 要保證不讓製造商在快取還是滿的時候仍要向內寫資料,不讓消費者試圖從空的快取中取出資料。

問題分析:

  1. 要避免多個生產商競爭一個空位的情況。
  2. 要避免生產商和消費者同時睡覺,造成死鎖
  3. 要避免多個消費者競爭同一段資料的情況

基本思路:

  • 首先我們需要定義三個訊號量:
    • empty:用來表示空位的數量,初值為n(快取區的規模)
    • full:用來表示剩餘商品的數量,初值為0
    • mutex:用來表示互斥量,同一時間共享的記憶體只能由一個程序訪問。
  • 首先考慮生產商,在多個生產商和多個消費者同時執行的情況下:
    • 1.檢查共享記憶體是否滿
      • 如果滿,說明無法急需生產,那麼睡覺。
      • 否則,p(empty),程序繼續
    • 2.每一個生產商在執行操作前申請互斥鎖
      • 如果被其他程序上鎖,說明當前共享記憶體區域被其他程序使用,那麼進入等待佇列等待互斥鎖被釋放
      • 否則,對互斥鎖上鎖,程序繼續
    • 3.生產一個新的商品放入空位,並且釋放互斥鎖,v(full)
  • 然後考慮消費者,在多個消費者和多個生產商同時存在的情況下:
    • 1.檢查共享的記憶體中資料是否為空
      • 如果空,說明無法消費,那麼睡覺
      • 否則,p(full),程序繼續
    • 2.在執行操作前申請互斥鎖
      • 如果被其他程序上鎖,說明當前共享記憶體區域被其他程序使用,那麼進入等待佇列等待互斥鎖被釋放
      • 否則,對互斥鎖上鎖,程序繼續
    • 3.消費一個商品,並且釋放互斥鎖,v(empty)

註釋完整的實現方案(C語言)

  • init.c : 對共享記憶體的資料進行初始化和清理工作
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h> #include <sys/shm.h> #include <sys/msg.h> #include "ipc.h" int main ( int argc , char * argv[] ) { int i,producer_pid,consumer_pid,item,shmid; semaphore mutex,empty,full;//定義互斥量mutex,定義訊號量empty,full; union semun sem_union; void* shared_memory = (void*)0; struct shared_use_st * shared_stuff; /* int semget ( key_t key , int num_sems , int sem_flags ); 1.第一個引數key是一個用來允許不相關的程序訪問相同訊號量的整數值, 所有的訊號量訊號量是為不同的程式提供一個key來簡介訪問的, 對於每一個訊號量系統生成一個訊號量識別符號。符號量鍵值只可以由semget 獲得,所有其他的訊號量函式所用的訊號量識別符號都是由semget所返回的。 2.num_sems引數是所需要的訊號量數目,這個值通常是1 3.sem_flags引數是一個標記集合 IPC_CREAT與IPC_EXCL的組合來保證我們可以的到一個新的唯一的訊號量 如果得不到會報錯 4.返回值如果是一個整數,這是用於其他訊號量函式的識別符號, 如果失敗,則會返回-1 */ if ( ( mutex = semget ( (key_t) KEY_MUTEX , 1 , 0666|IPC_CREAT)) == -1 ) { fprintf ( stderr , "Failed to create semaphore!"); exit(EXIT_FAILURE); } if (( empty = semget( (key_t) KEY_EMPTY , 1 , 0666|IPC_CREAT)) == -1 ) { fprintf ( stderr , "Failed to create semaphore!"); exit(EXIT_FAILURE); } if (( full = semget((key_t) KEY_FULL , 1 , 0666|IPC_CREAT)) == -1 ) { fprintf ( stderr , "Failed to create semaphore!"); exit(EXIT_FAILURE); } if ((shmid = shmget ((key_t)KEY_SHM,sizeof (struct shared_use_st),0666|IPC_CREAT))==-1) { fprintf ( stderr , "Failerd to create shared memory!"); exit(EXIT_FAILURE); } sem_union.val = 1; if ( semctl(mutex,0,SETVAL,sem_union) == -1 ) { fprintf ( stderr , "Failed to set semaphore!"); exit(EXIT_FAILURE); } sem_union.val = 0; if ( semctl(full,0,SETVAL,sem_union) == -1 ) { fprintf ( stderr , "Failed to set semaphore!"); exit(EXIT_FAILURE); } sem_union.val = BUFFER_SIZE; if ( semctl(empty, 0 , SETVAL , sem_union) == -1 ) { fprintf ( stderr , "Failed to set semaphore!"); exit(EXIT_FAILURE); } /* void* shmat ( int shmid , const void* shmaddr , int shmflg ) 連結共享記憶體識別符號為shmid的共享記憶體,連結成功後把共享記憶體區物件 對映到呼叫程序的地址空間,隨後可像本地空間一樣訪問。 shmid 共享記憶體識別符號 shmaddr 指定共享記憶體出現在程序記憶體地址的什麼位置,直接指定為NULL讓核心自己決定 一個合適的地址位置. SHM_RDONLY 為只讀模式,其他為讀寫模式 */ if (( shared_memory = shmat( shmid ,(void*)0 , 0)) == (void*)-1 ) { fprintf ( stderr , "shmat failed\n"); exit(EXIT_FAILURE); } shared_stuff = ( struct shared_use_st * ) shared_memory; for ( i = 0; i < BUFFER_SIZE ; i ++ ) { shared_stuff->buffer[i] = 0; } shared_stuff->low = 0; shared_stuff->high = 0; shared_stuff->cur = 0; exit(EXIT_SUCCESS); } /* int shmdt ( const void* shmaddr ) 與shmat相反,是用來斷開與共享記憶體附加點的地址,進位制本程序訪問瓷片共享記憶體 shmaddr:連結共享記憶體的起始地址 成功返回0,除錯返回-1 */
  • producer.c: 生產商的程序程式碼
#include "ipc.h"


int main ( int argc , char * argv[] )
{
    int i,item,shmid;
    semaphore  mutex,empty,full;
    union semun sem_union;
    void* shared_memory = (void*)0;
    struct shared_use_st *shared_stuff;

    if ((mutex = semget((key_t)KEY_MUTEX,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((empty = semget((key_t)KEY_EMPTY,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((full = semget((key_t)KEY_FULL,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shmid=shmget((key_t)KEY_SHM,sizeof(struct shared_use_st),0666|IPC_CREAT)) == -1 )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shared_memory = shmat(shmid,(void*)0,0)) == (void*)-1)
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    shared_stuff = ( struct shared_use_st * ) shared_memory;
    for ( i = 0 ; i < 30 ; i++ )
    {
        item = ++(shared_stuff->cur);
        sleep(1);
        printf ( "Producing item %d\n" , item );
        sem_p ( empty );//減少一個空位
        sem_p ( mutex );//鎖上互斥鎖
        (shared_stuff->buffer)[(shared_stuff->high)] = item;
        (shared_stuff->high) = ((shared_stuff->high)+1) % BUFFER_SIZE;
        printf ( "Inserting item %d\n" , item );
        sem_v ( mutex );
        sem_v ( full );
    }

    if ( shmdt(shared_memory) == -1 )
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    printf ( "Finish!\n" );
    getchar();
    exit(EXIT_SUCCESS);
}
  • consumer.c: 消費者的程序程式碼
#include "ipc.h"

int main ( int argc , char* argv[] )
{
    int i,item,shmid;
    semaphore mutex,empty,full;
    void* shared_memory = ( void* ) 0;
    struct shared_use_st* shared_stuff;

    if ((mutex=semget((key_t)KEY_MUTEX,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((empty=semget((key_t)KEY_EMPTY,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((full=semget((key_t)KEY_FULL,1,0666|IPC_CREAT)) == -1 )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shared_memory = shmat(shmid,(void*)0,0) ) ==(void*)-1 )
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    shared_stuff = (struct shared_use_st*)shared_memory;
    for(i=0;i<30;i++)
    {
        sem_p(full);
        sem_p(mutex);
        item = shared_stuff->buffer[shared_stuff->low];
        (shared_stuff->buffer)[(shared_stuff->low)]=0;
        shared_stuff->low = ((shared_stuff->low)+1)%BUFFER_SIZE;
        printf ( "Removing item %d\n" , item );
        sem_v(mutex);
        sem_v(empty);
        printf ( "Consuming item %d\n" ,item );
        sleep(2);
    }

    if ( shmdt(shared_memory) == -1 ) 
    {
        fprintf ( stderr , "shmat failed\n" );
        exit ( EXIT_FAILURE);
    }
    printf ( "Finish!\n" );
    getchar();
    exit(EXIT_SUCCESS);
}