1. 程式人生 > >多執行緒同步與互斥

多執行緒同步與互斥

多執行緒執行緒基礎操作
關於本篇部落格的更多程式碼:GitHub連結

執行緒的同步與互斥,學習生產者消費者模型及應用場景

執行緒安全:生產者與消費者模型,讀寫者模型,同步與互斥的實現,互斥鎖,條件變數,posix訊號量,讀寫鎖,自旋鎖

大部分情況,執行緒使用的資料都是區域性變數,變數的地址空間線上程棧空間內,這種情況,變數歸
屬單個執行緒,其他執行緒無法獲得這種變數。
但有時候,很多變數都需要線上程間共享,這樣的變數稱為共享變數,可以通過資料的共享,完成
執行緒之間的互動。
多個執行緒併發的操作共享變數,會帶來⼀些問題。
執行緒共享執行緒所處的程序的虛擬地址空間,這些公共資源缺乏資料的訪問控制容易造成資料混亂,因此為了解決執行緒安全提出來生產者與消費者模型:

1、生產者與消費者模型

一個場所,二個角色,三個關係:生產者與消費者爭搶同一個臨界區的臨界資源,
生產者與生產者都在搶著訪問操作同一個資源,生產者與生產者之間的關係:互斥關係
生產者與消費者:同步關係+互斥關係
消費者與消費者:互斥關係

在這裡插入圖片描述

為了保證維持生產者與消費者之間的關係來解決資料的安全訪問操作,因此提出了同步與互斥,同步就是解決時序訪問問題;互斥就是解決同一資源同一時間的唯一訪問性問題。
解決執行緒的安全問題實際就是對映模型中的關係來解決
同步原則保證了不會產生飢餓問題,互斥原則保證了訪問原子性。鎖變數本身必須是原子操作。

2、讀寫者模型

適用場景:在編寫多執行緒的時候,有⼀種情況是十分常見的。那就是,有些公共資料修改的機會比較少。相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴隨著查詢的操作,中間耗時很長。給這種程式碼段加鎖,會極大地降低我們程式的效率。那麼有沒有⼀種⽅法,可以專門處理這種多讀少寫的情況呢? 有,
那就是讀寫鎖。讀寫鎖本質上是⼀種⾃旋鎖

在這裡插入圖片描述

注意:寫獨佔,讀共享,寫鎖優先順序⾼
讀寫鎖就是基於自旋鎖實現的
自旋鎖是一直輪詢判斷,非常消耗CPU資源,是用於確定等待花費時間比較少,很快就能獲取到鎖的這種情況。互斥鎖是掛起等待。
/*  這是一個驗證讀寫鎖的程式碼
 *      1. 讀寫鎖的初始化
 *      2. 讀寫鎖的操作(加讀鎖/加寫鎖/解鎖)
 *      3. 讀寫鎖的釋放
 *  特性:
 *      寫獨佔,讀共享,寫優先順序高
 *  有多個寫執行緒,多個讀執行緒,驗證特性
 */
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <pthread.h> int ticket = 100; pthread_rwlock_t rwlock; void *thr_write(void *arg) { while(1) { //加寫鎖 //int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); //int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); //pthread_rwlock_wrlock(&rwlock);//寫獨佔 //pthread_rwlock_rdlock(&rwlock);//讀共享 pthread_rwlock_wrlock(&rwlock);//寫獨佔 if (ticket > 0 ) { sleep(5); ticket--; printf("ticket:%d\n", ticket); } printf("this is write!!\n"); pthread_rwlock_unlock(&rwlock); sleep(5); } return 0; } void *thr_read(void *arg) { while(1) { pthread_rwlock_rdlock(&rwlock); if (ticket > 0) { sleep(5); ticket--; printf("ticket:%d\n", ticket); } printf("this is read!!!\n"); pthread_rwlock_unlock(&rwlock); sleep(5); } return 0; } int main() { pthread_t wtid[4], rtid[4]; int ret, i; //1. 讀寫鎖的初始化 //int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, // const pthread_rwlockattr_t *restrict attr); pthread_rwlock_init(&rwlock, NULL); for (i = 0; i < 4; i++) { ret = pthread_create(&wtid[i], NULL, thr_write, NULL); if (ret != 0) { printf("pthread_create error\n"); return -1; } } for (i = 0; i < 4; i++) { ret = pthread_create(&rtid[i], NULL, thr_read, NULL); if (ret != 0) { printf("pthread_create error\n"); return -1; } } pthread_join(wtid[0], NULL); pthread_join(wtid[1], NULL); pthread_join(wtid[2], NULL); pthread_join(wtid[3], NULL); pthread_join(rtid[0], NULL); pthread_join(rtid[1], NULL); pthread_join(rtid[2], NULL); pthread_join(rtid[3], NULL); //3. 銷燬讀寫鎖 pthread_rwlock_destroy(&rwlock); return 0; }

實際中使用互斥鎖(阻塞等待)還是讀寫鎖(自旋鎖)取決於正在佔用鎖的執行緒佔用鎖的時間。

實現執行緒間的互斥:互斥鎖(互斥量)mutex

1、 定義一個互斥鎖

pthread_mutex_t mutex;

2、初始化互斥鎖

互斥鎖的初始化:
1、定義時候直接賦值初始化,最後不需要手動釋放
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2、函式介面初始化,最後需要手動釋放
int pthread_mutex_init(pthread_mutex_t* mutex,const pthread_mutexattr_t* attr);
mutex:互斥鎖變數
attr:互斥鎖屬性,可以置空NULL
成功返回:0 錯誤:errno

3、對臨界操作進行加鎖/解鎖

int pthread_mutex_lock(pthread_mutex_t *mutex);

阻塞加鎖,如果獲取不到鎖則阻塞等待鎖被解開

int pthread_mutex_trylock(pthread_mutex_t *mutex);
非阻塞加鎖,如果獲取不到鎖則立即報錯返回EBUSY
int pthread_mutex_timedlock (pthread_mutex_t *mutex,struct timespec *t);
限時阻塞加鎖,如果獲取不到鎖則等待指定時間,在這段時間內如果一直獲取不到,則報錯返回,否則加鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);
解鎖

4、釋放互斥鎖

int pthread_mutex_destroy(pthread_mutex_t *mutex);
實現執行緒間的同步:條件變數、posix訊號量
/*這是一個火車站黃牛買票的栗子
 * 每一個黃牛都是一個執行緒,在這個栗子中有一個總票數ticket
 */
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>

int ticket = 100;

pthread_mutex_t mutex;//定義一個鎖變數

void* sale_ticket(void* arg)
{
    int id = (int)arg;
    while(1){
    pthread_mutex_lock(&mutex);//加鎖
        if(ticket > 0){
            usleep(100);
            printf("Yellow cow %d get a ticket    ticket:%d\n",id,ticket);
            ticket--;
        }else{
            printf("have no ticket:%d\n",ticket);
            pthread_mutex_unlock(&mutex);//有可能退出,需要解鎖,否則會死鎖
            pthread_exit(NULL);
        }
    pthread_mutex_unlock(&mutex);//解鎖
    }
    return NULL;
}

int main()
{
    pthread_t tid[4];
    int i = 0;

    pthread_mutex_init(&mutex,NULL);//使用函式初始化鎖
    for(i=0;i<4;i++)
    {
        int ret=pthread_create(&tid[i],NULL,sale_ticket,(void*)i);
        //不能傳i的地址,如果傳i的地址,執行緒函式在呼叫這個地址時候都是3
        if( ret !=0){
            perror("pthread_create error");
            exit(-1);
        }
    }

    pthread_join(tid[0],NULL);
    pthread_join(tid[1],NULL);
    pthread_join(tid[2],NULL);
    pthread_join(tid[3],NULL);


    pthread_mutex_destroy(&mutex);//銷燬鎖
    return 0;
}
對互斥鎖進行操作的時候,有加鎖就一定要有解鎖,並且必須在任意一個有可能會退出的地方都要進行解鎖操作,否則會造成其它執行緒的死鎖。
死鎖情況:因為一直獲取不到鎖資源而造成的鎖死情況

死鎖的必要條件:必須具備條件才能滿足

1. 互斥條件-----我獲取了鎖你就不能再獲取
2. 不可剝奪條件----我拿到了鎖別人不能釋放我的鎖
3. 請求與保持條件----拿了鎖1又去獲取鎖2,如果沒有獲取到鎖2不釋放鎖1
4. 環路等待條件----a拿了鎖1去請求鎖2,b拿了鎖2區求鎖1
預防產生死鎖:破壞請求與保持條件
避免產生死鎖的經典例項:銀行家演算法

實現執行緒間的同步:條件變數

在這裡插入圖片描述

需要一個條件:表示臨界區有沒有資源
為什麼條件變數要和互斥鎖搭配使用?

>因為等待需要被喚醒,而被喚醒的前提條件就是條件已經滿足,並且這個條件本身就是一個臨界資源,因此改變條件的操作需要被保護。

條件變數的初始化及銷燬:
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t * cond,const pthread_condattr_t * attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

沒有資源就去阻塞等待:

int pthread_cond_wait(pthread_cond_t * cond,pthread_mutex_t * mutex);
//解鎖+等待,當被喚醒時候,它自動獲得鎖

還有限時等待的函式:

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,const struct timespec *abstime);

pthread_cond_wait函式會對互斥鎖做判斷,如果呼叫執行緒加鎖,就解鎖,然後陷入等待,整個過程是原子操作,防止消費者先拿到鎖,發現條件變數不滿足,無法消費,它陷入阻塞等待,這時候生產者得不到鎖。

喚醒在條件變數上的執行緒:

int pthread_cond_broadcast(pthread_cond_t *cond);//廣播喚醒
int pthread_cond_signal(pthread_cond_t *cond);//喚醒第一個等待的條件變數的執行緒
條件變數程式碼演示:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <errno.h>

pthread_cond_t cond;
pthread_mutex_t mutex;
int basket = 0;//公共條件,需要互斥鎖來保證原子性

void* saler(void*arg)
{
    while(1){
        pthread_mutex_lock(mutex);
        if(basket == 0){
            printf("I sell a good\n");
            basket = 1;
            pthread_cond_signal(&cond);
        pthread_mutex_unlock(mutex);
        }
    }
    return NULL;
}

void* customer(void*arg)
{
    while(1){
        pthread_mutex_lock(mutex);
        if(basket == 0){
            //初始狀態等待,睡眠
            //pthread_cond_wait函式會對互斥鎖做判斷,如果呼叫執行緒加鎖,就解鎖,然後陷入等待,整個過程是原子操作
            pthread_cond_wait(&cond,&mutex);
        }
        printf("I bought a gift for my girl friend!!\n");
        basket = 0;
        pthread_mutex_unlock(mutex);
    }
    return NULL;
}

int main()
{
    pthread_t t1,t2;
    int ret;
    pthread_cond_init(&cond,NULL);//條件變數初始化
    pthread_mutex_init(&mutex,NULL);

    ret = pthread_create(&t1,NULL,saler,NULL);
    if(ret != 0){
        perror("pthread_create error");
        exit(-1);
    }

    ret = pthread_create(&t1,NULL,customer,NULL);
    if(ret != 0){
        perror("pthread_create error");
        exit(-1);
    }
    pthread_join(t1,NULL);
    pthread_join(t2,NULL);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return 0;
}

posix訊號量

POSIX訊號量和SystemV訊號量作用相同,都是用於同步操作,達到無衝突的訪問共享資源的目的。 但POSIX可以用於執行緒間同步。systemV標準posix實現程序間通訊
即可以實現同步也可以實現互斥,既可以用於程序間同步與互斥,也可以用於執行緒間的同步與互斥。
訊號量是什麼(具有一個等待佇列的計數器)

posix執行緒同步實現

消費者:沒有資源則等待
生產者:生產出來則通知佇列中的等待者

1、訊號量的初始化

int sem_init(sem_t *sem, int pshared, unsigned int value);
If pshared has the value 0, then the semaphore is shared between the threads of a process
If pshared is nonzero, then the semaphore is shared between processes
sem:訊號量變數名
value:訊號量初始計數
成功:0 失敗:-1

2、訊號量的操作(等待/通知)

等待:對於消費者,沒有資源則等待。等待訊號量,會將訊號量的值減1。
int sem_wait(sem_t *sem);//阻塞等待
int sem_trywait(sem_t *sem);//沒有資源,報錯返回
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//限時等待,超時報錯返回
釋出訊號量:生產者生產出資源,通知消費者。釋出訊號量,表示資源使⽤完畢,可以歸還資源了。將訊號量值加1。
int sem_post(sem_t *sem);

3、訊號量的釋放

int sem_destroy(sem_t *sem);
Link with -pthread.
/*訊號量實現執行緒同步與互斥
 * 同步:
 *      1、訊號量的初始化
 *      2、訊號量的操作(等待/通知)
 *      3、訊號量的釋放
 */
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

sem_t sem;//定義訊號量

void*thr_producer(void*arg)
{
    while(1){
        sleep(1);
        printf("I make a hot beef noodles!!\n");
        sem_post(&sem);
    }
    return NULL;
}

void*thr_customer(void*arg)
{
    while(1){
        sem_wait(&sem);
        printf("It is declicious!!\n");
    }
    return NULL;

}

int main()
{
    pthread_t t1,t2;
    int ret;

    sem_init(&sem,0,0);//訊號量初始化
    ret = pthread_create(&t1,NULL,thr_producer,NULL);
    if(ret != 0 ){
        perror("pthread_create error");
        exit(-1);
    }

    ret = pthread_create(&t2,NULL,thr_customer,NULL);
    if(ret != 0 ){
        perror("pthread_create error");
        exit(-1);
    }

    pthread_join(t1,NULL);
    pthread_join(t2,NULL);
    sem_destroy(&sem);
    
    return 0;
}

posix執行緒互斥實現

訊號量的操作(等待+通知)
sem_wait()訊號量減1
sem_post()釋出訊號量,表示資源使⽤完畢,可以歸還資源了。將訊號量值加1。
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

sem_t sem;//定義訊號量

int ticket = 100;//黃牛搶票,總票數100

void* buy_ticket(void*arg)
{
    int id = (int)arg;
    while(1){
        sem_wait(&sem);
        if(ticket > 0){
            usleep(1000);
            ticket--;
            printf("%d Buy a ticket,the ticket has %d\n",id,ticket);
        }
        sem_post(&sem);
    }
    return NULL;
}
int main()
{
    pthread_t tid[4];
    int ret;
    sem_init(&sem,0,1);//訊號量初始化

    int i = 0;
    for(i=0;i<4;i++){
        ret = pthread_create(&tid[i],NULL,buy_ticket,(void