同步和互斥——生產者和消費者程序版
一、同步和互斥的概念<?xml:namespace prefix = o />
什麼是同步?什麼是互斥?
同步是一種時序關係。如規定了程序1處理完事情A後,程序2才能處理事情B,經典的同步問題是生產者和消費者間的同步.
互斥描述的是一種獨佔關係.如任一時刻,進城1和程序2中只能有一個寫檔案C.
有人說互斥是一種特殊的同步,同步是一種更為複雜的互斥關係(見" http//topic.csdn.net/t/20020711/17/867228.html"),但我不這麼認為!
首先上面的言論是矛盾的,如果A和B互相包含,那A只能等於B,而同步和互斥顯然是不等的。從前面的描述可以看出,同步和互斥是你中有我,我中有你的關係。當然如果你要從更廣義地角度來說互斥是一種特殊的同步,我也沒辦法,畢竟我們的程式正確執行的前提就是同步。
二、生產者和消費者的問題分析
生產者和消費者的解答網路上有多種執行緒版本,但卻沒看到程序版本,所以我就來填補這一“空白”了。PS:使用程序版本的另一個重要原因是,想順便複習下共享記憶體。
我們使用訊號量來同步,用一個整型陣列來當緩衝區。很顯然這兩者都要能夠在各生產者和消費者程序間全域性可見,所以我們用共享記憶體來實現他們。
生產者和消費者問題從易到難有三種。
1.一個生產者和一個消費者,公用一個緩衝區
解決辦法,定義兩個訊號量。
empty:表示緩衝區是否為空,初值為1,生產者用它來判斷緩衝區是否可寫。
full:表示緩衝區是否為滿,初值為0,消費者用它來判斷緩衝區是否可讀。
producer
while(1)
{
P(empty);
寫緩衝區;
V(full);
}
consumer(消費者)的偽碼:
while(1)
{
P(full);
寫緩衝區;
V(empty);
}
2.一個生產者和一個消費者,公用m個環形緩衝區
分析過程與第一種情況類似,直接看偽碼。
producer(生產者)的偽碼:
while(1)
{
P(empty); /* empty初值為m */
寫第in個緩衝區; /* in用來指示當前的第一個可寫的緩衝區的下標,初值設為0。 */
in = (in+1)%m;
V(full);
}
consumer(消費者)的偽碼:
while(1)
{
P(full); /* full
讀第out個緩衝區; /* out用來指示當前的第一個可讀的緩衝區的下標,初值設為0。 */
out = (out+1)%m;
V(empty);
}
3.一組生產者和一組消費者,公用m個環形緩衝區
相比第2種情況,我們所要做的是用兩個互斥變數mutex_producer和mutex_consumer,來實現各生產者間、各消費者間互斥地訪問某個緩衝區。
producer(生產者)的偽碼:
while(1)
{
P(empty); /* empty初值為m */
P(mutex_producer);
寫第in個緩衝區; /* in用來指示當前的第一個可寫的緩衝區的下標,初值設為0。 */
in = (in+1)%m;
V(mutex_producer);
V(full);
}
consumer(消費者)的偽碼:
while(1)
{
P(full); /* full初值為0 */
P(mutex_consumer);
讀第out個緩衝區; /* out用來指示當前的第一個可讀的緩衝區的下標,初值設為0。 */
out = (out+1)%m;
V(mutex_consumer);
V(empty);
}
三、生產者和消費者程序版的實現
第一種情況很簡單,我們直接來看後兩種情況。
1.一個生產者和一個消費者,公用m個環形緩衝區
檔案:producer_consumer.c
#include<stdio.h>
#include<sys/types.h>
#include<unistd.h>
#include<semaphore.h> /* 提供了訊號量的相關操作 */
#include "error_plp.h" /* 這是我自定義的一個出錯處理函式,具體內容見後 */
#include<sys/mman.h>/* 提供了共享記憶體的相關操作 */
#include<fcntl.h>
#include<sys/stat.h>
#define BUFFER_SIZE 10/* 公用環形緩衝區的大小 */
#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH /* 設定建立的檔案的訪問許可權為:使用者、組使用者和其他使用者都可讀可寫*/
int head, tail; /* 即上面提到的out和in指標,分別用來指示當前第一個可讀和可寫的緩衝區的下標*/
int value_read, value_write; /* 分別用來儲存讀到的值和要寫的值 */
int *shared_buffer; /* 公用環形緩衝區指標,為了公用,這個指標將指向一個共享記憶體的陣列 */
sem_t *full, *empty; /* 分別指向full和empty這兩個訊號量的指標,同樣地,為了公用,這兩個指標指向的訊號量在共享記憶體中實現 */
void producer(void); /* 生產者所執行的程式碼 */
void consumer(void); /* 消費者所執行的程式碼 */
int main(void)
{
int fd;
pid_t pid;
void *ptr;
int length;
/* 初始化 */
head = 0;
tail = 0;
value_read = 0;
value_write = 0;
/* 計算共享記憶體的長度 */
length = 2*sizeof(sem_t) + BUFFER_SIZE*sizeof(int);
/* shm_open是一個POSIX函式,用來開啟或建立一個與“/shm”關聯的共享記憶體區 */
if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)
{
err_exit("shm_open error"); /* 出錯提示,可用簡單的printf或fprintf代替 */
}
if(ftruncate(fd, length) == -1) /* 截短共享記憶體的長度到我們所需要的長度 */
{
err_exit("ftruncate error");
}
if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 將共享記憶體對映到程序地址空間 */
{
err_exit("mmap error");
}
/* 共享記憶體的變數佈局依次為:full, empty, shared_buffer */
full = (sem_t *)ptr;
empty =((sem_t *)ptr) + 1;
shared_buffer =(int *)(((sem_t *)ptr) + 2 ) ;
sem_init(full, 1, 0); /* 初始化full為0,且程序間共享 */
sem_init(empty, 1, BUFFER_SIZE); /* 初始化empty為BUFFER_SIZE,且程序間共享 */
switch(pid = fork()) /* 生成子程序,因為子程序繼承了父程序的地址空間所以,共享記憶體在父子程序間都可見(這部分地址空間都對映到一個核心區域) */
{
case -1: /* 生成子程序失敗 */
err_exit("fork error");
break;
case 0: /* 子程序 */
producer(); /* 子程序是生產者 */
exit(0);
break;
default:
consumer(); /* 父程序是消費者 */
break;
}
shm_unlink("/shm"); /* 刪除共享記憶體區,程式中基本上保證了子程序先退出,因此父程序中無wait操作且這部操作放在父程序這裡 */
}
/* 生產者寫5次後退出 */
void producer(void)
{
while(value_write < 5) /* 退出條件判定 */
{
sem_wait(empty); /* 是否有空緩衝區,有則佔有,無則被掛起,是原子操作 */
sleep(2); /* 休眠2s,測試時休眠時間可修改或用隨機數代替 */
value_write++;
shared_buffer[tail] = value_write;
printf("write %5d to position %5d/n", value_write, tail+1);
tail= (tail+1)%BUFFER_SIZE; /* 移動寫指標 */
sem_post(full); /* 寫完一個緩衝區,釋放訊號量full(值加1) */
}
}
/* 消費者讀5次後退出 */
void consumer(void)
{
while(value_read < 5) /* 退出條件判定 */
{
sem_wait(full);/* 獲取訊號量 */
sleep(1); * 休眠1s,測試時休眠時間可修改或用隨機數代替 *
value_read = shared_buffer[head];
printf("read %5d from position %5d/n", value_read, head+1);
head = (head+1)%BUFFER_SIZE; /* 移動讀指標 */
sem_post(empty); /* 讀完一個緩衝區,釋放訊號量empty(值加1) */
}
}
檔案:error_plp.h
#ifndef _ERROR_PLP_H
#define _ERROR_PLP_H
#include<stdio.h>
#include<stdarg.h>
/* 以下兩個函式都定義在error_plp.c中 */
void err_ret(const char *fmt, ...);
void err_exit(const char *fmt, ...);
#endif/* _ERROR_PLP_H */
檔案:error_plp.c
include "error_plp.h"
#include<errno.h>
#include<stdarg.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#ifdef MAXLINE
#undef MAXLINE
#endif
#define MAXLINE 4096
static void err_doit(const char *fmt, va_list ap);
void err_ret(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
err_doit(fmt, ap);
va_end(ap);
}
void err_exit(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
err_doit(fmt, ap);
va_end(ap);
exit(1);
}
static void err_doit(const char *fmt, va_list ap)
{
char buf[MAXLINE];
int ret;
ret = vsnprintf(buf, MAXLINE, fmt, ap);
if(ret < 0)
{
return;
}
snprintf(buf+strlen(buf), MAXLINE-strlen(buf), ": %s", strerror(errno));
strcat(buf, "/n"); /* snprintf has assured the last character */
fflush(stdout);
fputs(buf, stderr);
fflush(NULL);/* necessary? */ /* 估計在重定向裡面有用 */
}
可以參照原始碼中的註釋來理解程式。要特別說明的是:這裡的訊號亮的操作函式程式碼在librt(實時庫)中,所以我們編譯連結時要加上-lrt選項,並且注意注意將這幾個檔案都放到同一資料夾中。
在shell下輸入“gcc –o producer_consumer producer_consumer.c error_plp.c -lrt”生成可執行檔案。
2.一組生產者和一組消費者,公用m個環形緩衝區
處理思路請參考前面的偽碼。我們的原始碼中增加了兩個函式:random_generator,用來生成隨機數,作休眠時間用,以方便測試;process_create用來生成子程序,這些子程序用來做生產者或消費者。
檔案:producer_consumer_n.c
#include<stdio.h>
#include<sys/types.h>
#include<unistd.h>
#include<semaphore.h> /* 提供了訊號量的相關操作 */
#include "error_plp.h" /* 這是我自定義的一個出錯處理函式,具體內容見前面的error_plp.h和error_plp.c */
#include<sys/mman.h>/* 提供了共享記憶體的相關操作 */
#include<fcntl.h>
#include<sys/stat.h>
#include<errno.h>
#include<stdlib.h>
#define BUFFER_SIZE 10 /* 公用環形緩衝區的大小 */
#define PRODUCER_NUM 5 /* 生產者程序的個數 */
#define CONSUMER_NUM 5 /* 消費者程序的個數 */
#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH/* 設定建立的檔案的訪問許可權為:使用者、組使用者和其他使用者都可讀可寫*/
int *pwrite, *pread; /* 即上面提到的out和in指標,分別用來指示當前第一個可讀和可寫的緩衝區的下標*/
int value_read, value_write; /* 分別用來儲存讀到的值和要寫的值 */
int *shared_buffer; /* 公用環形緩衝區指標,為了公用,這個指標將指向一個共享記憶體的陣列 */
sem_t *full, *empty; /* 分別指向full和empty這兩個訊號量的指標,同樣地,為了公用,這兩個指標指向的訊號量在共享記憶體中實現 */
sem_t *mutex_producer, *mutex_consumer; /* 分別用來互斥生產者間以及消費者間的操作 */
extern int random_generator(int start, int end); /* 用來生成start~end間(包括start和end,是離散閉區間)的隨機數 */
void producer(void); /* 生產者所執行的程式碼 */
void consumer(void); /* 消費者所執行的程式碼 */
int process_create(pid_t *pid_new, void (*routine)(void)); /* 生成子程序函式。這個函式的介面類似pthread_creat,pid_new用來儲存新的子程序的pid;routine是一個函式指標,指向子程序的執行函式 */
int main(void)
{
int i;
int fd;
pid_t pid;
pid_t pid_producer[PRODUCER_NUM], pid_consumer[CONSUMER_NUM]; /* 這兩個陣列分別用來儲存生產者程序和消費者程序的pid */
void *ptr;
int length;
/* 初始化 */
value_read = 0;
value_write = 0;
/* 計算共享記憶體的長度 */
length = 4*sizeof(sem_t) + (BUFFER_SIZE + 2)*sizeof(int);
/* shm_open是一個POSIX函式,用來開啟或建立一個與“/shm”關聯的共享記憶體區 */
if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)
{
err_exit("shm_open error"); /* 出錯提示,可用簡單的printf或fprintf代替 */
}
if(ftruncate(fd, length) == -1) /* 截短共享記憶體的長度到我們所需要的長度 */
{
err_exit("ftruncate error");
}
if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 將共享記憶體(核心中的一個區域)對映到程序地址空間 */
{
err_exit("mmap error");
}
/* 共享記憶體的變數佈局依次為:full, empty, mutex_producer, mutex_consumer, pwrite, pread, shared_buffer */
full = (sem_t *)ptr;
empty =((sem_t *)ptr) + 1;
mutex_producer = ((sem_t *)ptr) + 2;
mutex_consumer = ((sem_t *)ptr) + 3;
pwrite = (int *)(((sem_t *)ptr) + 4);
pread = (int *)(((sem_t *)ptr) + 4) + 1;
shared_buffer =(int *)(((sem_t *)ptr) + 4 ) + 2 ;
/* 初始化 */
sem_init(full, 1, 0); /* 初始化full為0,且程序間共享 */
sem_init(empty, 1, BUFFER_SIZE); /* 初始化empty為BUFFER_SIZE,且程序間共享 */
sem_init(mutex_producer, 1, 1); /* 初始化mutex_producer為1,且程序間共享,用來互斥生產者間的操作 */
sem_init(mutex_consumer, 1, 1); /* 初始化mutex_consumer為1,且程序間共享,用來互斥消費者間的操作 */
*pwrite = 0; /* 初始化寫指標為0,即從第一個緩衝區開始寫 */
*pread = 0; /* 初始化讀指標為0,即從第一個緩衝區開始讀(當然必須在生產者放入了產品後才能讀) */
for(i = 0; i < PRODUCER_NUM; i++) /* 生成生產者程序 */
{
if(process_create(&pid_producer[i], producer) != 0) /* 生產者程序的執行函式為producer */
{
/* kill(0, signum) */ /* 生成失敗,尚無好的處理辦法 */
}
}
for(i = 0; i < CONSUMER_NUM; i++) /* 生成消費者程序 */
{
if(process_create(&pid_consumer[i], consumer) != 0) * 消費者程序的執行函式為consumer */
{
/* kill(0, signum) */ /* 生成失敗,尚無好的處理辦法 */
}
}
for(i = 0; i < PRODUCER_NUM + CONSUMER_NUM; i++) /* wait處理,避免殭屍程序(zombie) */
{
waitpid(0, NULL, 0);
}
shm_unlink("/shm"); /* 父程序是最後退出的,所以在他這裡刪除共享記憶體區 */
return 0;
}
/* 生產者寫10次後退出 */
void producer(void)
{
while(value_write < 10) /* 判定退出條件 */
{
sem_wait(empty); /* 是否有空緩衝區,有則佔有,無則被掛起,是原子操作 */
sleep(random_generator(1, 5)); /* 休眠一段隨機的時間(1s~5s,包括端點) */
sem_wait(mutex_producer);/* 獲取互斥量,用來訪問pwrite操作 */
value_write++;
shared_buffer[*pwrite] = value_write; /* 注意互斥區操作應儘可能少,把這個語句和後面的列印語句放到互斥區裡面,是為了更準確的檢視測試結果(如果不放到互斥區,則列印的順序是不確定的) */
printf("in pid: %ld, write %5d to position %5d/n", (long)getpid(), value_write, *pwrite+1);
*pwrite= (*pwrite+1)%BUFFER_SIZE; /* 修改寫指標 */
sem_post(mutex_producer); /* 釋放互斥量 */
sem_post(full); /* 寫完一個緩衝區,釋放訊號量full(值加1) */
}
}
/* 消費者寫10次後退出 */
void consumer(void)
{
while(value_read < 10) /* 判定退出條件 */
{
sem_wait(full); /* 是否有可讀的緩衝區,有則佔有,無則被掛起,是原子操作 */
sleep(random_generator(1, 5));/* 休眠一段隨機的時間(1s~5s,包括端點) */
sem_wait(mutex_consumer); /* 獲取互斥量,用來訪問pread */
value_read = shared_buffer[*pread]; /* 注意互斥區操作應儘可能少,把這個語句和後面的列印語句放到互斥區裡面,是為了更準確的檢視測試結果(如果不放到互斥區,則列印的順序是不確定的) */
printf("in pid: %d, read %5d from position %5d/n", (long)getpid(), value_read, *pread+1);
*pread= (*pread+1)%BUFFER_SIZE; /* 修改讀指標 */
sem_post(mutex_consumer); /* 釋放互斥量 */
sem_post(empty); /* 讀完一個緩衝區,釋放訊號量empty(值加1) */
}
}
/* 生成子程序函式。這個函式的介面類似pthread_creat,