1. 程式人生 > 實用技巧 >多執行緒同步的四種方式(史上最詳細+用例)

多執行緒同步的四種方式(史上最詳細+用例)

多執行緒同步的四種方式

對於多執行緒程式來說,同步是指在一定的時間內只允許某一個執行緒來訪問某個資源。而在此時間內,不允許其他的執行緒訪問該資源。可以通過互斥鎖(Mutex)、條件變數(condition variable)、讀寫鎖(reader-writer lock)、訊號量(semaphore)來同步資源。

  1. 互斥鎖(Mutex)

    互斥量是最簡單的同步機制,即互斥鎖。多個程序(執行緒)均可以訪問到一個互斥量,通過對互斥量加鎖,從而來保護一個臨界區,防止其它程序(執行緒)同時進入臨界區,保護臨界資源互斥訪問。

    互斥鎖需要滿足三個條件:

    • 互斥 不同執行緒的臨界區沒有重疊
    • 無死鎖 如果一個執行緒正在嘗試獲得一個鎖,那麼總會成功地獲得這個鎖。若執行緒A呼叫lock()但是無法獲得鎖,則一定存在其他執行緒正在無窮次地執行臨界區。
    • 無飢餓 每一個試圖獲得鎖的執行緒最終都能成功。
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
     
    void *function(void *arg);
    pthread_mutex_t mutex;
    int counter = 0;
    int main(int argc, char *argv[])
    {
        int rc1,rc2;
         
        char *str1="hello";
        char *str2="world";
        pthread_t thread1,thread2;
     
        pthread_mutex_init(&mutex,NULL);
        if((rc1 = pthread_create(&thread1,NULL,function,str1)))
        {
            fprintf(stdout,"thread 1 create failed: %d\n",rc1);
        }
     
        if(rc2=pthread_create(&thread2,NULL,function,str2))
        {
            fprintf(stdout,"thread 2 create failed: %d\n",rc2);
        }
     
        pthread_join(thread1,NULL);
        pthread_join(thread2,NULL);
        return 0;
    }
    
    void *function(void *arg)
    {
        char *m;
        m = (char *)arg;
        pthread_mutex_lock(&mutex);
        while(*m != '\0')
        {
            printf("%c",*m);
            fflush(stdout);
            m++;
            sleep(1);
        }
        printf("\n");
        pthread_mutex_unlock(&mutex);
    }
    
  2. 條件變數(condition variable)

    生產者消費者問題:每次生產一個商品,發一個訊號,告訴消費者“我生產商品了,快來消費”,消費者拿到生產者的條件變數後每次消費兩個商品,然後發出訊號“我消費了商品,你可以生產了”--_--(發的這個訊號是一個條件變數,通過傳送這個訊號可以喚醒阻塞的執行緒,收到訊號後,不滿足需求也會繼續阻塞)

    為了防止競爭,條件變數的使用總是和一個互斥鎖結合在一起;條件變數是執行緒的另一種同步機制,它和互斥量是一起使用的。互斥量的目的就是為了加鎖,而條件變數的結合,使得執行緒能夠以等待的狀態來迎接特定的條件發生,而不需要頻繁查詢鎖。

    #include <pthread.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <signal.h>
    
    // 定義條件變數cond_product
        pthread_cond_t cond_product;
    
    // 定義條件變數cond_consume
        pthread_cond_t cond_consume;
    
    // 定義執行緒互斥鎖lock
        pthread_mutex_t lock;
    
    //初始化函式
    void init_work(void)
    {
    // 條件變數的初始化
        pthread_cond_init(&cond_product, NULL);
        pthread_cond_init(&cond_consume, NULL);
    
    // 執行緒鎖的初始化
        pthread_mutex_init(&lock, NULL);
    }
    
    //生產執行緒,每次生產一個產品
    void* handle_product(void *arg){
        int i;
        int* product = NULL;
        product = (int *) arg;
        for(i=1; i<50; i++) {
            pthread_mutex_lock(&lock); //生產程序上鎖,是消費程序無法改變商品個數
            if (*product >= 4) {
    // 倉庫已滿,應該阻塞式等待
                printf("\033[43m倉庫已滿, 暫停生產...\033[0m\n");
                pthread_cond_wait(&cond_product, &lock);
            }
            printf("生產中....\n");
            sleep(2);
            printf("生產成功....\n");
            *product+=1;
            pthread_cond_signal(&cond_consume);//發出訊號,條件已經滿足
            printf("\033[32m生產一個產品,生產%d次,倉庫中剩餘%d個\033[0m\n",i,*product);
            printf ("發訊號--->生產成功\n");
            pthread_mutex_unlock(&lock);//生產程序解鎖
            usleep(50000);
        }
            return NULL;
    }
    
    //消費執行緒,每次消費兩個產品,消費6次間歇
    void* handle_consume(void *arg){
        int i;
        int* product = NULL;
        product = (int *)arg;
            for (i=1; i<50; i++) {
            pthread_mutex_lock(&lock);
            if (*product <= 1) //消費執行緒每次消費2個,故總產品數量小於1即阻塞
            {
    /* 阻塞式等待 */
                printf("\033[43m缺貨中,請等待...\033[0m\n");
                pthread_cond_wait(&cond_consume, &lock);
            }            
     /* 消費產品,每次從倉庫取出兩個產品 */
            printf("消費中...\n");
             sleep(2);
            *product-=2;
            printf("消費完成...\n");
            printf("\033[31m消費兩個產品,共消費%d次,倉庫剩餘%d個\033[0m\n",i,*product);
            pthread_cond_signal(&cond_product);
            printf ("發訊號---> 已消費\n");
            pthread_mutex_unlock(&lock);
            usleep(30000);
            if (i%6 == 0){ //消費間歇
                sleep(9);
            }
        }
            return NULL;
    }
    
    int main()
    {
        pthread_t th_product, th_consume; //定義執行緒號
        int ret;
        int intrinsic = 3;
    //初始化所有變數
        init_work();
    //建立程序並傳遞相關引數
        ret = pthread_create(&th_product, 0, handle_product, &intrinsic);
        if (ret != 0) {
            perror("建立生產執行緒失敗\n");
            exit(1);
        }
    
        ret = pthread_create(&th_consume, 0, handle_consume, &intrinsic);
        if (ret != 0) {
            perror("建立消費執行緒失敗\n");
            exit(1);
        }
    
        pthread_join(th_product, 0);//回收生產執行緒
        pthread_join(th_consume, 0);//回收消費執行緒
    
        return 0;
    }
    
  3. 讀寫鎖(reader-writer lock)

    前面介紹的互斥量加鎖要麼是鎖狀態,要麼就是不加鎖狀態。而且只有一次只有一個執行緒可以對其加鎖。這樣的目的是為了防止變數被不同的執行緒修改。但是如果有執行緒只是想讀而不會去寫的話,這有不會導致變數被修改。但是如果是互斥量加鎖,則讀寫都沒有辦法。這種場景不能使用互斥量,必須使用讀寫鎖。

    讀寫鎖可以有3種狀態:

    1. 讀模式下加鎖狀態

    2. 寫模式下加鎖狀態

    3. 不加鎖狀態

    一次只有一個執行緒可以佔有寫模式的讀寫鎖,但是多個執行緒可以同時佔有讀模式的讀寫鎖。當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的執行緒都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的執行緒都可以得到訪問權。但是任何希望以寫模式對此鎖進行加鎖的執行緒都會阻塞。直到所有的執行緒釋放它們的讀鎖為止。

    讀寫鎖非常適合於對資料結構讀的次數大於寫的情況。當讀寫鎖在寫模式下時,它所保護的資料結構就可以被安全地修改,因為一次只有一個執行緒可以在寫模式下擁有這個鎖。

    讀寫鎖也叫做共享互斥鎖。當讀寫鎖是讀模式鎖住的,就可以說是以共享模式鎖住的。當它是寫模式鎖住的時候,就可以說成是以互斥模式鎖住的。

    #include <pthread.h>

    Int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

    Int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

    讀寫鎖通過呼叫pthread_rwlock_init進行初始化。在釋放讀寫鎖佔有的記憶體之前,需要呼叫pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在呼叫pthread_rwlock_destroy之前就釋放了讀寫鎖佔用的記憶體空間。那麼分配給這個鎖的資源就會丟失。

    要在讀模式下鎖定讀寫鎖,需要呼叫pthread_rwlock_rdlock,要在寫模式下鎖定讀寫鎖,需要呼叫pthread_rwlock_wrlock。不管以何種方式鎖住讀寫鎖。都可以呼叫pthread_rwlock_unlock進行解鎖。

    Int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

    Int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

    Int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

    #include <pthread.h>
    #include <stdio.h>
    
    
    pthread_rwlock_t rwlock;
    
    int data=1;
    
    
    void readerA(){
    
        while(1){
    
            pthread_rwlock_rdlock(&rwlock);
    
            printf("A讀者讀出:%d\n",data);
    
            pthread_rwlock_unlock(&rwlock);
    
            Sleep(1000);
    
        }
    
    }
    
     
    
    void writerB(){
    
        while(1){
    
            pthread_rwlock_wrlock(&rwlock);
    
            data++;
    
            printf("B作者寫入:%d\n",data);
    
            pthread_rwlock_unlock(&rwlock);
    
            Sleep(1000);
    
        }
    
    }
    
     
    
     
    
    int main()
    
    {
    
        pthread_t t1;
    
        pthread_t t2;
    
        pthread_rwlock_init(&rwlock,NULL);
    
        pthread_create(&t1,NULL,readerA,NULL);
    
        pthread_create(&t2,NULL,writerB,NULL);
    
        pthread_join(t1,NULL);
    
        pthread_join(t2,NULL);
    
        pthread_rwlock_destroy(&rwlock);
    
        return 0;
    
    }
    
  4. 訊號量(semaphore)

    在生產者消費者模型中,對任務數量的記錄就可以使用訊號量來做。可以理解為帶計數的條件變數。當訊號量的值小於0時,工作程序或者執行緒就會阻塞,等待物品到來。當生產者生產一個物品,會將訊號量值加1操作。 這是會喚醒在訊號量上阻塞的程序或者執行緒,它們去爭搶物品。

    這裡用一個讀寫檔案作為例子:

    首先需要用sem_init(); 初始化sem_t型變數,並設定初始訊號量。比如設定為1.

    每次呼叫sem_wait(sem_t *); 訊號量減一,當呼叫sem_post(sem_t *); 訊號量加一。

    當訊號量為0時,sem_wait(); 函式阻塞,等待訊號量 >0 時,才進行。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct{
	sem_t *lock;
	int num;
}STRUCT;
void test(void * obj)
{
	STRUCT *point = (STRUCT *)obj;
	sem_t *semlock = point->lock;
	sem_wait(semlock);
	FILE *f = fopen("test.txt","a");
	if(f==NULL)
		printf("fopen is wrong\n");
	printf("sem_wait %d\n",point->num);
	int j=0;
	for(j=0;j<30;j++)
	fprintf(f,"%c111111111111\n",'a'+point->num);
	fclose(f);
	sem_post(semlock);
 
	return;
}
 
 
int main()
{
	pthread_t pid[20];  //  pthread_t pid;
	int ret,i=0;
	STRUCT obj[13];
	sem_t semlock;
	if(sem_init(&semlock,0,1)!=0)    // 此處初始訊號量設為1. 第二個引數為0表示不應用於其他程序。
		printf("sem_init is wrong\n");
	for(i=0;i<10;i++)
	{
		obj[i].num = i;
		obj[i].lock = &semlock;
		ret = pthread_create(&pid[i],NULL,(void *)test,&obj[i]);
	
		if(ret!=0)
		{
			printf("create thread wrong %d!!\n",i);
			return 0;
		}
		
	}
	for(i=0;i<10;i++)
		pthread_join(pid[i],NULL);  // 等待其他執行緒結束,如果沒有這裡,主執行緒先結束,會釋放pid[]及obj[],則出現BUG。 
	return 0;
}