1. 程式人生 > >【Linux多執行緒】三個經典同步問題

【Linux多執行緒】三個經典同步問題

在瞭解了《同步與互斥的區別 》之後,我們來看看幾個經典的執行緒同步的例子。相信通過具體場景可以讓我們學會分析和解決這類執行緒同步的問題,以便以後應用在實際的專案中。

一、生產者-消費者問題

問題描述:

一組生產者程序和一組消費者程序共享一個初始為空、大小為 n 的緩衝區,只有緩衝區沒滿時,生產者才能把訊息放入到緩衝區,否則必須等待;只有緩衝區不空時,消費者才能從中取出訊息,否則必須等待。由於緩衝區是臨界資源,它只允許一個生產者放入訊息,或者一個消費者從中取出訊息。

分析:

  1. 關係分析:生產者和消費者對緩衝區互斥訪問是互斥關係,同時生產者和消費者又是一個相互協作的關係,只有生產者生產之後,消費者才能消費,它們也是同步關係。

  2. 整理思路:這裡比較簡單,只有生產者和消費者兩個程序,且這兩個程序存在著互斥關係和同步關係。那麼需要解決的是互斥和同步的PV操作的位置。

  3. 訊號量設定:訊號量mutex作為互斥訊號量,用於控制互斥訪問緩衝池,初值為1;訊號量full用於記錄當前緩衝池中“滿”緩衝區數,初值為 0;訊號量empty用於記錄當前緩衝池中“空”緩衝區數,初值為n。

程式碼示例:(semaphore類的封裝見下文)

#include<iostream>
#include<unistd.h>  // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std; #define N 5 semaphore mutex("/", 1); // 臨界區互斥訊號量 semaphore empty("/home", N); // 記錄空緩衝區數,初值為N semaphore full("/home/songlee",0); // 記錄滿緩衝區數,初值為0 int buffer[N]; // 緩衝區,大小為N int i=0; int j=0; void* producer(void* arg) { empty.P(); // empty減1
mutex.P(); buffer[i] = 10 + rand() % 90; printf("Producer %d write Buffer[%d]: %d\n",arg,i+1,buffer[i]); i = (i+1) % N; mutex.V(); full.V(); // full加1 } void* consumer(void* arg) { full.P(); // full減1 mutex.P(); printf(" \033[1;31m"); printf("Consumer %d read Buffer[%d]: %d\n",arg,j+1,buffer[j]); printf("\033[0m"); j = (j+1) % N; mutex.V(); empty.V(); // empty加1 } int main() { pthread_t id[10]; // 開10個生產者執行緒,10個消費者執行緒 for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, producer, (void*)(k+1)); for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, consumer, (void*)(k+1)); sleep(1); return 0; }

編譯執行輸出結果:

Producer 1 write Buffer[1]: 83
Producer 2 write Buffer[2]: 26
Producer 3 write Buffer[3]: 37
Producer 5 write Buffer[4]: 35
Producer 4 write Buffer[5]: 33
                               Consumer 1 read Buffer[1]: 83
Producer 6 write Buffer[1]: 35
                               Consumer 2 read Buffer[2]: 26
                               Consumer 3 read Buffer[3]: 37
                               Consumer 4 read Buffer[4]: 35
                               Consumer 5 read Buffer[5]: 33
                               Consumer 6 read Buffer[1]: 35
Producer 7 write Buffer[2]: 56
Producer 8 write Buffer[3]: 22
Producer 10 write Buffer[4]: 79
                               Consumer 9 read Buffer[2]: 56
                               Consumer 10 read Buffer[3]: 22
Producer 9 write Buffer[5]: 11
                               Consumer 7 read Buffer[4]: 79
                               Consumer 8 read Buffer[5]: 11

二、讀者-寫者問題

問題描述:

有讀者和寫者兩組併發執行緒,共享一個檔案,當兩個或以上的讀執行緒同時訪問共享資料時不會產生副作用,但若某個寫執行緒和其他執行緒(讀執行緒或寫執行緒)同時訪問共享資料時則可能導致資料不一致的錯誤。因此要求:

  • 允許多個讀者可以同時對檔案執行讀操作;
  • 只允許一個寫者往檔案中寫資訊;
  • 任一寫者在完成寫操作之前不允許其他讀者或寫者工作;
  • 寫者執行寫操作前,應讓已有的讀者和寫者全部退出。

分析:

  1. 關係分析:由題目分析可知,讀者和寫者是互斥的,寫者和寫者也是互斥的,而讀者和讀者不存在互斥問題。

  2. 整理思路:寫者是比較簡單的,它與任何執行緒互斥,用互斥訊號量的 PV 操作即可解決。讀者的問題比較複雜,它必須實現與寫者的互斥,多個讀者還可以同時讀。所以,在這裡用到了一個計數器,用它來判斷當前是否有讀者讀檔案。當有讀者的時候寫者是無法寫檔案的,此時讀者會一直佔用檔案,當沒有讀者的時候寫者才可以寫檔案。同時,不同的讀者對計數器的訪問也應該是互斥的。

  3. 訊號量設定:首先設定一個計數器count,用來記錄當前的讀者數量,初值為0;設定互斥訊號量mutex,用於保護更新 count 變數時的互斥;設定互斥訊號量rw用於保證讀者和寫者的互斥訪問。

程式碼示例:

#include<iostream>
#include<unistd.h>  // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std;

int count = 0;           // 記錄當前的讀者數量
semaphore mutex("/",1);  // 用於保護更新count變數時的互斥
semaphore rw("/home",1); // 用於保證讀者和寫者的互斥

void* writer(void* arg)
{
    rw.P();              // 互斥訪問共享檔案

    printf("  Writer %d start writing...\n", arg);
    sleep(1);
    printf("  Writer %d finish writing...\n", arg);

    rw.V();              // 釋放共享檔案
}

void* reader(void* arg)
{
    mutex.P();           // 互斥訪問count變數
    if(count == 0)       // 當第一個讀執行緒讀檔案時
        rw.P();          // 阻止寫執行緒寫
    ++count;             // 讀者計數器加1
    mutex.V();           // 釋放count變數

    printf("Reader %d start reading...\n", arg);
    sleep(1);
    printf("Reader %d finish reading...\n", arg);

    mutex.P();           // 互斥訪問count變數
    --count;             // 讀者計數器減1
    if(count == 0)       // 當最後一個讀執行緒讀完檔案
        rw.V();          // 允許寫執行緒寫
    mutex.V();           // 釋放count變數
}


int main()
{
    pthread_t id[8];     // 開6個讀執行緒,2個寫執行緒

    pthread_create(&id[0], NULL, reader, (void*)1);
    pthread_create(&id[1], NULL, reader, (void*)2);

    pthread_create(&id[2], NULL, writer, (void*)1);
    pthread_create(&id[3], NULL, writer, (void*)2);

    pthread_create(&id[4], NULL, reader, (void*)3);
    pthread_create(&id[5], NULL ,reader, (void*)4);
    sleep(2);
    pthread_create(&id[6], NULL, reader, (void*)5);
    pthread_create(&id[7], NULL ,reader, (void*)6);

    sleep(4);
    return 0;
}

編譯執行的結果如下:

Reader 2 start reading...
Reader 1 start reading...
Reader 3 start reading...
Reader 4 start reading...
Reader 1 finish reading...
Reader 2 finish reading...
Reader 3 finish reading...
Reader 4 finish reading...
  Writer 1 start writing...
  Writer 1 finish writing...
  Writer 2 start writing...
  Writer 2 finish writing...
Reader 5 start reading...
Reader 6 start reading...
Reader 5 finish reading...
Reader 6 finish reading...

三、哲學家進餐問題

問題描述:

一張圓桌上坐著 5 名哲學家,桌子上每兩個哲學家之間擺了一根筷子,桌子的中間是一碗米飯,如圖所示:



哲學家們傾注畢生精力用於思考和進餐,哲學家在思考時,並不影響他人。只有當哲學家飢餓的時候,才試圖拿起左、右兩根筷子(一根一根拿起)。如果筷子已在他人手上,則需等待。飢餓的哲學家只有同時拿到了兩根筷子才可以開始進餐,當進餐完畢後,放下筷子繼續思考。

分析:

  1. 關係分析:5名哲學家與左右鄰居對其中間筷子的訪問是互斥關係。

  2. 整理思路:顯然這裡有 5 個執行緒,那麼要如何讓一個哲學家拿到左右兩個筷子而不造成死鎖或飢餓現象?解決方法有兩個,一個是讓他們同時拿兩個筷子;二是對每個哲學家的動作制定規則,避免飢餓或死鎖現象的發生。

  3. 訊號量設定:定義互斥訊號量陣列chopstick[5] = {1,1,1,1,1}用於對 5 根筷子的互斥訪問。

示例程式碼:

semaphore chopstick[5] = {1,1,1,1,1}  // 訊號量陣列
Pi()                                  // i號哲學家的執行緒
{
    do
    {
        P(chopstick[i]);              // 取左邊筷子
        P(chopstick[(i+1)%5]);        // 取右邊筷子
        eat;                          // 進餐
        V(chopstick[i]);              // 放回左邊筷子
        V(chopstick[(i+1)%5]);        // 放回右邊筷子
        think;                        // 思考
    }while(1);
}

上面的虛擬碼存在一個問題:當五個哲學家都想要進餐,分別拿起他們左邊筷子的時候(都恰好執行完P(chopstick[i])),筷子已經被拿光了,等到他們再想拿右邊的筷子的時候,就全被阻塞了,這就出現了死鎖。

為了防止死鎖的發生,可以對哲學家執行緒施加一些限制條件,比如:

  • 至多允許四個哲學家同時進餐;
  • 僅當一個哲學家左右兩邊的筷子都可用時才允許他抓起筷子;
  • 對哲學家順序編號,要求奇數號哲學家先抓左邊的筷子,然後再抓他右邊的筷子,而偶數號哲學家剛好相反。

這裡,我們採用第二種方法來改進上面的演算法,即當一個哲學家左右兩邊的筷子都可用時,才允許他抓起筷子。

#include<iostream>
#include<vector>
#include<unistd.h>  // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std;

vector<semaphore*> chopstick;   // 訊號量陣列
semaphore mutex("/", 1);  // 設定取左右筷子的訊號量 <-- 關鍵

void* P1(void* arg)  // 第1個哲學家執行緒
{
    mutex.P();                 // 在取筷子前獲得互斥量
    chopstick[0]->P();         // 取左邊筷子
    chopstick[1]->P();         // 取右邊筷子
    mutex.V();                 // 釋放取筷子的訊號量

    printf("Philosopher 1 eat.\n");

    chopstick[0]->V();          // 放回左邊筷子
    chopstick[1]->V();          // 放回右邊筷子
}

void* P2(void* arg)  // 第2個哲學家執行緒
{
    mutex.P();                 // 在取筷子前獲得互斥量
    chopstick[1]->P();         // 取左邊筷子
    chopstick[2]->P();         // 取右邊筷子
    mutex.V();                 // 釋放取筷子的訊號量

    printf("Philosopher 2 eat.\n");

    chopstick[1]->V();          // 放回左邊筷子
    chopstick[2]->V();          // 放回右邊筷子
}


void* P3(void* arg)  // 第3個哲學家執行緒
{
    mutex.P();                 // 在取筷子前獲得互斥量
    chopstick[2]->P();         // 取左邊筷子
    chopstick[3]->P();         // 取右邊筷子
    mutex.V();                 // 釋放取筷子的訊號量

    printf("Philosopher 3 eat.\n");

    chopstick[2]->V();          // 放回左邊筷子
    chopstick[3]->V();          // 放回右邊筷子
}

void* P4(void* arg)  // 第4個哲學家執行緒
{
    mutex.P();                 // 在取筷子前獲得互斥量
    chopstick[3]->P();         // 取左邊筷子
    chopstick[4]->P();         // 取右邊筷子
    mutex.V();                 // 釋放取筷子的訊號量

    printf("Philosopher 4 eat.\n");

    chopstick[3]->V();          // 放回左邊筷子
    chopstick[4]->V();          // 放回右邊筷子
}


void* P5(void* arg)  // 第5個哲學家執行緒
{
    mutex.P();                 // 在取筷子前獲得互斥量
    chopstick[4]->P();         // 取左邊筷子
    chopstick[0]->P();         // 取右邊筷子
    mutex.V();                 // 釋放取筷子的訊號量

    printf("Philosopher 5 eat.\n");

    chopstick[4]->V();          // 放回左邊筷子
    chopstick[0]->V();          // 放回右邊筷子
}

int main()
{
    semaphore *sem1 = new semaphore("/home", 1);
    semaphore *sem2 = new semaphore("/home/songlee", 1);
    semaphore *sem3 = new semaphore("/home/songlee/java", 1);
    semaphore *sem4 = new semaphore("/home/songlee/ADT", 1);
    semaphore *sem5 = new semaphore("/home/songlee/Test", 1);
    chopstick.push_back(sem1);
    chopstick.push_back(sem2);
    chopstick.push_back(sem3);
    chopstick.push_back(sem4);
    chopstick.push_back(sem5);

    pthread_t id;

    pthread_create(&id, NULL, P1, NULL);
    pthread_create(&id, NULL, P2, NULL);
    pthread_create(&id, NULL, P3, NULL);
    pthread_create(&id, NULL, P4, NULL);
    pthread_create(&id, NULL, P5, NULL);

    sleep(1);
    delete sem1;
    delete sem2;
    delete sem3;
    delete sem4;
    delete sem5;
    return 0;
}

編譯執行的結果如下:

Philosopher 2 eat.
Philosopher 1 eat.
Philosopher 3 eat.
Philosopher 4 eat.
Philosopher 5 eat.

注意:建立訊號量時的 路徑引數 請改成你的系統中存在的路徑!!!

附:semaphore類的封裝

上面的程式碼中都使用了這個semaphore類,實現如下:

  • semaphore.h
#pragma once
#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<sys/sem.h>
using namespace std;

// 聯合體,用於semctl初始化
union semun {
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};


class semaphore {
private:
    int sem_id;
    int init_sem(int);
public:
    semaphore(const char*, int); /*建構函式*/
    ~semaphore();                /*解構函式*/
    void P();                    /*P操作*/
    void V();                    /*V操作*/
};
  • semaphore.cpp
#include"semaphore.h"

semaphore::semaphore(const char* path, int value)
{
    key_t key;
    /*獲取key值*/
    if((key = ftok(path, 'z')) < 0) {
        perror("ftok error");
        exit(1);
    }

    /*建立訊號量集,其中只有一個訊號量*/
    if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1) {
        perror("semget error");
        exit(1);
    }

    init_sem(value);
}


semaphore::~semaphore()
{
    union semun tmp;
    if(semctl(sem_id, 0, IPC_RMID, tmp) == -1) {
        perror("Delete Semaphore Error");
        exit(1);
    }
}


void semaphore::P()
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序號*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1) {
        perror("P operation Error");
    }
}


void semaphore::V()
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序號*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1) {
        perror("V operation Error");
    }
}


// 初始化訊號量
int semaphore::init_sem(int value)
{
    union semun tmp;
    tmp.val = value;
    if(semctl(sem_id, 0, SETVAL, tmp) == -1) {
        perror("Init Semaphore Error");
        return -1;
    }
    return 0;
}

在這裡,要建立不同的訊號量,必須傳遞不同的路徑引數(這樣獲取的 key 值才會不一樣)。

注意,本文的關注點並不在於 linux 下如何建立訊號量以及如何封裝起來才更方便,而是通過幾個經典的同步例項,瞭解在多執行緒環境下如何解決這類執行緒同步問題。