生產者與消費者模式(執行緒的同步與互斥)
阿新 • • 發佈:2019-02-05
條件變數
條件變數的提出首先要涉及一個概念,就是生產者消費者模型:
生產者消費者的模型提出了三種關係,兩種角色,一個場所
三種關係:
- 生產者之間的互斥關係
- 消費者之間的競互斥關係
- 生產者和消費者之間互斥和同步關係(同一時刻只能有一個,要麼在生產,要麼在消費,這就是互斥關係,只能在生產者生產完了之後才能消費,這就是同步關係)
兩個角色:一般是用程序或執行緒來承擔生產者或消費者
一個場所:有效的記憶體區域。(如單鏈表,陣列)
我們就可以把這個想象成生活中的超市供貨商,超市,顧客的關係,超市供貨商供貨,超市是擺放貨物的場所,然後使用者就是消費的。
條件變數屬於執行緒的一種同步的機制,條件變數與互斥鎖一起使用,可以使得執行緒進行等待特定條件的發生。條件本身是由互斥量保護的,執行緒在改變條件狀態之前首先會鎖住互斥量。其他執行緒在獲得互斥量之前不會察覺這種改變,因此互斥量鎖定後才能計算條件。
和互斥鎖一樣,使用條件變數,同樣首先進行初始化:
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 1
- 2
- 3
- 4
- 5
- 1
- 2
- 3
- 4
- 5
和互斥鎖的初始化一樣,它也可以採用init或者是直接利用巨集進行初始化。
條件變數本身就是依賴互斥鎖的,條件本身是由互斥量保護的,執行緒在改變條件狀態錢先要鎖住互斥量,它是利用執行緒間共享的全域性變數進行同步的一種機制。
我們使用pthread_cond_wait進行等待條件變數變為真,如果在規定的時間不能滿足,就會生成一個返回錯誤碼的變數。
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
- 1
- 2
- 3
- 1
- 2
- 3
把鎖傳遞給wait函式,函式自動把等待條件的執行緒掛起,放入消費者等待佇列,然後解鎖掛起執行緒所佔有的互斥鎖,這個時候就可以去跑其他執行緒,然後當等待條件滿足的時候,這個時候從等待佇列中出來執行,獲得剛才自己所佔有的鎖。
一個執行緒可以呼叫pthread_cond_wait在一個Condition
Variable上阻塞等待,這個函式做以下三步操作:
1. 釋放Mutex
2. 阻塞等待
3. 當被喚醒時,重新獲得Mutex並返回
滿足條件的時候可以使用函式pthread_cond_signal進行喚醒
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
- 1
- 2
- 3
- 1
- 2
- 3
這兩個函式都是用來進行喚醒執行緒操作的,signal一次從消費者佇列中至少喚醒一個執行緒,broad_cast能喚醒等待該條件的所有執行緒。
當然和mutex類似,條件變數也需要清除。
int pthread_cond_destroy(pthread_cond_t *cond);
生產者消費者示例:#include<stdio.h>
#include<assert.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
typedef struct Node
{
struct Node *next;
int val;
}Node;
void list_init(Node **phead)
{
assert(phead);
Node *temp = (Node*)malloc(sizeof(Node));
temp->next = NULL;
temp->val = 0;
*phead = temp;
}
void list_push(Node *phead,int _data) //頭插
{
assert(phead);
Node *pM = (Node*)malloc(sizeof(Node));
if (pM)
{
pM->val = _data;
pM->next = phead->next;
phead->next = pM;
}
}
int empty(Node *phead)
{
return (phead->next == NULL ? 1: 0);
}
void list_print(Node *phead)
{
assert(phead);
if (empty(phead))
return ;
Node *pCur = phead->next;
while (pCur)
{
printf("%d->",pCur->val);
pCur = pCur->next;
}
printf("%s\n","NULL");
}
void list_pop(Node *phead,int *data) //頭刪
{
assert(phead);
if (empty(phead))
return ;
Node *ptemp = phead->next;
phead->next = ptemp->next;
*data = ptemp->val;
free(ptemp);
ptemp = NULL;
}
void list_destroy(Node *phead)
{
assert(phead);
while (!empty(phead))
{
int data;
list_pop(phead,&data);
}
}
//生產者執行緒
void* producer(void *arg)
{
Node *phead = (Node *)arg;
while (1)
{
pthread_mutex_lock(&mutex); //申請互斥鎖
int data = rand()%100;
list_push(phead,data);
pthread_mutex_unlock(&mutex);//釋放互斥鎖
printf("prodecer sucess %d\n",data);
pthread_cond_signal(&cond); //以單播的方式通知擁有條件變數的另外一個執行緒,告訴消費者,生產者生產好了,可以消費了。
sleep(1);
}
return NULL;
}
//消費者執行緒
void* consumer(void *arg)
{
Node *phead = (Node *)arg;
while (1)
{
int data;
pthread_mutex_lock(&mutex);
if (empty(phead)) //如果沒有資源可以消費了,則等待
{
pthread_cond_wait(&cond,&mutex); //這個函式呼叫一定是在擁有互斥鎖的前提下.這個函式做三件事,第一:釋放互斥鎖,二,阻塞等待,三,喚醒的時候重新獲得互斥鎖。
}
list_pop(phead,&data);//有資源就消費
pthread_mutex_unlock(&mutex);
printf("consumer sucess %d\n",data);
}
return NULL;
}
int main()
{
Node *phead;
list_init(&phead);
pthread_t id1;
pthread_t id2;
pthread_create(&id1,NULL,producer,(void*)phead);
pthread_create(&id2,NULL,consumer,(void*)phead);
pthread_join(id1,NULL);
pthread_join(id2,NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
}
訊號量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);//類似P操作
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);//類似V操作
用陣列模擬環形佇列儲存資料程式碼:
#include<stdio.h>
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>
//二個規則,(1)消費者一定要等到生產才能消費,(2)生產者如果生產的快的話,不能套消費者一個圈
#define SIZE 64
sem_t blanks; //二個訊號量
sem_t datas;
int buf[SIZE]; //用陣列模擬一個環形佇列。
void *productor(void *arg)
{
int i = 0; //i = 0表示生產的起始位置。
while (1)
{
// pthread_mutex_lock(&mutex_p);
sem_wait (&blanks);//生產者在生產之前要有格子資源。
int data = rand()%1234;
buf[i] = data;
printf("productor done...data:%d\n",data);
i++;
i%=SIZE;//i++總有超過陣列的長度的時候,為了模擬環形佇列,所以求模。
sleep(1); //生產的慢點
sem_post(&datas);//生產者生產完了,資料資源就多了一個。
// pthread_mutex_unlock(&mutex_p);
}
return NULL;
}
void *consummer(void *arg)
{
int i = 0;
while (1)
{
sem_wait(&datas); //消費者在消費之前要有消費資源。
int data = buf[i];
printf("consummer done...data:%d\n",data);
i++;
i%= SIZE;
sem_post(&blanks);//消費者消費完了格子資源就多了一個。
}
return NULL;
}
int main()
{
sem_init(&blanks,0,SIZE);
sem_init(&datas,0,0); //訊號量的初始化要在多執行緒之前
pthread_t id1,id2;
pthread_create(&id1,NULL,productor,NULL);
pthread_create(&id2,NULL,consummer,NULL);
pthread_join(id1,NULL);
pthread_join(id2,NULL);
sem_destroy(&blanks);
sem_destroy(&datas);
return 0;
}