執行緒同步——生產者消費者問題
阿新 • • 發佈:2019-01-03
/*
 * buffer.h - shared definitions for the bounded producer/consumer buffer.
 */
#ifndef BUFFER_H
#define BUFFER_H

/* Type of a single element stored in the buffer. */
typedef int buffer_item;

/* Number of slots in the buffer. */
#define BUFFER_SIZE 5

#endif /* BUFFER_H */
#include "buffer.h"

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h> /* sleep() */

#define TRUE 1

/* The buffer, used as a circular FIFO queue. */
buffer_item buffer[BUFFER_SIZE];

/* Guards buffer, next_in and next_out. */
pthread_mutex_t mutex;

/* empty counts the free slots, full counts the occupied slots. */
sem_t empty, full;

/* Index of the next slot to write and of the next slot to read. */
static int next_in;
static int next_out;

/*
 * Wait on a semaphore, retrying when the wait is interrupted by a signal.
 * Returns 0 once the semaphore has been decremented, -1 on any other error.
 */
static int wait_on(sem_t *sem)
{
    while (sem_wait(sem) == -1) {
        if (errno != EINTR) {
            return -1;
        }
    }
    return 0;
}

/*
 * Insert item into the buffer, blocking while the buffer is full.
 *
 * Returns 0 if successful, otherwise -1 indicating an error condition.
 * The mutex is never left locked on an error path, and a slot reserved
 * through the semaphore is handed back if the insertion cannot complete.
 */
int insert_item(buffer_item item)
{
    if (wait_on(&empty) != 0) {
        return -1;
    }
    if (pthread_mutex_lock(&mutex) != 0) {
        sem_post(&empty); /* give the reserved free slot back */
        return -1;
    }

    /* The empty semaphore guarantees that slot next_in is free. */
    buffer[next_in] = item;
    next_in = (next_in + 1) % BUFFER_SIZE;

    pthread_mutex_unlock(&mutex);
    sem_post(&full);
    return 0;
}

/*
 * Remove the oldest object from the buffer and store it in *item,
 * blocking while the buffer is empty.
 *
 * Returns 0 if successful, otherwise -1 indicating an error condition
 * (including a NULL item pointer).
 */
int remove_item(buffer_item *item)
{
    if (item == NULL) {
        return -1;
    }
    if (wait_on(&full) != 0) {
        return -1;
    }
    if (pthread_mutex_lock(&mutex) != 0) {
        sem_post(&full); /* the item was not consumed, so keep it counted */
        return -1;
    }

    /* The full semaphore guarantees that slot next_out holds an item. */
    *item = buffer[next_out];
    buffer[next_out] = 0;
    next_out = (next_out + 1) % BUFFER_SIZE;

    pthread_mutex_unlock(&mutex);
    sem_post(&empty);
    return 0;
}

/*
 * Producer thread entry point. param points to the int used as this
 * producer's number in the log output. Loops forever: sleeps 1 to 10
 * seconds, generates a random item, and inserts it into the buffer.
 */
void *producer(void *param)
{
    const int *num = param;

    while (TRUE) {
        /* sleep for a random period of time */
        /* NOTE(review): POSIX does not require rand() to be thread-safe. */
        unsigned int randomtime = (unsigned int)(rand() % 10) + 1;
        sleep(randomtime);

        /* generate a random number */
        buffer_item rand_item = rand() % RAND_MAX;
        printf("producer %d produced %d\n", *num, rand_item);
        if (insert_item(rand_item)) {
            printf("report error condition\n");
        }
    }
    return NULL; /* not reached */
}

/*
 * Consumer thread entry point. param points to the int used as this
 * consumer's number in the log output. Loops forever: sleeps 1 to 10
 * seconds, then removes one item from the buffer and prints it.
 */
void *consumer(void *param)
{
    const int *num = param;

    while (TRUE) {
        /* sleep for a random period of time */
        unsigned int randomtime = (unsigned int)(rand() % 10) + 1;
        sleep(randomtime);

        buffer_item rand_item;
        if (remove_item(&rand_item)) {
            printf("report error condition\n");
        } else {
            printf("consumer %d consumed %d\n", *num, rand_item);
        }
    }
    return NULL; /* not reached */
}

/*
 * Parse text as a non-negative decimal int.
 * Returns 0 and stores the value in *out on success; returns -1 if text is
 * empty, has trailing characters, is negative, or does not fit in an int.
 */
static int parse_count(const char *text, int *out)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE ||
        value < 0 || value > INT_MAX) {
        return -1;
    }
    *out = (int)value;
    return 0;
}

/*
 * Start count threads running entry, numbered 1..count.
 * Returns 0 on success, -1 if allocation or thread creation fails.
 *
 * The array of thread numbers is deliberately never freed: the threads
 * read from it for as long as they run, and they run until the process
 * exits.
 */
static int spawn_workers(int count, void *(*entry)(void *),
                         const pthread_attr_t *attr)
{
    int *ids;

    if (count == 0) {
        return 0;
    }
    ids = calloc((size_t)count, sizeof *ids);
    if (ids == NULL) {
        fprintf(stderr, "out of memory\n");
        return -1;
    }
    for (int i = 0; i < count; i++) {
        pthread_t tid;
        int rc;

        ids[i] = i + 1;
        rc = pthread_create(&tid, attr, entry, &ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            return -1;
        }
    }
    return 0;
}

/*
 * Usage: prog <seconds-to-run> <producer-count> <consumer-count>
 *
 * Starts the requested producer and consumer threads, lets them run for
 * the given number of seconds, then exits. Returns 0 on success and -1 on
 * bad arguments or on an initialization failure.
 */
int main(int argc, char *argv[])
{
    int time_to_sleep;
    int producer_num;
    int consumer_num;
    pthread_attr_t attr;
    int rc;

    /* 1. Get command line arguments argv[1], argv[2], argv[3] */
    if (argc != 4) {
        fprintf(stderr, "usage: %s <seconds> <producers> <consumers>\n",
                argc > 0 ? argv[0] : "prog");
        return -1;
    }
    if (parse_count(argv[1], &time_to_sleep) != 0 ||
        parse_count(argv[2], &producer_num) != 0 ||
        parse_count(argv[3], &consumer_num) != 0) {
        fprintf(stderr, "arguments must be non-negative integers\n");
        return -1;
    }

    /* 2. Initialize buffer */
    if (pthread_mutex_init(&mutex, NULL) != 0 ||
        sem_init(&empty, 0, BUFFER_SIZE) != 0 ||
        sem_init(&full, 0, 0) != 0) {
        fprintf(stderr, "failed to initialize synchronization objects\n");
        return -1;
    }
    memset(buffer, 0, sizeof(buffer));
    next_in = 0;
    next_out = 0;
    srand((unsigned int)time(NULL));

    if (pthread_attr_init(&attr) != 0) {
        fprintf(stderr, "failed to initialize thread attributes\n");
        return -1;
    }

    /* 3. Create producer thread(s) */
    rc = spawn_workers(producer_num, producer, &attr);

    /* 4. Create consumer thread(s) */
    if (rc == 0) {
        rc = spawn_workers(consumer_num, consumer, &attr);
    }
    pthread_attr_destroy(&attr);
    if (rc != 0) {
        return -1;
    }

    /* 5. Sleep */
    sleep((unsigned int)time_to_sleep);

    /* 6. Exit: returning from main ends the process and all its threads. */
    return 0;
}