1. 程式人生 > 其它 >【OSTEP】第28章-鎖

【OSTEP】第28章-鎖

鎖的基本思想

鎖為程式設計師提供了最小程度的排程控制。我們把執行緒視為程式設計師建立的實體,但是被作業系統排程,具體方式由作業系統選擇。鎖讓程式設計師獲得一些控制權。通過給臨界區加鎖,可以保證臨界區內只有一個執行緒活躍。鎖將原本由作業系統排程的混亂狀態變得更為可控。

pthread 鎖

  • POSIX 庫將鎖稱為互斥量(mutex),因為它被用來提供執行緒之間的互斥。
  • 粗粒度方案:加一把大鎖,把整個都保護起來
  • 細粒度方案:用不同的鎖保護不同的資料和結構

實現一個鎖

先自我嘗試一下,改一下之前那個函式。

#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>
#include <time.h>

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// volatile 易變的、不穩定的意思
// 遇到這個關鍵字宣告的變數,編譯器對訪問該變數的程式碼就不再進行優化,從而可以提供對特殊地址的穩定訪問
static volatile int counter = 0;

void *mypthread(void *arg)
{
    printf("%s: begin\n", (char *)arg);
    int i;
    for (i = 0; i < 1e7; i++)
    {
        pthread_mutex_lock(&lock);
        counter += 1;
        pthread_mutex_unlock(&lock);
    }
    printf("%s: done\n", (char *)arg);
    return NULL;
}

int main(int argc, char *argv[])
{
    clock_t t1, t2;
    t1 = clock();
    pthread_t p1, p2;
    int rc;
    printf("main: begin (counter = %d)\n", counter);
    rc = pthread_create(&p1, NULL, mypthread, "A");
    assert(rc == 0);
    rc = pthread_create(&p2, NULL, mypthread, "B");
    assert(rc == 0);
    rc = pthread_join(p1, NULL);
    assert(rc == 0);
    rc = pthread_join(p2, NULL);
    assert(rc == 0);
    printf("main: done with both (counter = %d)\n", counter);
    t2 = clock();
    printf("%f\n ms", (double)(t2 - t1) / CLOCKS_PER_SEC);
    return 0;
}

加鎖耗時:

不加鎖耗時:

評價鎖

  1. 提供互斥(mutual exclusion)。有效阻止多個執行緒進入臨界區
  2. 公平性,每個競爭執行緒都有公平的機會搶到鎖,不能有競爭鎖餓死
  3. 效能,時間開銷怎麼評判。

控制中斷

進入臨界區的時候關閉中斷,結束都重新開啟,這樣既簡單又正確。缺點:

  • 可能被一個貪婪的程式濫用

  • 不支援多處理器,你關閉了這個處理器的中斷,但是其他處理器還能進入臨界區

  • 可能導致中斷丟失

  • 效率問題

Peterson 演算法:jyy 的舉起旗幟上廁所的例子

測試並設定指令(原子交換)

鎖最簡單的硬體支援是測試並設定指令(test-and-set instruction),也叫作原子交換(atomic exchange)。

第一次嘗試:

typedef struct lock_t { int flag; } lock_t; 
void init(lock_t *mutex) {
	// 0 -> lock is available, 1 -> held 
    mutex->flag = 0;
} 
void lock(lock_t *mutex) {
	while (mutex->flag == 1) // TEST the flag 
        ; // spin-wait (do nothing)
    mutex->flag = 1;
} 
void unlock(lock_t *mutex) {
	mutex->flag = 0;
}

正確性:不正確,比如第一個上鎖的時候,讀取到了沒有鎖,還沒上鎖,中斷了,然後第二個執行緒上了鎖,回到第一個執行緒還會以為沒有鎖,繼續上鎖。

效能:差,自旋(spin-waiting)

實現可用的自旋鎖

我們有一個想法是讓讀 flag 和寫 flag 變成原子操作,硬體有提供這種指令支援。

int TestAndSet(int *old_ptr, int new) {
	int old = *old_ptr; // fetch old value at old_ptr 
	*old_ptr = new; // store 'new' into old_ptr
    return old; }

最關鍵的是上面的程式碼是原子執行的。

typedef struct lock_t { int flag; } lock_t; 
void init(lock_t *mutex) {
	// 0 -> lock is available, 1 -> held 
    mutex->flag = 0;
} 
void lock(lock_t *mutex) {
    // 返回值是 1 說明鎖被別人持有,那我只能自旋
    // 返回為 0 說明沒有,那我就改成了我的 1
	while (TestAndSet(&lock->flag, 1) == 1)
        ; // spin-wait (do nothing)
} 
void unlock(lock_t *mutex) {
	mutex->flag = 0;
}

評價自旋鎖

  • 正確性:正確
  • 公平性:不公平,你瘋狂自旋,別人就餓死了
  • 效能:單 CPU 很大,因為都去旋了,沒人幹活,多 CPU 表現還行

比較並交換

比較並交換指令(SPARC系統中是compare-and-swap,x86 系統是compare-and-exchange)。

檢測 ptr 指向的值和 expected 是否相等;如果是,更新 ptr 為新值。返回該記憶體地址的實際值。

int CompareAndSwap(int *ptr, int expected, int new) {
	int actual = *ptr;
	if (actual == expected) 
        *ptr = new;
	return actual;
}

最後,你可能會發現,比較並交換指令比測試並設定更強大。當我們在將來簡單探討無等待同步(wait-free synchronization)時,會用到這條指令的強大之處。然而,如果只用它實現一個簡單的自旋鎖,它的行為等價於上面分析的自旋鎖。

連結的載入和條件式儲存指令

int LoadLinked(int *ptr) {
	return *ptr; 
} 
int StoreConditional(int *ptr, int value) { 
    if (no one has updated *ptr since the LoadLinked to this address) {
        *ptr = value; 
        return 1; // success! 
    } else { 
        return 0; // failed to update
    }
}

MIPS 架構[H93]中,連結的載入(load-linked)和條件式儲存(store-conditional)可以用來配合使用,實現其他併發結構。

void lock(lock_t *lock) {
	while (1) {
		while (LoadLinked(&lock->flag) == 1) 
            ; // spin until it's zero
		if (StoreConditional(&lock->flag, 1) == 1)
			return; // if set-it-to-1 was a success: all done 
        			// otherwise: try it all over again
	} 
}
void unlock(lock_t *lock) {
	lock->flag = 0;
}

獲取並增加

獲取並增加(fetch-and-add)指令,它能原子地返回特定地址的舊 值,並且讓該值自增一。

這個實現了一人一次的來。

int FetchAndAdd(int *ptr) {
	int old = *ptr; 
    *ptr = old + 1; 
    return old;
}
typedef struct lock_t { 
    int ticket;
	int turn; 
} lock_t;
void lock_init(lock_t *lock) { 
    lock->ticket = 0; 
    lock->turn = 0;
}
void lock(lock_t *lock) {
	int myturn = FetchAndAdd(&lock->ticket);
	while (lock->turn != myturn) 
        ; // spin
}
void unlock(lock_t *lock) { 
	FetchAndAdd(&lock->turn);
}

自旋過多

如果有 N 個執行緒去競爭一個鎖,情 況會更糟糕。同樣的場景下,會浪費 N−1 個時間片,只是自旋並等待一個執行緒釋放該鎖。

簡單方法:讓出來吧,寶貝

void init() {
	flag = 0; 
} 
void lock() {
	while (TestAndSet(&flag, 1) == 1) 
		yield(); // give up the CPU
}
void unlock() {
	flag = 0;
}

時間消耗還是很多,因為會頻繁的進行上下文切換。而且還有可能出現餓死的現象。

使用佇列:休眠代替自旋

  • park()能夠讓呼叫執行緒休眠
  • unpark(threadID)則會喚醒 threadID 標識的執行緒
  • setpark(),執行緒表明自己馬上要 park。
typedef struct lock_t {
	int flag; 
    int guard; 
    queue_t *q;
} lock_t;
void lock_init(lock_t *m) { 
    m->flag = 0; 
    m->guard = 0;
	queue_init(m->q);
} 
void lock(lock_t *m) { 
    /* 如果有執行緒正在嘗試加鎖(guard == 1),那麼要阻塞 */
    // TestAndSet(&m->guard, 1) 如果 guard 是 0 會被設定為 1,只迴圈一次
    // 表明我要加鎖了,可能成功也可能不成功,等我操作完了之後把 guard 設為 0
    // guard 保證 queue 操作的原子性
	while (TestAndSet(&m->guard, 1) == 1)
        ; //acquire guard lock by spinning
    if (m->flag == 0) {
        /* 當前沒有執行緒獲得鎖 */
		m->flag = 1; // lock is acquired 
        m->guard = 0;
	} else {
        /* 已有執行緒獲得鎖,將此執行緒的ID號加入到等待佇列中,並休眠 */
		queue_add(m->q, gettid()); 
        // setpark(),如果剛好另一個執行緒被排程,並且呼叫了 unpark
        // 那麼後續的 park 呼叫就會直接返回,而不是一直睡眠。
        setpark(); // new code
        // 肯定要先把佇列讓出來,不然你睡覺去了佇列別人訪問不了,就死鎖了
        // 可是讓出來有一個問題,你讓出來了,還沒 park ,哦吼,中斷了
        // 然後那個執行緒 unpark 了佇列裡的這個執行緒,回到這個執行緒,你又繼續執行 park
        // 此時 這個執行緒已經不在佇列裡了,完了,芭比Q了,park 了沒人叫醒,只能一直睡下去
        // 所以一定需要 setpark ,
        m->guard = 0; 
        park();
	}
} 
void unlock(lock_t *m) {
    /* 如果有執行緒正在嘗試加鎖(guard == 1),那麼要阻塞 */
	while (TestAndSet(&m->guard, 1) == 1)
        ; //acquire guard lock by spinning 
    if (queue_empty(m->q)) 
        m->flag = 0; // let go of lock; no one wants it 
    else
        /* 當前佇列中有執行緒想要獲得鎖,所以喚醒一個執行緒即可 */
        /* 這裡無需做鎖的釋放操作,原因是park()API的使用特性*/
        unpark(queue_remove(m->q)); // hold lock (for next thread!) 
    m->guard = 0;
}

不同作業系統,不同實現

Linux 提供了 futex,它類似於 Solaris 的介面,但提供了更多核心功能。具體來說,每個 futex 都關聯一個特定的實體記憶體位置,也有一個事先建好的核心佇列。呼叫者通過 futex 呼叫(見下面的描述)來睡眠或者喚醒。

具體來說,有兩個呼叫。呼叫 futex_wait(address, expected)時,如果 address 處的值等於 expected,就會讓調執行緒睡眠。否則,呼叫立刻返回。呼叫 futex_wake(address)喚醒等待佇列中的一個執行緒。

兩階段鎖

第一階段會先自旋一段時間,希望能獲取鎖

第一階段沒有獲取鎖,那麼第二階段呼叫者會睡眠,直到鎖可用。