多執行緒實驗之”生產者——消費者“問題
阿新 • • 發佈:2019-02-19
一 例項
參考書籍《從實踐中學linux應用程式開發》
/*producer-customer.c*/ #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <errno.h> #include <semaphore.h> #include <unistd.h> #include <stdio.h> #include <fcntl.h> #include <sys/ipc.h> #define MYFIFO "myfifo" /*緩衝區有名管道的名字*/ #define BUFF_SIZE 3 /*緩衝區的單元數*/ #define UNIT_SIZE 6 /*每個單元的大小*/ #define DELAY_TIME_LEVELS 5.0 /*小任務之間的最大時間間隔*/ #define RUN_TIME 30 /*執行時間*/ sem_t mutex; //mutex訊號量用於解決兩個執行緒之間的互斥問題 sem_t full; //表示有界緩衝區中的非空單元數 sem_t avail; //表示有界緩衝區中的空單元數 int fd; time_t end_time; /*生產者執行緒*/ void *producer(void *arg) { int delay_time=0; int real_write=0; int temp; while(time(NULL) < end_time) { /*RAND_MAX 是 <stdlib.h> 中偽隨機數生成函式 rand 所能返回的最大數值*/ delay_time=(int)(rand()* DELAY_TIME_LEVELS/(RAND_MAX))+1; sleep(delay_time); /*P操作訊號量avail和mutex*/ /*P操作使sem減1*/ temp=sem_wait(&avail); temp+=sem_wait(&mutex); printf("\nproducer:delay=%d\n",delay_time); if(temp !=0 ) { printf("sem_wait error\n"); break; } /*生產者寫入資料*/ if( (real_write=write(fd,"hello",UNIT_SIZE)) == -1 ) { /*寫入資料失敗*/ /*EAGAIN : Resource temporarily unavailable*/ if(errno==EAGAIN) { printf("the fifo has not been read yet,please try later\n"); } } else { printf("write %d to the fifo\n",real_write); } /*V操作訊號量full和mutex*/ /*V操作相當於加1*/ sem_post(&full); sem_post(&mutex); } pthread_exit(NULL); } /*消費者程序*/ void *customer(void *arg) { int delay_time=0; unsigned char read_buffer[UNIT_SIZE]; int real_read; while(time(NULL)< end_time) { /*RAND_MAX 是 <stdlib.h> 中偽隨機數生成函式 rand 所能返回的最大數值*/ delay_time=(int)(rand()* DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1; sleep(delay_time); /*P操作訊號量full和mutex*/ /*P操作相當於加1*/ sem_wait(&full); sem_wait(&mutex); printf("\ncustomer:delay=%d\n",delay_time); memset(read_buffer,0,UNIT_SIZE); /*消費者讀取資料*/ if((real_read=read(fd,read_buffer,UNIT_SIZE)) == -1) { /*讀取資料失敗*/ if(errno == EAGAIN) { printf("no data yet\n"); } } printf("read %s from fifo\n",read_buffer); /*V操作對avail和mutex*/ sem_post(&avail); sem_post(&mutex); } pthread_exit(NULL); } int main() { pthread_t thrd_prd_id,thrd_cst_id; pthread_t mon_th_id; int ret; srand(time(NULL)); end_time=time(NULL)+RUN_TIME; /*建立有名管道*/ /***************** 函式原型:int mkfifo(const char * pathname, mode_t mode); 函式引數:pathname 要建立的管道 mode O_RDONLY 讀管道 O_WRONLY 寫管道 O_RDWR 讀寫管道 O_CREAT 如果管道不存在,那麼建立一個新的檔案 並用第3個引數為其設定許可權 O_EXCL 如果使用O_CREAT時檔案存在,那麼返回錯誤資訊。 這個引數可以測試檔案是否存在 *******************/ if( ((mkfifo(MYFIFO,O_EXCL|O_CREAT)) < 0) && (errno != EEXIST)) { printf("cannot creat fifo\n"); return errno; } else { printf("creat fifo success\n"); } /*開啟管道*/ fd=open(MYFIFO,O_RDWR); if(fd==-1) { printf("open fifo error,fd=%d\n",fd); return fd; } else { printf("open fifo success,fd=%d\n",fd); } /*初始化互斥訊號量為1*/ /************************ 函式原型:int sem_init(sem_t *sem,int pshared,unsigned int value); 函式引數:sem 訊號量指標 pshared 決定訊號量能否在幾個程序間共享 !!!由於目前linux還沒有實現程序間共享訊號量, 所以這個值只能是0,表示這個訊號量是當前程序的區域性訊號量 value 訊號量初始化的值 ***************************/ ret=sem_init(&mutex,0,1); ret+=sem_init(&full,0,0); ret+=sem_init(&avail,0,BUFF_SIZE); if(ret != 0) { printf("any semaphore initialization failed\n"); return ret; } else { printf("any semaphore initialization success\n"); } /*建立兩個執行緒*/ ret=pthread_create(&thrd_prd_id,NULL,producer,NULL); if(ret!=0) { printf("creat producer thread error\n"); return ret; } else { printf("creat producer thread success\n"); } ret=pthread_create(&thrd_cst_id,NULL,customer,NULL); if(ret!=0) { printf("creat customer thread error\n"); return ret; } else { printf("creat customer thread success\n"); } /*等待執行緒結束*/ pthread_join(thrd_prd_id,NULL); pthread_join(thrd_cst_id,NULL); /*關閉檔案*/ close(fd); unlink(MYFIFO); return 0; }
執行結果