【OSTEP】第28章-鎖
鎖的基本思想
鎖為程式設計師提供了最小程度的排程控制。我們把執行緒視為程式設計師建立的實體,但是被作業系統排程,具體方式由作業系統選擇。鎖讓程式設計師獲得一些控制權。通過給臨界區加鎖,可以保證臨界區內只有一個執行緒活躍。鎖將原本由作業系統排程的混亂狀態變得更為可控。
pthread 鎖
- POSIX 庫將鎖稱為互斥量(mutex),因為它被用來提供執行緒之間的互斥。
- 粗粒度方案:加一把大鎖,把整個都保護起來
- 細粒度方案:用不同的鎖保護不同的資料和結構
實現一個鎖
先自我嘗試一下,改一下之前那個函式。
#include <stdio.h> #include <pthread.h> #include <assert.h> #include <stdlib.h> #include <time.h> pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // volatile 易變的、不穩定的意思 // 遇到這個關鍵字宣告的變數,編譯器對訪問該變數的程式碼就不再進行優化,從而可以提供對特殊地址的穩定訪問 static volatile int counter = 0; void *mypthread(void *arg) { printf("%s: begin\n", (char *)arg); int i; for (i = 0; i < 1e7; i++) { pthread_mutex_lock(&lock); counter += 1; pthread_mutex_unlock(&lock); } printf("%s: done\n", (char *)arg); return NULL; } int main(int argc, char *argv[]) { clock_t t1, t2; t1 = clock(); pthread_t p1, p2; int rc; printf("main: begin (counter = %d)\n", counter); rc = pthread_create(&p1, NULL, mypthread, "A"); assert(rc == 0); rc = pthread_create(&p2, NULL, mypthread, "B"); assert(rc == 0); rc = pthread_join(p1, NULL); assert(rc == 0); rc = pthread_join(p2, NULL); assert(rc == 0); printf("main: done with both (counter = %d)\n", counter); t2 = clock(); printf("%f\n ms", (double)(t2 - t1) / CLOCKS_PER_SEC); return 0; }
加鎖耗時:
不加鎖耗時:
評價鎖
- 提供互斥(mutual exclusion)。有效阻止多個執行緒進入臨界區
- 公平性,每個競爭執行緒都有公平的機會搶到鎖,不能有競爭鎖餓死
- 效能,時間開銷怎麼評判。
控制中斷
進入臨界區的時候關閉中斷,結束都重新開啟,這樣既簡單又正確。缺點:
-
可能被一個貪婪的程式濫用
-
不支援多處理器,你關閉了這個處理器的中斷,但是其他處理器還能進入臨界區
-
可能導致中斷丟失
-
效率問題
Peterson 演算法:jyy 的舉起旗幟上廁所的例子
測試並設定指令(原子交換)
鎖最簡單的硬體支援是測試並設定指令(test-and-set instruction),也叫作原子交換(atomic exchange)。
第一次嘗試:
typedef struct lock_t { int flag; } lock_t; void init(lock_t *mutex) { // 0 -> lock is available, 1 -> held mutex->flag = 0; } void lock(lock_t *mutex) { while (mutex->flag == 1) // TEST the flag ; // spin-wait (do nothing) mutex->flag = 1; } void unlock(lock_t *mutex) { mutex->flag = 0; }
正確性:不正確,比如第一個上鎖的時候,讀取到了沒有鎖,還沒上鎖,中斷了,然後第二個執行緒上了鎖,回到第一個執行緒還會以為沒有鎖,繼續上鎖。
效能:差,自旋(spin-waiting)
實現可用的自旋鎖
我們有一個想法是讓讀 flag 和寫 flag 變成原子操作,硬體有提供這種指令支援。
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store 'new' into old_ptr
return old; }
最關鍵的是上面的程式碼是原子執行的。
typedef struct lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
// 返回值是 1 說明鎖被別人持有,那我只能自旋
// 返回為 0 說明沒有,那我就改成了我的 1
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}
評價自旋鎖
- 正確性:正確
- 公平性:不公平,你瘋狂自旋,別人就餓死了
- 效能:單 CPU 很大,因為都去旋了,沒人幹活,多 CPU 表現還行
比較並交換
比較並交換指令(SPARC系統中是compare-and-swap,x86 系統是compare-and-exchange)。
檢測 ptr 指向的值和 expected 是否相等;如果是,更新 ptr 為新值。返回該記憶體地址的實際值。
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}
最後,你可能會發現,比較並交換指令比測試並設定更強大。當我們在將來簡單探討無等待同步(wait-free synchronization)時,會用到這條指令的強大之處。然而,如果只用它實現一個簡單的自旋鎖,它的行為等價於上面分析的自旋鎖。
連結的載入和條件式儲存指令
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if (no one has updated *ptr since the LoadLinked to this address) {
*ptr = value;
return 1; // success!
} else {
return 0; // failed to update
}
}
MIPS 架構[H93]中,連結的載入(load-linked)和條件式儲存(store-conditional)可以用來配合使用,實現其他併發結構。
void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1)
; // spin until it's zero
if (StoreConditional(&lock->flag, 1) == 1)
return; // if set-it-to-1 was a success: all done
// otherwise: try it all over again
}
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
獲取並增加
獲取並增加(fetch-and-add)指令,它能原子地返回特定地址的舊 值,並且讓該值自增一。
這個實現了一人一次的來。
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}
自旋過多
如果有 N 個執行緒去競爭一個鎖,情 況會更糟糕。同樣的場景下,會浪費 N−1 個時間片,只是自旋並等待一個執行緒釋放該鎖。
簡單方法:讓出來吧,寶貝
void init() {
flag = 0;
}
void lock() {
while (TestAndSet(&flag, 1) == 1)
yield(); // give up the CPU
}
void unlock() {
flag = 0;
}
時間消耗還是很多,因為會頻繁的進行上下文切換。而且還有可能出現餓死的現象。
使用佇列:休眠代替自旋
- park()能夠讓呼叫執行緒休眠
- unpark(threadID)則會喚醒 threadID 標識的執行緒
- setpark(),執行緒表明自己馬上要 park。
typedef struct lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
/* 如果有執行緒正在嘗試加鎖(guard == 1),那麼要阻塞 */
// TestAndSet(&m->guard, 1) 如果 guard 是 0 會被設定為 1,只迴圈一次
// 表明我要加鎖了,可能成功也可能不成功,等我操作完了之後把 guard 設為 0
// guard 保證 queue 操作的原子性
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
/* 當前沒有執行緒獲得鎖 */
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
/* 已有執行緒獲得鎖,將此執行緒的ID號加入到等待佇列中,並休眠 */
queue_add(m->q, gettid());
// setpark(),如果剛好另一個執行緒被排程,並且呼叫了 unpark
// 那麼後續的 park 呼叫就會直接返回,而不是一直睡眠。
setpark(); // new code
// 肯定要先把佇列讓出來,不然你睡覺去了佇列別人訪問不了,就死鎖了
// 可是讓出來有一個問題,你讓出來了,還沒 park ,哦吼,中斷了
// 然後那個執行緒 unpark 了佇列裡的這個執行緒,回到這個執行緒,你又繼續執行 park
// 此時 這個執行緒已經不在佇列裡了,完了,芭比Q了,park 了沒人叫醒,只能一直睡下去
// 所以一定需要 setpark ,
m->guard = 0;
park();
}
}
void unlock(lock_t *m) {
/* 如果有執行緒正在嘗試加鎖(guard == 1),那麼要阻塞 */
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
/* 當前佇列中有執行緒想要獲得鎖,所以喚醒一個執行緒即可 */
/* 這裡無需做鎖的釋放操作,原因是park()API的使用特性*/
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;
}
不同作業系統,不同實現
Linux 提供了 futex,它類似於 Solaris 的介面,但提供了更多核心功能。具體來說,每個 futex 都關聯一個特定的實體記憶體位置,也有一個事先建好的核心佇列。呼叫者通過 futex 呼叫(見下面的描述)來睡眠或者喚醒。
具體來說,有兩個呼叫。呼叫 futex_wait(address, expected)時,如果 address 處的值等於 expected,就會讓調執行緒睡眠。否則,呼叫立刻返回。呼叫 futex_wake(address)喚醒等待佇列中的一個執行緒。
兩階段鎖
第一階段會先自旋一段時間,希望能獲取鎖
第一階段沒有獲取鎖,那麼第二階段呼叫者會睡眠,直到鎖可用。