1. 程式人生 > 實用技巧 >執行緒同步—條件變數

執行緒同步—條件變數

條件變數

  互斥量防止多個執行緒同時訪問同一共享變數。條件變數允許一個執行緒就某個共享變數(或其他共享資源)的狀態變化通知其他執行緒,並讓其他執行緒這一通知,在通知未到達之前,執行緒處於阻塞狀態。條件變數本身是由互斥量保護的。執行緒在改變條件狀態之前必須首先鎖住互斥量。其他執行緒在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定以後才能計算條件。

  條件變數總是結合互斥量使用的。條件變數就共享變數/臨界資源的狀態改變發出通知,而互斥量提供對該條件變數的互斥訪問。

  在使用條件變數之前,必須先對它進行初始化。由 pthread_cond_t 資料型別表示的條件變數可以用兩種方式進行初始化,可以把常量 PTHREAD_COND_INITIALIZER

賦給靜態分配的條件變數,但是如果條件變數是動態分配的,則需要使用pthread_cond_init() 函式對它進行初始化。

  在釋放條件變數底層的記憶體空間之前,可以使用 pthread_cond_destroy() 函式對條件變數進行反初始化(deinitialize)。

1 include <pthread.h>
2 
3 int pthread_cond_init(pthread_cond_t *restrict cond,
4                const pthread_condattr_t *restrict attr);
5 
6 int pthread_cond_destroy(pthread_cond_t *cond);
7 8 //條件變數的靜態初始化方式 9 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。

  除非需要建立一個具有非預設屬性的條件變數,否則 pthread_cond_init() 函式的 attr 引數可以設定為 NULL。

條件變數的操作

  條件變數的操作主要是傳送通知訊號(Signal)和等待(Wait)。傳送訊號操作即通知一個或多個處於等待狀態的執行緒,某個共享變數的狀態已經改變。等待操作是指在收到一個通知前一直處於阻塞狀態。

1. 設定等待條件

  我們使用 pthread_cond_wait() 函式等待條件變數為真。如果在給定的時間內條件不能滿足,那麼會返回一個錯誤編號。

1 #include <pthread.h>
2 
3 int pthread_cond_wait(pthread_cond_t *restrict cond,
4       pthread_mutex_t *restrict mutex);
5 
6 int pthread_cond_timedwait(pthread_cond_t *restrict cond,
7       pthread_mutex_t *restrict mutex,
8       const struct timespec *restrict abstime);
  • 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。

《函式說明》

(1)傳遞給 pthread_cond_wait() 函式的互斥量對條件變數進行保護。呼叫者把鎖住的互斥量傳給該函式,函式自動把呼叫執行緒放到等待條件的執行緒列表中,然後在 pthread_cond_wait() 函式內部對互斥量解鎖。在未接收到條件變數狀態改變的通知之前,當前執行緒會阻塞在 pthread_cond_wait() 函式中;一旦接收到狀態改變的通知“訊號”,pthread_cond_wait() 才會返回,並且在該函式內部互斥量會被再次鎖定,因此,該函式返回後,還需要對互斥量進行一次解鎖操作。

(2)pthread_cond_timedwait() 函式的功能與 pthread_cond_wait() 函式類似,只是多了一個超時時間。超時值指定了我們願意等待多長時間,它是通過 timespec 結構體指定的。這個時間值是一個絕對數而不是一個相對數。例如,假設願意等待3分鐘,那麼,並不是把3分鐘轉換成 timespec 結構,而是需要把當前時間加上3分鐘再轉換成 timespec 結構。可以使用 clock_gettime() 函式獲取 timespec 結構體表示的當前時間,但是並不是所有的平臺都支援這個函式,Linux系統是支援的,clock_gettime() 函式是在librt庫中實現的,所以需要加上-lrt庫連結。當然,也可以使用另一個函式 gettimeofday() 獲取 timeval 結構表示的當前時間,然後把這個時間轉換成 timespec 結構體。要得到超時值的絕對時間,可以使用下面的函式(假設阻塞的最大時間使用分鐘來表示):

#include <stdlib.h>
#include <sys/time.h>

void maketimeout(struct timespec *tsp, long minutes)
{
   struct timeval now;
    
   //get the current time
    gettimeofday(&now, NULL);
  tsp->tv_sec = now.tv_sec;
  tsp->tv_nsec = now.tv_usec * 1000;  //usec(微秒)-->nsec(納秒)
  //add the offset to get timeout value
    tsp->tv_sec += minutes * 60;
}

<連結> struct timespec 和 struct timeval 結構體定義
  如果超時時間到期後,條件還是沒有出現,pthread_cond_timedwait() 將重新獲取互斥量,然後返回錯誤 ETIMEDOUT。從 pthread_cond_wait 或者 pthread_cond_timedwait 呼叫成功返回時,執行緒需要重新計算條件,因為另一個執行緒可能已經在執行並改變了條件。

2. 設定通知條件

  有兩個函式可以用於通知執行緒條件已經滿足。pthread_cond_signal() 函式至少能喚醒一個等待該條件的執行緒,而pthread_cond_broadcast() 函式則能喚醒等待該條件的所有執行緒。

#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

  • 兩個函式的返回值:若成功,返回0;否則,返回錯誤編號。

  在呼叫 pthread_cond_signal 或 pthread_cond_broadcast 時,我們說這是給執行緒或者條件傳送通知訊號(Signal)。必須注意的是,一定要在改變狀態以後再給執行緒發通知資訊。

<備註> POXIS 規範為了簡化 pthread_cond_signal 的實現,允許它在實現的時候喚醒一個以上的執行緒。

示例:在生產者-消費者模型中,結合使用條件變數和互斥量對執行緒進行同步。程式碼如下:prod_condvar.c

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

//對靜態互斥量的初始化
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
//對靜態條件變數的初始化
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static int avail = 0;  //共享變數,記錄已生產可供訊息的產品數量

static void *
producer(void *arg)
{
    int cnt = atoi((char*)arg);
    int ret, i;
    
    printf("producer: pid=%lu, tid=%lu\n", getpid(), pthread_self());
    for(i=0; i<cnt; i++){
        sleep(1);
        ret = pthread_mutex_lock(&mtx);  //對共享變數 avail 需要互斥訪問
        if(ret != 0)
            printf("pthread_mutex_lock failed!\n");
        avail++;  //生成一個產品
        ret = pthread_mutex_unlock(&mtx);
        if(ret != 0)
            printf("pthread_mutex_unlock failed!\n");
        ret = pthread_cond_signal(&cond);  //喚醒消費者
        if(ret != 0)
            printf("pthread_cond_signal failed!\n");
    }
    return NULL;
}

int
main(int argc, char *argv[])
{
    pthread_t tid;
    int ret, i;
    int totRequired;  //所有執行緒將要生產的產品的總數
    int numConsumed;  //消費者已消費的產品數
    
    bool done;     //商品是否消費完成標誌
    time_t t;
    
    t = time(NULL);
    
    printf("main: pid=%lu, tid=%lu\n", getpid(), pthread_self());
    //建立所有執行緒
    totRequired = 0;
    for(i=1; i<argc; i++){
        totRequired += atoi(argv[i]);
        ret = pthread_create(&tid, NULL, producer, argv[i]);
        if(ret != 0){
            printf("pthread_create failed!\n");
        }
    }
    
    //消費者迴圈消費已生產出來的產品
    numConsumed = 0;
    done = false;
    for(;;){
        pthread_mutex_lock(&mtx);
        if(avail == 0){
            ret = pthread_cond_wait(&cond, &mtx);  //等待喚醒通知
            if(ret != 0)
                printf("pthread_cond_wait failed!\n");
        }
        //程式執行到這裡時,互斥量仍是lock的
        while(avail > 0){
            numConsumed ++;  //消費者已消費商品數加1
            avail --;        //現存商品數減1
            printf("T=%ld, numConsumed=%d\n", (long)(time(NULL)-t), numConsumed);
            done = numConsumed >= totRequired;  //當所有生產的商品都已消費完成,done置為true
        }
        pthread_mutex_unlock(&mtx);
        
        if(done)
            break;
    }
    
    return 0;
}

**編譯命令: gcc prod_condvar.c -o prod_condvar -lpthread

**執行命令: ./prod_condvar 4 5 6

**執行結果:

main: pid=20056, tid=140332428621632
producer: pid=20056, tid=140332420314880
producer: pid=20056, tid=140332411922176
producer: pid=20056, tid=140332403529472
T=1, numConsumed=1
T=1, numConsumed=2
T=1, numConsumed=3
T=2, numConsumed=4
T=2, numConsumed=5
T=2, numConsumed=6
T=3, numConsumed=7
T=3, numConsumed=8
T=3, numConsumed=9
T=4, numConsumed=10
T=4, numConsumed=11
T=4, numConsumed=12
T=5, numConsumed=13
T=5, numConsumed=14
T=6, numConsumed=15

《程式碼分析》

  • 執行命令:./prod_condvar 4 5 6,表示的含義是生產者執行緒1、2、3生產的產品個數分別是4、5、6,共計15個。從執行結果可以看到,本示例中,共有4個執行緒,其中主執行緒是main函式,亦即消費者執行緒,而其他三個執行緒是生產者執行緒producer,也就是說總共有3個生產者,1個消費者。這四個執行緒同時共享全域性變數 avail。
  • 生產者負責生產商品,當生產者每生產出1個商品,共享變數avail自增加1,然後使用 pthread_cond_signal 喚醒main函式中的消費者執行緒,通知其可以消費商品了。而對於消費者執行緒,剛開始的時候,avail == 0,因此使用 pthread_cond_wait 設定等待條件,此時消費者執行緒會處於阻塞狀態,直到接收到生產者 producer 發出的喚醒通知,消費者執行緒開始繼續執行,並開始消費已生產出來的商品。
  • 當消費者已消費的商品數 >= 所有生產者生產出來的商品時,退出 for迴圈,結束主執行緒,同時整個程序結束。

參考

《UNIX環境高階程式設計(第3版)》第11.6.6章節

《Linux_Unix系統程式設計手冊(上)》第30.2章節