1. 程式人生 > 實用技巧 >課設二 使用IPC機制實現“生產者-過濾者-消費者”問題

課設二 使用IPC機制實現“生產者-過濾者-消費者”問題

@目錄

要求

學習Linux程序間通訊機制,使用訊號量和共享記憶體實現程序同步問題“生產者-過濾者-消費者”問題。具體要求:
1.建立訊號量集,實現同步互斥訊號量。
2.建立共享記憶體,模擬存放產品的公共緩衝池。
3.建立併發程序,實現程序對共享緩衝池的併發操作。生產者生產產品後放入緩衝池,過濾者取出產品,對產品過濾(可自己設計,如對產品值進行+1)再放入緩衝池,消費者將過濾後的產品取走。可建立一個或兩個緩衝池。

知識梳理

生產者-過濾者-消費者問題

生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多執行緒同步問題的經典案例。該問題描述了兩個共享固定大小緩衝區的執行緒——即所謂的“生產者”和“消費者”——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。

關鍵在於:生產者放入產品之後才解鎖了消費者消費產品的過程,消費者拿出產品之後才解鎖了生產者生產產品的過程。

訊號量和訊號量集

訊號量(Semaphore),有時被稱為訊號燈,是在多執行緒環境下使用的一種設施,是可以用來保證兩個或多個關鍵程式碼段不被併發呼叫。在進入一個關鍵程式碼段之前,執行緒必須獲取一個訊號量;一旦該關鍵程式碼段完成了,那麼該執行緒必須釋放訊號量。
當利用訊號量機制解決了單個資源的互斥訪問後,我們討論如何控制同時需要多個資源的互斥訪問。訊號量集是指同時需要多個資源時的訊號量操作。

訊號量的pv操作

訊號量是個被保護的量,只有P、V操作和訊號量初始化操作才能訪問和改變它的值。
P操作和V操作定義如下,其中S為訊號量值:

共享記憶體

共享儲存

共享儲存操作適得兩個或兩個以上的程序可以共用一段實體記憶體(一般情況下,兩個程序的資料區是完全獨立的,父程序用fork建立子程序後,子程序會複製父程序資料到自己的資料區)。

(1)建立共享記憶體

include<sys/shm.h>

int shmget(key_t key,size_t size, int permflags);

引數key是共享記憶體的標識,size是共享記憶體段的最小位元組數,permflags是訪問許可權,值的設定同semget一樣。

(2)共享記憶體的控制

include<sys/shm.h>

int shmctl(int shmid, int command, struct shmid_ds *shm_stat);

該函式semctl相似,command可設為IPC_STAT,IPC_SET,IPC_RMID。引數shm_stat指向存放屬性的結構體,具體內容請參考手冊。

(3)共享記憶體的附接和斷開

#include<sys/shm.h>

void *shmat(int shmid, const void *addr, int shmflags);

int shmdt(const void *addr);

由於兩個函式需指出程序地址空間中的地址,因此比較複雜。簡化的方法是將shmat中的地址設為NULL。

**P (S)

{
S=S-1;
若S<0,將該程序狀態置為等待狀態,然後將該程序的PCB插入相應的S訊號量等待 佇列末尾,直到有其他程序在S上執行V操作為止;
}**

V (S)
{
S = S + 1 ;
若S<=0,釋放在S訊號量佇列中等待的一個程序,將其狀態改變為就緒態,並將其插 入就緒佇列;然後,執行本操作的程序繼續執行;
}

程式碼實現



#include<sys/sem.h>
#include<sys/ipc.h>
#include<sys/types.h>
#include<errno.h>
#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>

#include <stddef.h>
#include <sys/shm.h>
#include <string.h>

#define TEXT_SIZE 1024


union union_semun {
int val;
struct semid_ds *buf;
unsigned short *array;
}ctl_arg;

/*struct sembuf {
unsigned short sem_num;
short sem_op;
short sem_flg;
};*/

struct databuf {
int written;
char text[TEXT_SIZE];
};

static int shmid, semid_full, semid_empty, semid_zuse;

void initshm(struct databuf**p) {
//\u521b\u5efa\u5171\u4eab\u5185\u5b58
shmid = shmget((key_t)4321, sizeof(struct databuf), 0600 | IPC_CREAT);
if (shmid == -1) {
printf("shmget failed!\n");
return;
}
//\u8fde\u63a5\u5230\u5f53\u524d\u8fdb\u7a0b\uff0c\u8fd4\u56de\u6307\u5411\u7b2c\u4e00\u4e2a\u5b57\u8282\u7684\u6307\u9488
void *shm = shmat(shmid, NULL, 0);
if (shm == (void *)-1) {
printf("shmat failed!\n");
return;
}
printf("\nmemory attached at %d\n", (int)shm);

shm = *p;

}

void setdata(struct databuf**p) {

}
int initsem(key_t semkey) {
//\u521b\u5efa\u4fe1\u53f7\u91cf\u96c6\uff0c\u5e76\u4e14\u521d\u59cb\u5316
printf("\ninitsem running...");
int tmp = semget(semkey, 1, 0600 | IPC_CREAT | IPC_EXCL);
if (tmp == -1) {
printf("semget create failed!");
return;
}
printf("semid %d created successful", tmp);
return tmp;
}
void remobj(void) {
if (semctl(semid_empty, IPC_RMID, 0) == -1)	printf("\nremctl remobj error\n");
else printf("\nremctl delete success");
if (semctl(semid_full, IPC_RMID, 0) == -1)	printf("\nremctl remobj error\n");
else printf("\nremctl delete success");
}

int P(int semid) {
struct sembuf p_buf;
p_buf.sem_num = 0;
p_buf.sem_op = -1;
p_buf.sem_flg = SEM_UNDO;
if (semop(semid, &p_buf, 1) == -1) {
perror("p(semid) failed");
exit(1);
}
return 1;
}

int V(int semid) {
struct sembuf p_buf;
p_buf.sem_num = 0;
p_buf.sem_op = 1;
p_buf.sem_flg = SEM_UNDO;
if (semop(semid, &p_buf, 1) == -1) {
perror("p(semid) failed");
exit(1);
}
return 1;
}

void producter() {
//while (1) {
//if (P(semid)) {
P(semid_empty);
P(semid_zuse);
struct databuf *p = NULL;
//\u6d4b\u8bd5\uff0c\u653e\u5165\u6570\u636e
void *shm = shmat(shmid, NULL, 0);
if (shm == (void *)-1) {
printf("producter shmat failed!\n");
}
printf("\nproducter get shm scuess!");

p = (struct databuf *)shm;
//printf("\nplease input something:");
strcpy(p->text, "Apple");
printf("\nGet data success!");
sleep(10);
V(semid_zuse);
V(semid_full);
//}
//}
}

void filter() {
while (1) {
printf("\nfilter running...");
sleep(10);
}
}

void consumer() {
/*while(1){

}*/
//\u6d4b\u8bd5\uff0c\u8bfb\u51fa\u6570\u636e
while (1) {
//if (P(semid)) {
P(semid_full);
P(semid_zuse);
struct databuf *p = NULL;

void *shm = shmat(shmid, 0, 0);
if (shm == (void *)-1) {
printf("consumer shmat failed!\n");
}
printf("\ncomsumer get shm scuess!");

p = (struct databuf *)shm;
printf("\nYou wirte is %s", p->text);
V(semid_zuse);
V(semid_empty);

}
}

int main() {
key_t semkey, shmkey;
struct databuf *buf;
initshm(&buf);
semid_full = initsem(116);
semid_empty = initsem(117);
semid_zuse = initsem(118);


pid_t fpid = fork();

if (fpid < 0) {
printf("error in fork!");
}
else if (fpid == 0) {
producter();
}
else {
sleep(5);
pid_t fpid2 = fork();
if (fpid2 < 0)	printf("error in fork!");
else if (fpid2 == 0)	filter();
else	consumer();
}

remobj();

}