多執行緒同步的四種方式(史上最詳細+用例)
多執行緒同步的四種方式
對於多執行緒程式來說,同步是指在一定的時間內只允許某一個執行緒來訪問某個資源。而在此時間內,不允許其他的執行緒訪問該資源。可以通過互斥鎖(Mutex)、條件變數(condition variable)、讀寫鎖(reader-writer lock)、訊號量(semaphore)來同步資源。
-
互斥鎖(Mutex)
互斥量是最簡單的同步機制,即互斥鎖。多個程序(執行緒)均可以訪問到一個互斥量,通過對互斥量加鎖,從而來保護一個臨界區,防止其它程序(執行緒)同時進入臨界區,保護臨界資源互斥訪問。
互斥鎖需要滿足三個條件:
- 互斥 不同執行緒的臨界區沒有重疊
- 無死鎖 如果一個執行緒正在嘗試獲得一個鎖,那麼總會成功地獲得這個鎖。若執行緒A呼叫lock()但是無法獲得鎖,則一定存在其他執行緒正在無窮次地執行臨界區。
- 無飢餓 每一個試圖獲得鎖的執行緒最終都能成功。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> void *function(void *arg); pthread_mutex_t mutex; int counter = 0; int main(int argc, char *argv[]) { int rc1,rc2; char *str1="hello"; char *str2="world"; pthread_t thread1,thread2; pthread_mutex_init(&mutex,NULL); if((rc1 = pthread_create(&thread1,NULL,function,str1))) { fprintf(stdout,"thread 1 create failed: %d\n",rc1); } if(rc2=pthread_create(&thread2,NULL,function,str2)) { fprintf(stdout,"thread 2 create failed: %d\n",rc2); } pthread_join(thread1,NULL); pthread_join(thread2,NULL); return 0; } void *function(void *arg) { char *m; m = (char *)arg; pthread_mutex_lock(&mutex); while(*m != '\0') { printf("%c",*m); fflush(stdout); m++; sleep(1); } printf("\n"); pthread_mutex_unlock(&mutex); }
-
條件變數(condition variable)
生產者消費者問題:每次生產一個商品,發一個訊號,告訴消費者“我生產商品了,快來消費”,消費者拿到生產者的條件變數後每次消費兩個商品,然後發出訊號“我消費了商品,你可以生產了”--_--(發的這個訊號是一個條件變數,通過傳送這個訊號可以喚醒阻塞的執行緒,收到訊號後,不滿足需求也會繼續阻塞)
為了防止競爭,條件變數的使用總是和一個互斥鎖結合在一起;條件變數是執行緒的另一種同步機制,它和互斥量是一起使用的。互斥量的目的就是為了加鎖,而條件變數的結合,使得執行緒能夠以等待的狀態來迎接特定的條件發生,而不需要頻繁查詢鎖。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> // 定義條件變數cond_product pthread_cond_t cond_product; // 定義條件變數cond_consume pthread_cond_t cond_consume; // 定義執行緒互斥鎖lock pthread_mutex_t lock; //初始化函式 void init_work(void) { // 條件變數的初始化 pthread_cond_init(&cond_product, NULL); pthread_cond_init(&cond_consume, NULL); // 執行緒鎖的初始化 pthread_mutex_init(&lock, NULL); } //生產執行緒,每次生產一個產品 void* handle_product(void *arg){ int i; int* product = NULL; product = (int *) arg; for(i=1; i<50; i++) { pthread_mutex_lock(&lock); //生產程序上鎖,是消費程序無法改變商品個數 if (*product >= 4) { // 倉庫已滿,應該阻塞式等待 printf("\033[43m倉庫已滿, 暫停生產...\033[0m\n"); pthread_cond_wait(&cond_product, &lock); } printf("生產中....\n"); sleep(2); printf("生產成功....\n"); *product+=1; pthread_cond_signal(&cond_consume);//發出訊號,條件已經滿足 printf("\033[32m生產一個產品,生產%d次,倉庫中剩餘%d個\033[0m\n",i,*product); printf ("發訊號--->生產成功\n"); pthread_mutex_unlock(&lock);//生產程序解鎖 usleep(50000); } return NULL; } //消費執行緒,每次消費兩個產品,消費6次間歇 void* handle_consume(void *arg){ int i; int* product = NULL; product = (int *)arg; for (i=1; i<50; i++) { pthread_mutex_lock(&lock); if (*product <= 1) //消費執行緒每次消費2個,故總產品數量小於1即阻塞 { /* 阻塞式等待 */ printf("\033[43m缺貨中,請等待...\033[0m\n"); pthread_cond_wait(&cond_consume, &lock); } /* 消費產品,每次從倉庫取出兩個產品 */ printf("消費中...\n"); sleep(2); *product-=2; printf("消費完成...\n"); printf("\033[31m消費兩個產品,共消費%d次,倉庫剩餘%d個\033[0m\n",i,*product); pthread_cond_signal(&cond_product); printf ("發訊號---> 已消費\n"); pthread_mutex_unlock(&lock); usleep(30000); if (i%6 == 0){ //消費間歇 sleep(9); } } return NULL; } int main() { pthread_t th_product, th_consume; //定義執行緒號 int ret; int intrinsic = 3; //初始化所有變數 init_work(); //建立程序並傳遞相關引數 ret = pthread_create(&th_product, 0, handle_product, &intrinsic); if (ret != 0) { perror("建立生產執行緒失敗\n"); exit(1); } ret = pthread_create(&th_consume, 0, handle_consume, &intrinsic); if (ret != 0) { perror("建立消費執行緒失敗\n"); exit(1); } pthread_join(th_product, 0);//回收生產執行緒 pthread_join(th_consume, 0);//回收消費執行緒 return 0; }
-
讀寫鎖(reader-writer lock)
前面介紹的互斥量加鎖要麼是鎖狀態,要麼就是不加鎖狀態。而且只有一次只有一個執行緒可以對其加鎖。這樣的目的是為了防止變數被不同的執行緒修改。但是如果有執行緒只是想讀而不會去寫的話,這有不會導致變數被修改。但是如果是互斥量加鎖,則讀寫都沒有辦法。這種場景不能使用互斥量,必須使用讀寫鎖。
讀寫鎖可以有3種狀態:
-
讀模式下加鎖狀態
-
寫模式下加鎖狀態
-
不加鎖狀態
一次只有一個執行緒可以佔有寫模式的讀寫鎖,但是多個執行緒可以同時佔有讀模式的讀寫鎖。當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的執行緒都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的執行緒都可以得到訪問權。但是任何希望以寫模式對此鎖進行加鎖的執行緒都會阻塞。直到所有的執行緒釋放它們的讀鎖為止。
讀寫鎖非常適合於對資料結構讀的次數大於寫的情況。當讀寫鎖在寫模式下時,它所保護的資料結構就可以被安全地修改,因為一次只有一個執行緒可以在寫模式下擁有這個鎖。
讀寫鎖也叫做共享互斥鎖。當讀寫鎖是讀模式鎖住的,就可以說是以共享模式鎖住的。當它是寫模式鎖住的時候,就可以說成是以互斥模式鎖住的。
#include <pthread.h>
Int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
Int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
讀寫鎖通過呼叫pthread_rwlock_init進行初始化。在釋放讀寫鎖佔有的記憶體之前,需要呼叫pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在呼叫pthread_rwlock_destroy之前就釋放了讀寫鎖佔用的記憶體空間。那麼分配給這個鎖的資源就會丟失。
要在讀模式下鎖定讀寫鎖,需要呼叫pthread_rwlock_rdlock,要在寫模式下鎖定讀寫鎖,需要呼叫pthread_rwlock_wrlock。不管以何種方式鎖住讀寫鎖。都可以呼叫pthread_rwlock_unlock進行解鎖。
Int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
Int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
Int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
#include <pthread.h> #include <stdio.h> pthread_rwlock_t rwlock; int data=1; void readerA(){ while(1){ pthread_rwlock_rdlock(&rwlock); printf("A讀者讀出:%d\n",data); pthread_rwlock_unlock(&rwlock); Sleep(1000); } } void writerB(){ while(1){ pthread_rwlock_wrlock(&rwlock); data++; printf("B作者寫入:%d\n",data); pthread_rwlock_unlock(&rwlock); Sleep(1000); } } int main() { pthread_t t1; pthread_t t2; pthread_rwlock_init(&rwlock,NULL); pthread_create(&t1,NULL,readerA,NULL); pthread_create(&t2,NULL,writerB,NULL); pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_rwlock_destroy(&rwlock); return 0; }
-
-
訊號量(semaphore)
在生產者消費者模型中,對任務數量的記錄就可以使用訊號量來做。可以理解為帶計數的條件變數。當訊號量的值小於0時,工作程序或者執行緒就會阻塞,等待物品到來。當生產者生產一個物品,會將訊號量值加1操作。 這是會喚醒在訊號量上阻塞的程序或者執行緒,它們去爭搶物品。
這裡用一個讀寫檔案作為例子:
首先需要用sem_init(); 初始化sem_t型變數,並設定初始訊號量。比如設定為1.
每次呼叫sem_wait(sem_t *); 訊號量減一,當呼叫sem_post(sem_t *); 訊號量加一。
當訊號量為0時,sem_wait(); 函式阻塞,等待訊號量 >0 時,才進行。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct{
sem_t *lock;
int num;
}STRUCT;
void test(void * obj)
{
STRUCT *point = (STRUCT *)obj;
sem_t *semlock = point->lock;
sem_wait(semlock);
FILE *f = fopen("test.txt","a");
if(f==NULL)
printf("fopen is wrong\n");
printf("sem_wait %d\n",point->num);
int j=0;
for(j=0;j<30;j++)
fprintf(f,"%c111111111111\n",'a'+point->num);
fclose(f);
sem_post(semlock);
return;
}
int main()
{
pthread_t pid[20]; // pthread_t pid;
int ret,i=0;
STRUCT obj[13];
sem_t semlock;
if(sem_init(&semlock,0,1)!=0) // 此處初始訊號量設為1. 第二個引數為0表示不應用於其他程序。
printf("sem_init is wrong\n");
for(i=0;i<10;i++)
{
obj[i].num = i;
obj[i].lock = &semlock;
ret = pthread_create(&pid[i],NULL,(void *)test,&obj[i]);
if(ret!=0)
{
printf("create thread wrong %d!!\n",i);
return 0;
}
}
for(i=0;i<10;i++)
pthread_join(pid[i],NULL); // 等待其他執行緒結束,如果沒有這裡,主執行緒先結束,會釋放pid[]及obj[],則出現BUG。
return 0;
}