Linux中執行緒的同步與互斥、生產者消費模型和讀者寫者問題、死鎖問題
執行緒的同步與互斥
執行緒是一個存在程序中的一個執行控制流,因為執行緒沒有程序的獨立性,在程序內部執行緒的大部分資源資料都是共享的,所以在使用的過程中就需要考慮到執行緒的安全和資料的可靠。不能因為執行緒之間資源的競爭而導致資料發生錯亂,也不能因為有些執行緒因為排程器長時間沒有排程從而導致飢餓問題。所以線上程中也有了同步與互斥,這裡用 “也” 是因為程序中也有同步與互斥,今天來了解執行緒中的同步與互斥。
互斥量
我們為什麼要有互斥量
首先,一個程序中的多個執行緒因為同處於一個虛擬地址空間中,所以相互之間大部分資料是共享的,從而線上程競爭中出現數據錯亂的情況,我們來舉例子來看一下
首先我們採用多個執行緒去競爭著去修改count的資料,我們定義初始的count為100,讓其減少為0就可以了,我們看看會有什麼結果。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define num 3 //我們用3個執行緒
int count = 100;
void* MyFun(void* arg) //執行緒入口函式
{
int j = (int)arg;
while (ticket >= 0)
{
usleep(1000);
printf("thread %d count %d\n", j, count);
count--;
}
return NULL;
}
int main()
{
pthread_t th[num];
int i = 0;
for (;i < num; ++i)
{
// 建立執行緒
int ret = pthread_create(&th[i], NULL, MyFun, (void*)i);
if (ret != 0)
{
perror("pthread_create error\n");
exit(1);
}
}
i = 0 ;
for (; i < num; ++i)
{
// 等待執行緒,釋放資源
pthread_join(th[i], NULL);
}
return 0;
}
我們先看看結果
看到這個結果,如果仔細看程式碼就會發現,我們寫的是程式碼count到0就退出,但是為什麼會出現到-2呢?這裡我們就解釋一下
首先執行緒之間資源共享,所以隨著cpu的排程,如果你是多核計算機,執行緒是可以同時訪問一個共享資源,這個時候就會發生:
這只是其中的一種情況,還有可能在記憶體讀到暫存器中的時候被切換掉等,就出現了資料錯誤的情況。
所以我們要加上互斥鎖,讓互斥的訪問臨界資源。
我們修改我們的程式碼:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define num 3
// 互斥鎖
pthread_mutex_t mutxe;
int ticket = 100;
void* MyFun(void* arg)
{
long j = (long)arg;
while (1)
{
usleep(1000);
pthread_mutex_lock(&mutxe); // 加鎖
if (ticket > 0)
{
printf("thread %lu ticket %d\n", j, ticket);
ticket--;
pthread_mutex_unlock(&mutxe); // 解鎖
}
else
{
pthread_mutex_unlock(&mutxe); // 解鎖
break;
}
}
return NULL;
}
int main()
{
pthread_mutex_init(&mutxe, NULL); //初始化鎖
pthread_t th[num];
long i = 0;
for (;i < num; ++i)
{
// 建立鎖
int ret = pthread_create(&th[i], NULL, MyFun, (void*)i);
if (ret != 0)
{
perror("pthread_create error\n");
exit(1);
}
}
i = 0;
for (; i < num; ++i)
{
// 等待釋放執行緒
pthread_join(th[i], NULL);
}
pthread_mutex_destroy(&mutxe);
return 0;
}
我們看結果
執行緒中互斥鎖API
建立一個互斥鎖,一般為全域性量
// 當我們初始化為PTHREAD_MUTEX_INITIALIZER的時候互斥量不需要銷燬
ptread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 初始化互斥量
int pthread_mutex_init(pthread_mutex_t* restrict mutex,\
const pthread_mutexattr_t* restrict attr);
// mutex 為互斥量;
// attr NULL;
// 銷燬互斥量
int pthread_mutex_destroy(pthread_mutex_t* mutex);
// mutex 互斥量
// 加鎖解鎖
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
條件變數(同步)
首先我們還是瞭解一下為什麼需要同步,條件變數。
為了防止競爭造成資料錯亂,所以加上互斥鎖,但是當兩個執行緒同時訪問時候,一個執行緒的訪問速度快,一個訪問慢,所以當一個執行緒不停的申請鎖釋放鎖,但是裡面的狀態沒有得到另一個執行緒改變,那麼就會產生資源的浪費。
還有可能因為優先順序問題,一個執行緒的優先順序高,臨界資源不能滿足它的需求,它不停的申請鎖釋放鎖,但是得不到滿足,但是其它執行緒不等訪問臨界資源,從而造成飢餓問題。
問了解決這個問題,所以我們就有了一個同步的條件,來保證公平的訪問,也可以說減少效能上的開銷。
我們先看一下相關API
先需要有個同步變數和互斥鎖
pthread_cond_t cond; // 同步變數,也一般在全域性區
// 初始化條件變數
int pthread_cond_init(pthread_cond_t* restrict cond, const \
pthread_condattr_t* resttict attr);
// cond 初始化的條件變數
// attr NULL
// 銷燬
int pthread_cond_destroy(pthread_cond_t* cond);
// cond 銷燬的條件變數
// 等待條件
int pthread_cond_wait(pthread_cond_t* restrict cond, \
pthread_mutex_t* restrict mutex);
// cond 在cond條件變數上等待
// mutex 互斥量(一會解釋)
// 喚醒等待
int pthread_cond_signal(pthread_cond_t* cond);
// cond 喚醒的條件變數
在這裡我們要注意,在條件變數中的等待,為什麼互斥變數,是因為 wait 方法所做的功能,pthread_cond_wait 函式在等待,要做三件事,先是解鎖再等待等到訊號後還要加鎖,為什麼一個函式要幹這麼多事情呢?
這裡因為競態條件產生的問題,必須要把這三個步驟合到一塊為一個原子操作。
我們來畫圖分析
為了運用條件變數時候,不容易出錯,最好運用
等待條件規範(自己覺得可靠)
pthread_mutex_lock(&mutex);
while (判斷條件)//這裡用while是因為,當收到異常訊號的時候可以重新判斷
pthread_cond_wait(cond, mutex)
修改條件
pthread_mutex_unlock(&mutex);
給條件傳送程式碼
pthread_mutex_lock(&mutex)
設定條件
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
生產者消費者模型
生產者消費者模型是執行緒中同步互斥的很典型的例子。
其中有三種關係:
1)生產者與生產者:互斥
2)消費者與消費者:互斥
3)生產者與消費者:同步互斥
生產者與消費者模型是基於一個場景,就是生產者在每次生產一個數據必須要放在一個倉庫中,而消費者消費必須去倉庫中拿資料,這樣我們就會產生一些問題,比如,在生產者生產好資料去倉庫放東西的時候,消費者是不能進去的,同行消費者取的時候,生產者也不能進去。當倉庫為空的時候消費者就得等生產者放資料,倉庫滿了就必須等消費者取資料。所以為了滿足上面的需求,我們就用同步與互斥來進行讓他們有效的進行。
我們用程式碼模擬實現。我們用一個連結串列來模擬一個倉庫。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
// 互斥量
pthread_mutex_t mutex;
// 條件
pthread_cond_t cond;
// 用與記錄次數
int count = 0;
// 用一個連結串列來當作倉庫
typedef struct ListNode
{
struct ListNode* next;
int data;
}ListNode;
// 帶頭節點
ListNode head;
ListNode* CreateNode(int value)
{
ListNode* new_node = (ListNode*)malloc(sizeof(ListNode));
new_node->next = NULL;
new_node->data = value;
return new_node;
}
void Init(ListNode* head)
{
head->data = 0;
head->next = NULL;
}
void Push(ListNode* head, int value)
{
if (head == NULL)
{
return;
}
ListNode* node = CreateNode(value);
ListNode* nex = head->next;
head->next = node;
node->next = nex;
}
void Pop(ListNode* head, int *top)
{
if (head == NULL)
{
return;
}
if (head->next == NULL)
{
return;
}
*top = head->next->data;
ListNode* node = head->next;
head->next = node->next;
free(node);
}
void* Producer(void* arg)
{
(void)arg;
while (1)
{
// 加鎖
pthread_mutex_lock(&mutex);
Push(&head, count);
printf("Producer %d \n", head.next->data);
++count;
// 產生一個數據,就要喚醒等待的執行緒
pthread_cond_signal(&cond);
// 解鎖
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}
void* Consumer(void* arg)
{
(void)arg;
while (1)
{
int value = -1;
pthread_mutex_lock(&mutex);
// 這裡用while是因為有可能被訊號打斷,
// 當再次返回時候,就可以重新判斷
// 也有可能被其它異常訊號喚醒
while (head.next == NULL)
{
// 沒有資料,就要進行
// 解鎖
// 等待
// 加鎖
pthread_cond_wait(&cond, &mutex);
}
Pop(&head, &value);
printf("consumer %d\n",value);
pthread_mutex_unlock(&mutex);
usleep(100000);
}
return NULL;
}
int main()
{
Init(&head);
// 初始化
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
// 建立執行緒
pthread_t producer, consumer;
pthread_create(&producer, NULL, Producer, NULL);
pthread_create(&consumer, NULL, Consumer, NULL);
// 等待執行緒
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
// 銷燬同步與互斥量
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}
讀者寫者問題
上面的生產者消費者模型是兩個執行緒都在修改臨界資源,那麼我們的讀者寫者是一個執行緒修改多個執行緒讀不修改的訪問,這裡我們要做到一下幾點:
1)讀者與讀者之間是可以同時訪問臨界資源
2)寫者與寫者只能有一個,當然這裡只有一個寫者
3)讀者和寫者同時訪問臨界資源,寫者優先
我們來看相關的函式的API
//定義一個讀寫鎖變數
pthread_rwlock_t rwlock;
// 初始化 變數
int pthread_rwlock_init(pthread_rwlock_t* restrict rwlock,\
phthread_rwlockattr_t* restrict attr);
// 銷燬
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
// 加讀鎖和加寫鎖
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); // 讀鎖
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock); // 寫鎖
// 解鎖
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
對照上面的場景和和相關API我們來寫程式碼
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#define num 5
pthread_rwlock_t lock; //讀寫鎖
int count = 0; //臨界資源
void *Reader(void* arg)
{
long i = (long)arg;
while (1)
{
pthread_rwlock_rdlock(&lock); //加讀鎖
printf("Reader %lu count %d\n", i, count);
pthread_rwlock_unlock(&lock); // 解鎖
usleep(500000);
}
return NULL;
}
void *Writer(void* arg)
{
long i = (long)arg;
while (1)
{
pthread_rwlock_wrlock(&lock); //寫鎖
++count;
printf("Writer %lu count %d\n", i, count);
pthread_rwlock_unlock(&lock); //解鎖
sleep(1);
}
return NULL;
}
int main()
{
// 初始化
pthread_rwlock_init(&lock, NULL);
// 建立執行緒
pthread_t th[num];
long i = 0;
for (; i < num-1; ++i)
{
pthread_create(&th[i], NULL, Reader, (void*)i);
}
pthread_create(&th[num], NULL, Writer, (void*)1);
// 等待執行緒,釋放資源
i = 0;
for (; i < num; ++i)
{
pthread_join(th[i], NULL);
}
pthread_rwlock_destroy(&lock);
return 0;
}
死鎖問題
死鎖的產生是因為我們為了保證執行緒或者程序訪問臨界資源時候保證真確的資料。但是又引來了新的問題,就是死鎖。
死鎖,在我們寫程式碼的過程中,加鎖就必須要解鎖,不然就產生死鎖,還有在一個執行緒中我們嘗試對一個鎖進行兩次加鎖操作也會產生死鎖。
還有鎖的數量和資源的數量相同,而每個執行緒需要兩個資源,當同時對資源進行加鎖,就會產生經典的哲學家進餐問題。這也是一種死鎖。
我們總結一下:
死鎖的形成
- 競爭不可搶佔資源引起死鎖
- 競爭可消耗資源引起死鎖
- 程序推進順序不當引起死鎖
產生死鎖的四個必要條件
- 互斥條件
- 請求和保持條件
- 不可搶佔條件
- 迴圈等待條件
那麼我們怎麼解決死鎖問題呢?
預防死鎖必須破壞四個必要條件之一
但是不能破壞互斥條件
如有錯誤,請多多指導!謝謝!