1. 程式人生 > >Linux下robust互斥鎖實現

Linux下robust互斥鎖實現

ont UNC data 結束 推出 art 原子性操作 class null

一、robust互斥鎖
這種類型的鎖可能不是POSIX標準中規定的鎖,但是也有可能是,這個不太確定,暫時不管。這種類型的鎖主要是解決當一個持有互斥鎖的線程退出之後這個鎖成為不可用狀態的一個問題來的。可以想象,一個線程可能持有很多的鎖,也可能沒有,如果此時有一個外部(被其它任務kill)或者內部(出現訪問問題或者直接pthread_exit)原因而推出線程,此時雖然這個線程退出是一了百了了,但是其它的線程還是希望能夠堅強的運行下去,此時就需要有一些機制來保證這個線程持有的那些希望被自動釋放的鎖有一種方法能夠被釋放,而這個工作就責無旁貸的落在了內核的身上。
從具體實現上來看,它打破了我們常規的“用戶態VS內核態”的模式,模糊了用戶和內核的概念,因為內核會遍歷用戶態提供的鏈表,這個是一個危險的操作,因為這個鏈表對內核來說是可能出現環路的,內存訪問錯誤倒是小事。在魔獸中,獸族的斧頭兵在沖鋒陷陣的時候喊得口號是“為了部落”,而內核的程序員在寫程序的時候喊的口號一定是“為了效率”。
二、信號類型的聲明
這些鎖的類型一般中間有一個ROBUST關鍵字,如果同學們有glibc的代碼,看一下glibc-2.7\nptl\pthreadP.h文件中那些ROBUST類型的類型聲明就可以知道它們的家族了,和普通的鎖類型相同,也有一些ERROR_CHECK類型的互斥鎖,當然可能不是POSIX標準,如果對跨平臺要求比較嚴格,請慎用。
這個robust屬性的設置需要使用專門的接口pthread_mutexattr_setrobust_np函數來設置,對於這些NP後綴的,我一直懷疑他們是NonPosix的縮寫,未經證實,但是也有可能是POSIX的某個版本之後追加的一些性能也不好說。
三、向內核註冊鏈表首地址

內核為了實現這種鎖,專門提供了兩個系統調用,分別是sys_set_robust_list和sys_get_robust_list,由於或者對於我們這次分析沒有什麽影響,所以我們只看一下註冊的接口:
asmlinkage long
sys_set_robust_list(struct robust_list_head __user *head,
size_t len)
{
/*
* The kernel knows only one size for now:
*/
if (unlikely(len != sizeof(*head)))
return -EINVAL;

current->robust_list = head;內核還專門為每個線程的控制結構中添加了一個robust_list成員來記錄用戶態註冊的鏈表頭,事實上他是一個robust_list_head類型的結構

return 0;
}
四、用戶態註冊
1、相關結構的分配與初始化
glibc-2.7\nptl\allocatestack.c
allocate_stack (const struct pthread_attr *attr, struct pthread **pdp, ALLOCATE_STACK_PARMS)
/* The robust mutex lists also need to be initialized
unconditionally because the cleanup for the previous stack owner
might have happened in the kernel. */
pd->robust_head.futex_offset = (offsetof (pthread_mutex_t, __data.__lock)
- offsetof (pthread_mutex_t,
__data.__list.__next));這個偏移量是編譯時確定的,它的值是pthread_mutex_t結構中__lock成員和__list成員之間的相對字節偏移處,之前說過,內核是需要遍歷用戶態鏈表的,根據最為原始的鏈表(就是我們上任何一個數據結構中都會提到的)就是一個 struct node {struct node *next; type var;}類型的結構,但是內核說為了給用戶態任務更大的自由度,內核不能假設這個__lock就是這麽正正經經的緊鄰著鏈表成員,所以提供了一個offset,這樣兩者之間可以有更多的控制結構,事實上C庫也就是這麽實現的,它們的確不是連續的。有人問,問什麽這麽巧呢?因為C庫中相關功能的開發是和內核同步的,可以說是C庫的需求促使了內核該功能的添加,是一個訂制功能
pd->robust_head.list_op_pending = NULL;
#ifdef __PTHREAD_MUTEX_HAVE_PREV
pd->robust_prev = &pd->robust_head;
#endif
pd->robust_head.list = &pd->robust_head;初始情況下下,鏈表自回環,為空
2、向內核註冊
glibc-2.7\nptl\pthread_create.c
start_thread (void *arg)
#ifdef __NR_set_robust_list
# ifndef __ASSUME_SET_ROBUST_LIST
if (__set_robust_list_avail >= 0)
# endif
{
INTERNAL_SYSCALL_DECL (err);
/* This call should never fail because the initial call in init.c
succeeded. */
INTERNAL_SYSCALL (set_robust_list, err, 2, &pd->robust_head,這個接口沒什麽好說的,和內核接口直接對應,用戶態和內核態共享結構
sizeof (struct robust_list_head));
}
#endif
五、用戶態如何獲得ROBUST鎖
1、robust鎖的獲取操作
對於robust鎖,內核和用戶態約定了這個__lock的意義,它的低30bits用來記錄當前鎖持有者的線程id號,剩余高2bits記錄FUTEX_WAITERS和FUTEX_OWNER_DIED標誌bit。
glibc-2.7\nptl\pthread_mutex_lock.c中__pthread_mutex_lock (mutex)函數

THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,首先設置內核和用戶態共享的list_op_pending結構,註意的是,這是一個指針,而不是鏈表,它只有一個元素,這和前面說的 robust_list list成員的意義不同,至於為什麽有這個成員,我們之後再解釋
&mutex->__data.__list.__next);

oldval = mutex->__data.__lock;
do
{
again:
if ((oldval & FUTEX_OWNER_DIED) != 0)首先判斷是不是持有者已經在沒有釋放鎖的情況下掛掉了
{
/* The previous owner died. Try locking the mutex. */
int newval = id;
#ifdef NO_INCR
newval |= FUTEX_WAITERS;
#else
newval |= (oldval & FUTEX_WAITERS);
#endif

newval
= atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
newval, oldval);嘗試原子性修改鎖值,這個值內核和用戶態共享各個bit的意義

if (newval != oldval)
{
oldval = newval;
goto again;原子操作失敗,重試
}

/* We got the mutex. */
mutex->__data.__count = 1;
/* But it is inconsistent unless marked otherwise. */
mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;持有者設置為不一致,因為這是從死亡者裏拿到的東西,相當於是“腐屍”

ENQUEUE_MUTEX (mutex);這個操作比較重要,它會把這個鎖太添加到 robust_list list鏈表的開始,表明自己已經獲得了這個鎖,實現比較簡單,這裏省略
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);清空op_pending列表,這個列表在執行鎖操作之前初始化

/* Note that we deliberately exit here. If we fall
through to the end of the function __nusers would be
incremented which is not correct because the old
owner has to be discounted. If we are not supposed
to increment __nusers we actually have to decrement
it here. */
#ifdef NO_INCR
--mutex->__data.__nusers;
#endif

return EOWNERDEAD;這裏雖然獲得了鎖,但是低調的返回EOWNERDEAD,反過來說,雖然返回值為EOWNERDEAD,但是鎖還是已經獲得了
}
………………
oldval = LLL_ROBUST_MUTEX_LOCK (mutex, id);這個操作本質上也是來修改__lock結構為當前線程的tid(其中的id就是線程的tid),只是它做了更多的功能,那就是判斷當獲得鎖失敗的時候貼心的將線程掛起。反過來說,當一個線程因為OWNERDEAD被喚醒之後,它同樣會從這裏返回,並執行這個while大循環,判斷出ownerdead標誌位

if (__builtin_expect (mutex->__data.__owner
== PTHREAD_MUTEX_NOTRECOVERABLE, 0))如果pthread_mutex_lock返回EOWNERDEAD而調用者沒有執行pthread_mutex_consistent_np函數,那麽將會滿足該條件,這下可能真的是完蛋去了
{
/* This mutex is now not recoverable. */
mutex->__data.__count = 0;
lll_unlock (mutex->__data.__lock,
PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
return ENOTRECOVERABLE;
}
}
while ((oldval & FUTEX_OWNER_DIED) != 0);

mutex->__data.__count = 1;
ENQUEUE_MUTEX (mutex);
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
break;
2、list_op_pending的意義
前面說過,這個不是鏈表,而只是一個簡單的指針,它一次只指向一個元素。為什麽要這個結構呢?因為這個操作是極為特殊的,特殊之處在於它獲得一個鎖需要執行兩個操作:
1、一個是設置好__lock為自己的tid
2、將這個mutex結構添加到內核識別的robust_list鏈表中去。
這兩個操作明顯不是原子性的,假設在第一步執行結束之後,任務被殺死,那麽線程退出的時候這個鎖是沒有辦法被喚醒的,雖然它事實上已經獲得了這個鎖,但是由於沒有註冊到內核識別的鏈表中而無法被喚醒,這樣就有可能出現悲劇。通常解決原子性操作的方法就是加鎖,但是這裏是鎖本身,所以沒有辦法依賴鎖,只能添加一個可能會競爭的op_pending鏈表,從而讓內核這個可以關中斷並處理鎖的機制來完成這個原子性。這和我們解決問題的模式是一樣的,如果這層沒有辦法做,那麽就向下推一層。
六、內核態喚醒
1、喚醒時機
do_exit--->>>exit_robust_list
if (pending) 如果pending鏈表非空,那麽表示“可能”存在一個鎖,所以嘗試進行喚醒
handle_futex_death((void __user *)pending + futex_offset, curr, pip);

while (entry != &head->list) {這個循環是真正變臉head結構鏈表,其中的每一項都是貨真價實的等待被喚醒的robust鎖,當然裏面還有一個比較特殊的entry!=pending判斷,著同樣是用戶態院子操作的保證,有興趣的同學可以分析一下在用戶態哪個位置切換會出現entry==pending的情況(請大家不要懷疑我是不是知道)
/*
* A pending lock might already be on the list, so
* don‘t process it twice:
*/
if (entry != pending)
if (handle_futex_death((void __user *)entry + futex_offset,
curr, pi))
return;
/*
* Fetch the next entry in the list:
*/
if (fetch_robust_entry(&entry, &entry->next, &pi))
return;
/*
* Avoid excessively long or circular lists:
*/
if (!--limit)這裏最為高效的防止出現循環鏈表的方法,就是假設鏈表最多為 ROBUST_LIST_LIMIT 2048項
break;

cond_resched();
}
2、喚醒函數

/*
* Process a futex-list entry, check whether it‘s owned by the
* dying task, and do notification if so:
*/
int handle_futex_death(u32 __user *uaddr, struct task_struct *curr, int pi)
{
u32 uval, nval, mval;

retry:
if (get_user(uval, uaddr))
return -1;

if ((uval & FUTEX_TID_MASK) == curr->pid) {如果退出線程持有該鎖
/*
* Ok, this dying thread is truly holding a futex
* of interest. Set the OWNER_DIED bit atomically
* via cmpxchg, and if the value had FUTEX_WAITERS
* set, wake up a waiter (if any). (We have to do a
* futex_wake() even if OWNER_DIED is already set -
* to handle the rare but possible case of recursive
* thread-death.) The rest of the cleanup is done in
* userspace.
*/
mval = (uval & FUTEX_WAITERS) | FUTEX_OWNER_DIED;設置用戶態最為關心的OWNER_DIED標誌
nval = futex_atomic_cmpxchg_inatomic(uaddr, uval, mval);

if (nval == -EFAULT)
return -1;

if (nval != uval)
goto retry;

/*
* Wake robust non-PI futexes here. The wakeup of
* PI futexes happens in exit_pi_state():
*/
if (!pi) {
if (uval & FUTEX_WAITERS)如果該鎖上有等待者
futex_wake(uaddr, 1);這裏註意的是此處只會喚醒一個,因為喚醒多個只會讓用戶態更亂,沒有意義
}
}
return 0;
}
七、pthread_mutex_consistent_np函數的意義
int
pthread_mutex_consistent_np (mutex)
pthread_mutex_t *mutex;
{
/* Test whether this is a robust mutex with a dead owner. */
if ((mutex->__data.__kind & PTHREAD_MUTEX_ROBUST_NORMAL_NP) == 0
|| mutex->__data.__owner != PTHREAD_MUTEX_INCONSISTENT)
return EINVAL;

mutex->__data.__owner = THREAD_GETMEM (THREAD_SELF, tid);

return 0;
}
如果不執行,在__pthread_mutex_lock函數返回EOWNERDEAD之後,鎖的__owner將被設置為PTHREAD_MUTEX_INCONSISTENT
mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;
然後在持有者執行pthread_mutex_unlock的時候
/* If the previous owner died and the caller did not succeed in
making the state consistent, mark the mutex as unrecoverable
and make all waiters. */
if (__builtin_expect (mutex->__data.__owner
== PTHREAD_MUTEX_INCONSISTENT, 0))
notrecoverable:
newowner = PTHREAD_MUTEX_NOTRECOVERABLE;
而其它線程再次執行pthread_mutex_lock的時候,
if (__builtin_expect (mutex->__data.__owner
== PTHREAD_MUTEX_NOTRECOVERABLE, 0))
{
/* This mutex is now not recoverable. */
mutex->__data.__count = 0;
lll_unlock (mutex->__data.__lock,
PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
return ENOTRECOVERABLE;
}
可見,從原始的INCONSISTENT轉換為NOTRECOVERABLE還是一個比較嚴重的問題,開始只是“不一致”,最後就“無可挽回”了。

Linux下robust互斥鎖實現