1. 程式人生 > >同步和互斥——生產者和消費者程序版

同步和互斥——生產者和消費者程序版

一、同步和互斥的概念<?xml:namespace prefix = o />

什麼是同步?什麼是互斥?

同步是一種時序關係。如規定了程序1處理完事情A後,程序2才能處理事情B,經典的同步問題是生產者和消費者間的同步.

互斥描述的是一種獨佔關係.如任一時刻,進城1和程序2中只能有一個寫檔案C.

有人說互斥是一種特殊的同步,同步是一種更為複雜的互斥關係(見" http//topic.csdn.net/t/20020711/17/867228.html"),但我不這麼認為!

首先上面的言論是矛盾的,如果AB互相包含,那A只能等於B,而同步和互斥顯然是不等的。從前面的描述可以看出,同步和互斥是你中有我,我中有你的關係。當然如果你要從更廣義地角度來說互斥是一種特殊的同步,我也沒辦法,畢竟我們的程式正確執行的前提就是同步。

二、生產者和消費者的問題分析

生產者和消費者的解答網路上有多種執行緒版本,但卻沒看到程序版本,所以我就來填補這一“空白”了。PS:使用程序版本的另一個重要原因是,想順便複習下共享記憶體。

我們使用訊號量來同步,用一個整型陣列來當緩衝區。很顯然這兩者都要能夠在各生產者和消費者程序間全域性可見,所以我們用共享記憶體來實現他們。

生產者和消費者問題從易到難有三種。

1.一個生產者和一個消費者,公用一個緩衝區

解決辦法,定義兩個訊號量。

empty:表示緩衝區是否為空,初值為1,生產者用它來判斷緩衝區是否可寫。

full:表示緩衝區是否為滿,初值為0,消費者用它來判斷緩衝區是否可讀。

producer

(生產者)的偽碼:

while(1)

{

P(empty);

寫緩衝區;

V(full);

}

consumer(消費者)的偽碼:

while(1)

{

P(full);

寫緩衝區;

V(empty);

}

2.一個生產者和一個消費者,公用m個環形緩衝區

分析過程與第一種情況類似,直接看偽碼。

producer(生產者)的偽碼:

while(1)

{

P(empty); /* empty初值為m */

寫第in個緩衝區; /* in用來指示當前的第一個可寫的緩衝區的下標,初值設為0 */

in = (in+1)%m;

V(full);

}

consumer(消費者)的偽碼:

while(1)

{

P(full); /* full

初值為0 */

讀第out個緩衝區; /* out用來指示當前的第一個可讀的緩衝區的下標,初值設為0 */

out = (out+1)%m;

V(empty);

}

3.一組生產者和一組消費者,公用m個環形緩衝區

相比第2種情況,我們所要做的是用兩個互斥變數mutex_producermutex_consumer,來實現各生產者間、各消費者間互斥地訪問某個緩衝區。

producer(生產者)的偽碼:

while(1)

{

P(empty); /* empty初值為m */

P(mutex_producer);

寫第in個緩衝區; /* in用來指示當前的第一個可寫的緩衝區的下標,初值設為0 */

in = (in+1)%m;

V(mutex_producer);

V(full);

}

consumer(消費者)的偽碼:

while(1)

{

P(full); /* full初值為0 */

P(mutex_consumer);

讀第out個緩衝區; /* out用來指示當前的第一個可讀的緩衝區的下標,初值設為0 */

out = (out+1)%m;

V(mutex_consumer);

V(empty);

}

三、生產者和消費者程序版的實現

第一種情況很簡單,我們直接來看後兩種情況。

1.一個生產者和一個消費者,公用m個環形緩衝區

檔案:producer_consumer.c

#include<stdio.h>

#include<sys/types.h>

#include<unistd.h>

#include<semaphore.h> /* 提供了訊號量的相關操作 */

#include "error_plp.h" /* 這是我自定義的一個出錯處理函式,具體內容見後 */

#include<sys/mman.h>/* 提供了共享記憶體的相關操作 */

#include<fcntl.h>

#include<sys/stat.h>

#define BUFFER_SIZE 10/* 公用環形緩衝區的大小 */

#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH /* 設定建立的檔案的訪問許可權為:使用者、組使用者和其他使用者都可讀可寫*/

int head, tail; /* 即上面提到的outin指標,分別用來指示當前第一個可讀和可寫的緩衝區的下標*/

int value_read, value_write; /* 分別用來儲存讀到的值和要寫的值 */

int *shared_buffer; /* 公用環形緩衝區指標,為了公用,這個指標將指向一個共享記憶體的陣列 */

sem_t *full, *empty; /* 分別指向fullempty這兩個訊號量的指標,同樣地,為了公用,這兩個指標指向的訊號量在共享記憶體中實現 */

void producer(void); /* 生產者所執行的程式碼 */

void consumer(void); /* 消費者所執行的程式碼 */

int main(void)

{

int fd;

pid_t pid;

void *ptr;

int length;

/* 初始化 */

head = 0;

tail = 0;

value_read = 0;

value_write = 0;

/* 計算共享記憶體的長度 */

length = 2*sizeof(sem_t) + BUFFER_SIZE*sizeof(int);

/* shm_open是一個POSIX函式,用來開啟或建立一個與“/shm”關聯的共享記憶體區 */

if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)

{

err_exit("shm_open error"); /* 出錯提示,可用簡單的printffprintf代替 */

}

if(ftruncate(fd, length) == -1) /* 截短共享記憶體的長度到我們所需要的長度 */

{

err_exit("ftruncate error");

}

if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 將共享記憶體對映到程序地址空間 */

{

err_exit("mmap error");

}

/* 共享記憶體的變數佈局依次為:full, empty, shared_buffer */

full = (sem_t *)ptr;

empty =((sem_t *)ptr) + 1;

shared_buffer =(int *)(((sem_t *)ptr) + 2 ) ;

sem_init(full, 1, 0); /* 初始化full0,且程序間共享 */

sem_init(empty, 1, BUFFER_SIZE); /* 初始化emptyBUFFER_SIZE,且程序間共享 */

switch(pid = fork()) /* 生成子程序,因為子程序繼承了父程序的地址空間所以,共享記憶體在父子程序間都可見(這部分地址空間都對映到一個核心區域) */

{

case -1: /* 生成子程序失敗 */

err_exit("fork error");

break;

case 0: /* 子程序 */

producer(); /* 子程序是生產者 */

exit(0);

break;

default:

consumer(); /* 父程序是消費者 */

break;

}

shm_unlink("/shm"); /* 刪除共享記憶體區,程式中基本上保證了子程序先退出,因此父程序中無wait操作且這部操作放在父程序這裡 */

}

/* 生產者寫5次後退出 */

void producer(void)

{

while(value_write < 5) /* 退出條件判定 */

{

sem_wait(empty); /* 是否有空緩衝區,有則佔有,無則被掛起,是原子操作 */

sleep(2); /* 休眠2s,測試時休眠時間可修改或用隨機數代替 */

value_write++;

shared_buffer[tail] = value_write;

printf("write %5d to position %5d/n", value_write, tail+1);

tail= (tail+1)%BUFFER_SIZE; /* 移動寫指標 */

sem_post(full); /* 寫完一個緩衝區,釋放訊號量full(值加1 */

}

}

/* 消費者讀5次後退出 */

void consumer(void)

{

while(value_read < 5) /* 退出條件判定 */

{

sem_wait(full);/* 獲取訊號量 */

sleep(1); * 休眠1s,測試時休眠時間可修改或用隨機數代替 *

value_read = shared_buffer[head];

printf("read %5d from position %5d/n", value_read, head+1);

head = (head+1)%BUFFER_SIZE; /* 移動讀指標 */

sem_post(empty); /* 讀完一個緩衝區,釋放訊號量empty(值加1 */

}

}

檔案:error_plp.h

#ifndef _ERROR_PLP_H

#define _ERROR_PLP_H

#include<stdio.h>

#include<stdarg.h>

/* 以下兩個函式都定義在error_plp.c */

void err_ret(const char *fmt, ...);

void err_exit(const char *fmt, ...);

#endif/* _ERROR_PLP_H */

檔案:error_plp.c

include "error_plp.h"

#include<errno.h>

#include<stdarg.h>

#include<stdio.h>

#include<string.h>

#include<stdlib.h>

#ifdef MAXLINE

#undef MAXLINE

#endif

#define MAXLINE 4096

static void err_doit(const char *fmt, va_list ap);

void err_ret(const char *fmt, ...)

{

va_list ap;

va_start(ap, fmt);

err_doit(fmt, ap);

va_end(ap);

}

void err_exit(const char *fmt, ...)

{

va_list ap;

va_start(ap, fmt);

err_doit(fmt, ap);

va_end(ap);

exit(1);

}

static void err_doit(const char *fmt, va_list ap)

{

char buf[MAXLINE];

int ret;

ret = vsnprintf(buf, MAXLINE, fmt, ap);

if(ret < 0)

{

return;

}

snprintf(buf+strlen(buf), MAXLINE-strlen(buf), ": %s", strerror(errno));

strcat(buf, "/n"); /* snprintf has assured the last character */

fflush(stdout);

fputs(buf, stderr);

fflush(NULL);/* necessary? */ /* 估計在重定向裡面有用 */

}

可以參照原始碼中的註釋來理解程式。要特別說明的是:這裡的訊號亮的操作函式程式碼在librt(實時庫)中,所以我們編譯連結時要加上-lrt選項,並且注意注意將這幾個檔案都放到同一資料夾中。

shell下輸入“gcc –o producer_consumer producer_consumer.c error_plp.c -lrt”生成可執行檔案。

2.一組生產者和一組消費者,公用m個環形緩衝區

處理思路請參考前面的偽碼。我們的原始碼中增加了兩個函式:random_generator,用來生成隨機數,作休眠時間用,以方便測試;process_create用來生成子程序,這些子程序用來做生產者或消費者。

檔案:producer_consumer_n.c

#include<stdio.h>

#include<sys/types.h>

#include<unistd.h>

#include<semaphore.h> /* 提供了訊號量的相關操作 */

#include "error_plp.h" /* 這是我自定義的一個出錯處理函式,具體內容見前面的error_plp.herror_plp.c */

#include<sys/mman.h>/* 提供了共享記憶體的相關操作 */

#include<fcntl.h>

#include<sys/stat.h>

#include<errno.h>

#include<stdlib.h>

#define BUFFER_SIZE 10 /* 公用環形緩衝區的大小 */

#define PRODUCER_NUM 5 /* 生產者程序的個數 */

#define CONSUMER_NUM 5 /* 消費者程序的個數 */

#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH/* 設定建立的檔案的訪問許可權為:使用者、組使用者和其他使用者都可讀可寫*/

int *pwrite, *pread; /* 即上面提到的outin指標,分別用來指示當前第一個可讀和可寫的緩衝區的下標*/

int value_read, value_write; /* 分別用來儲存讀到的值和要寫的值 */

int *shared_buffer; /* 公用環形緩衝區指標,為了公用,這個指標將指向一個共享記憶體的陣列 */

sem_t *full, *empty; /* 分別指向fullempty這兩個訊號量的指標,同樣地,為了公用,這兩個指標指向的訊號量在共享記憶體中實現 */

sem_t *mutex_producer, *mutex_consumer; /* 分別用來互斥生產者間以及消費者間的操作 */

extern int random_generator(int start, int end); /* 用來生成start~end間(包括startend,是離散閉區間)的隨機數 */

void producer(void); /* 生產者所執行的程式碼 */

void consumer(void); /* 消費者所執行的程式碼 */

int process_create(pid_t *pid_new, void (*routine)(void)); /* 生成子程序函式。這個函式的介面類似pthread_creatpid_new用來儲存新的子程序的pidroutine是一個函式指標,指向子程序的執行函式 */

int main(void)

{

int i;

int fd;

pid_t pid;

pid_t pid_producer[PRODUCER_NUM], pid_consumer[CONSUMER_NUM]; /* 這兩個陣列分別用來儲存生產者程序和消費者程序的pid */

void *ptr;

int length;

/* 初始化 */

value_read = 0;

value_write = 0;

/* 計算共享記憶體的長度 */

length = 4*sizeof(sem_t) + (BUFFER_SIZE + 2)*sizeof(int);

/* shm_open是一個POSIX函式,用來開啟或建立一個與“/shm”關聯的共享記憶體區 */

if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)

{

err_exit("shm_open error"); /* 出錯提示,可用簡單的printffprintf代替 */

}

if(ftruncate(fd, length) == -1) /* 截短共享記憶體的長度到我們所需要的長度 */

{

err_exit("ftruncate error");

}

if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 將共享記憶體(核心中的一個區域)對映到程序地址空間 */

{

err_exit("mmap error");

}

/* 共享記憶體的變數佈局依次為:full, empty, mutex_producer, mutex_consumer, pwrite, pread, shared_buffer */

full = (sem_t *)ptr;

empty =((sem_t *)ptr) + 1;

mutex_producer = ((sem_t *)ptr) + 2;

mutex_consumer = ((sem_t *)ptr) + 3;

pwrite = (int *)(((sem_t *)ptr) + 4);

pread = (int *)(((sem_t *)ptr) + 4) + 1;

shared_buffer =(int *)(((sem_t *)ptr) + 4 ) + 2 ;

/* 初始化 */

sem_init(full, 1, 0); /* 初始化full0,且程序間共享 */

sem_init(empty, 1, BUFFER_SIZE); /* 初始化emptyBUFFER_SIZE,且程序間共享 */

sem_init(mutex_producer, 1, 1); /* 初始化mutex_producer1,且程序間共享,用來互斥生產者間的操作 */

sem_init(mutex_consumer, 1, 1); /* 初始化mutex_consumer1,且程序間共享,用來互斥消費者間的操作 */

*pwrite = 0; /* 初始化寫指標為0,即從第一個緩衝區開始寫 */

*pread = 0; /* 初始化讀指標為0,即從第一個緩衝區開始讀(當然必須在生產者放入了產品後才能讀) */

for(i = 0; i < PRODUCER_NUM; i++) /* 生成生產者程序 */

{

if(process_create(&pid_producer[i], producer) != 0) /* 生產者程序的執行函式為producer */

{

/* kill(0, signum) */ /* 生成失敗,尚無好的處理辦法 */

}

}

for(i = 0; i < CONSUMER_NUM; i++) /* 生成消費者程序 */

{

if(process_create(&pid_consumer[i], consumer) != 0) * 消費者程序的執行函式為consumer */

{

/* kill(0, signum) */ /* 生成失敗,尚無好的處理辦法 */

}

}

for(i = 0; i < PRODUCER_NUM + CONSUMER_NUM; i++) /* wait處理,避免殭屍程序(zombie) */

{

waitpid(0, NULL, 0);

}

shm_unlink("/shm"); /* 父程序是最後退出的,所以在他這裡刪除共享記憶體區 */

return 0;

}

/* 生產者寫10次後退出 */

void producer(void)

{

while(value_write < 10) /* 判定退出條件 */

{

sem_wait(empty); /* 是否有空緩衝區,有則佔有,無則被掛起,是原子操作 */

sleep(random_generator(1, 5)); /* 休眠一段隨機的時間(1s~5s,包括端點) */

sem_wait(mutex_producer);/* 獲取互斥量,用來訪問pwrite操作 */

value_write++;

shared_buffer[*pwrite] = value_write; /* 注意互斥區操作應儘可能少,把這個語句和後面的列印語句放到互斥區裡面,是為了更準確的檢視測試結果(如果不放到互斥區,則列印的順序是不確定的) */

printf("in pid: %ld, write %5d to position %5d/n", (long)getpid(), value_write, *pwrite+1);

*pwrite= (*pwrite+1)%BUFFER_SIZE; /* 修改寫指標 */

sem_post(mutex_producer); /* 釋放互斥量 */

sem_post(full); /* 寫完一個緩衝區,釋放訊號量full(值加1 */

}

}

/* 消費者寫10次後退出 */

void consumer(void)

{

while(value_read < 10) /* 判定退出條件 */

{

sem_wait(full); /* 是否有可讀的緩衝區,有則佔有,無則被掛起,是原子操作 */

sleep(random_generator(1, 5));/* 休眠一段隨機的時間(1s~5s,包括端點) */

sem_wait(mutex_consumer); /* 獲取互斥量,用來訪問pread */

value_read = shared_buffer[*pread]; /* 注意互斥區操作應儘可能少,把這個語句和後面的列印語句放到互斥區裡面,是為了更準確的檢視測試結果(如果不放到互斥區,則列印的順序是不確定的) */

printf("in pid: %d, read %5d from position %5d/n", (long)getpid(), value_read, *pread+1);

*pread= (*pread+1)%BUFFER_SIZE; /* 修改讀指標 */

sem_post(mutex_consumer); /* 釋放互斥量 */

sem_post(empty); /* 讀完一個緩衝區,釋放訊號量empty(值加1 */

}

}

/* 生成子程序函式。這個函式的介面類似pthread_creat