執行緒同步—條件變數
條件變數
互斥量防止多個執行緒同時訪問同一共享變數。條件變數允許一個執行緒就某個共享變數(或其他共享資源)的狀態變化通知其他執行緒,並讓其他執行緒這一通知,在通知未到達之前,執行緒處於阻塞狀態。條件變數本身是由互斥量保護的。執行緒在改變條件狀態之前必須首先鎖住互斥量。其他執行緒在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定以後才能計算條件。
條件變數總是結合互斥量使用的。條件變數就共享變數/臨界資源的狀態改變發出通知,而互斥量提供對該條件變數的互斥訪問。
在使用條件變數之前,必須先對它進行初始化。由 pthread_cond_t 資料型別表示的條件變數可以用兩種方式進行初始化,可以把常量 PTHREAD_COND_INITIALIZER
在釋放條件變數底層的記憶體空間之前,可以使用 pthread_cond_destroy() 函式對條件變數進行反初始化(deinitialize)。
1 include <pthread.h>
2
3 int pthread_cond_init(pthread_cond_t *restrict cond,
4 const pthread_condattr_t *restrict attr);
5
6 int pthread_cond_destroy(pthread_cond_t *cond);
7
8 //條件變數的靜態初始化方式
9 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。
除非需要建立一個具有非預設屬性的條件變數,否則 pthread_cond_init() 函式的 attr 引數可以設定為 NULL。
條件變數的操作
條件變數的操作主要是傳送通知訊號(Signal)和等待(Wait)。傳送訊號操作即通知一個或多個處於等待狀態的執行緒,某個共享變數的狀態已經改變。等待操作是指在收到一個通知前一直處於阻塞狀態。
1. 設定等待條件
我們使用 pthread_cond_wait() 函式等待條件變數為真。如果在給定的時間內條件不能滿足,那麼會返回一個錯誤編號。
1 #include <pthread.h>
2
3 int pthread_cond_wait(pthread_cond_t *restrict cond,
4 pthread_mutex_t *restrict mutex);
5
6 int pthread_cond_timedwait(pthread_cond_t *restrict cond,
7 pthread_mutex_t *restrict mutex,
8 const struct timespec *restrict abstime);
- 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。
《函式說明》
(1)傳遞給 pthread_cond_wait() 函式的互斥量對條件變數進行保護。呼叫者把鎖住的互斥量傳給該函式,函式自動把呼叫執行緒放到等待條件的執行緒列表中,然後在 pthread_cond_wait() 函式內部對互斥量解鎖。在未接收到條件變數狀態改變的通知之前,當前執行緒會阻塞在 pthread_cond_wait() 函式中;一旦接收到狀態改變的通知“訊號”,pthread_cond_wait() 才會返回,並且在該函式內部互斥量會被再次鎖定,因此,該函式返回後,還需要對互斥量進行一次解鎖操作。
(2)pthread_cond_timedwait() 函式的功能與 pthread_cond_wait() 函式類似,只是多了一個超時時間。超時值指定了我們願意等待多長時間,它是通過 timespec 結構體指定的。這個時間值是一個絕對數而不是一個相對數。例如,假設願意等待3分鐘,那麼,並不是把3分鐘轉換成 timespec 結構,而是需要把當前時間加上3分鐘再轉換成 timespec 結構。可以使用 clock_gettime() 函式獲取 timespec 結構體表示的當前時間,但是並不是所有的平臺都支援這個函式,Linux系統是支援的,clock_gettime() 函式是在librt庫中實現的,所以需要加上-lrt庫連結。當然,也可以使用另一個函式 gettimeofday() 獲取 timeval 結構表示的當前時間,然後把這個時間轉換成 timespec 結構體。要得到超時值的絕對時間,可以使用下面的函式(假設阻塞的最大時間使用分鐘來表示):
#include <stdlib.h>
#include <sys/time.h>
void maketimeout(struct timespec *tsp, long minutes)
{
struct timeval now;
//get the current time
gettimeofday(&now, NULL);
tsp->tv_sec = now.tv_sec;
tsp->tv_nsec = now.tv_usec * 1000; //usec(微秒)-->nsec(納秒)
//add the offset to get timeout value
tsp->tv_sec += minutes * 60;
}
<連結> struct timespec 和 struct timeval 結構體定義
如果超時時間到期後,條件還是沒有出現,pthread_cond_timedwait() 將重新獲取互斥量,然後返回錯誤 ETIMEDOUT。從 pthread_cond_wait 或者 pthread_cond_timedwait 呼叫成功返回時,執行緒需要重新計算條件,因為另一個執行緒可能已經在執行並改變了條件。
2. 設定通知條件
有兩個函式可以用於通知執行緒條件已經滿足。pthread_cond_signal() 函式至少能喚醒一個等待該條件的執行緒,而pthread_cond_broadcast() 函式則能喚醒等待該條件的所有執行緒。
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
- 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。
在呼叫 pthread_cond_signal 或 pthread_cond_broadcast 時,我們說這是給執行緒或者條件傳送通知訊號(Signal)。必須注意的是,一定要在改變狀態以後再給執行緒發通知資訊。
<備註> POXIS 規範為了簡化 pthread_cond_signal 的實現,允許它在實現的時候喚醒一個以上的執行緒。
示例:在生產者-消費者模型中,結合使用條件變數和互斥量對執行緒進行同步。程式碼如下:prod_condvar.c
#include <stdio.h> #include <stdbool.h> #include <unistd.h> #include <time.h> #include <pthread.h> //對靜態互斥量的初始化 static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; //對靜態條件變數的初始化 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static int avail = 0; //共享變數,記錄已生產可供訊息的產品數量 static void * producer(void *arg) { int cnt = atoi((char*)arg); int ret, i; printf("producer: pid=%lu, tid=%lu\n", getpid(), pthread_self()); for(i=0; i<cnt; i++){ sleep(1); ret = pthread_mutex_lock(&mtx); //對共享變數 avail 需要互斥訪問 if(ret != 0) printf("pthread_mutex_lock failed!\n"); avail++; //生成一個產品 ret = pthread_mutex_unlock(&mtx); if(ret != 0) printf("pthread_mutex_unlock failed!\n"); ret = pthread_cond_signal(&cond); //喚醒消費者 if(ret != 0) printf("pthread_cond_signal failed!\n"); } return NULL; } int main(int argc, char *argv[]) { pthread_t tid; int ret, i; int totRequired; //所有執行緒將要生產的產品的總數 int numConsumed; //消費者已消費的產品數 bool done; //商品是否消費完成標誌 time_t t; t = time(NULL); printf("main: pid=%lu, tid=%lu\n", getpid(), pthread_self()); //建立所有執行緒 totRequired = 0; for(i=1; i<argc; i++){ totRequired += atoi(argv[i]); ret = pthread_create(&tid, NULL, producer, argv[i]); if(ret != 0){ printf("pthread_create failed!\n"); } } //消費者迴圈消費已生產出來的產品 numConsumed = 0; done = false; for(;;){ pthread_mutex_lock(&mtx); if(avail == 0){ ret = pthread_cond_wait(&cond, &mtx); //等待喚醒通知 if(ret != 0) printf("pthread_cond_wait failed!\n"); } //程式執行到這裡時,互斥量仍是lock的 while(avail > 0){ numConsumed ++; //消費者已消費商品數加1 avail --; //現存商品數減1 printf("T=%ld, numConsumed=%d\n", (long)(time(NULL)-t), numConsumed); done = numConsumed >= totRequired; //當所有生產的商品都已消費完成,done置為true } pthread_mutex_unlock(&mtx); if(done) break; } return 0; }
**編譯命令: gcc prod_condvar.c -o prod_condvar -lpthread
**執行命令: ./prod_condvar 4 5 6
**執行結果:
main: pid=20056, tid=140332428621632 producer: pid=20056, tid=140332420314880 producer: pid=20056, tid=140332411922176 producer: pid=20056, tid=140332403529472 T=1, numConsumed=1 T=1, numConsumed=2 T=1, numConsumed=3 T=2, numConsumed=4 T=2, numConsumed=5 T=2, numConsumed=6 T=3, numConsumed=7 T=3, numConsumed=8 T=3, numConsumed=9 T=4, numConsumed=10 T=4, numConsumed=11 T=4, numConsumed=12 T=5, numConsumed=13 T=5, numConsumed=14 T=6, numConsumed=15
《程式碼分析》
- 執行命令:./prod_condvar 4 5 6,表示的含義是生產者執行緒1、2、3生產的產品個數分別是4、5、6,共計15個。從執行結果可以看到,本示例中,共有4個執行緒,其中主執行緒是main函式,亦即消費者執行緒,而其他三個執行緒是生產者執行緒producer,也就是說總共有3個生產者,1個消費者。這四個執行緒同時共享全域性變數 avail。
- 生產者負責生產商品,當生產者每生產出1個商品,共享變數avail自增加1,然後使用 pthread_cond_signal 喚醒main函式中的消費者執行緒,通知其可以消費商品了。而對於消費者執行緒,剛開始的時候,avail == 0,因此使用 pthread_cond_wait 設定等待條件,此時消費者執行緒會處於阻塞狀態,直到接收到生產者 producer 發出的喚醒通知,消費者執行緒開始繼續執行,並開始消費已生產出來的商品。
- 當消費者已消費的商品數 >= 所有生產者生產出來的商品時,退出 for迴圈,結束主執行緒,同時整個程序結束。
參考
《UNIX環境高階程式設計(第3版)》第11.6.6章節
《Linux_Unix系統程式設計手冊(上)》第30.2章節