記憶體管理十 linux核心併發與同步機制
一、臨界資源:
臨界區是指訪問或操作共享資源的程式碼段,這些資源無法同時被多個執行執行緒訪問,為了避免臨界區的併發
訪問,需要保證臨界區的原子性,臨界區不能有多個併發源同時執行,原子性保護的是資源和資料,包括靜態區域性
變數、全域性變數、共享的資料結構、Buffer快取等各種資源資料,產生併發訪問的併發源主要有如下:
- 中斷和異常:中斷髮生後,中斷執行程式和被中斷的程序之間可能產生併發訪問;
- 軟中斷和tasklet:軟中斷或者tasklet隨時可能會被排程執行,從而打斷當前正在執行的程序上下文;
- 核心搶佔:程序排程器支援搶佔特性,會導致程序和程序之間的併發訪問;
- 多處理器併發執行:多個處理器的上可以同時執行多個相同或者不同的程序。
二、同步機制:
上面已經介紹了臨界資源的相關內容,linux核心中的同步機制就是為了保證臨界資源不產生併發訪問的處理
方法,當然核心中針對不同的場景有不同的同步機制,如:原子操作、自旋鎖、訊號量、Mutex互斥體、讀寫鎖、
RCU鎖等機制一面會一一介紹,並比較各種機制之間的差別。
1、原子操作:
(1)原子變數操作:
原子操作是指保證指令以原子的方式執行,執行的過程中不會被打斷。linux核心提供了atomic_t型別的原子
變數,變數的定義如下:
typedef struct { int counter; } atomic_t;
原子變數的常見的操作介面和用法如下:
#define ATOMIC_INIT(i) { (i) } //定義一個原子變數並初始化為i #define atomic_read(v) READ_ONCE((v)->counter) //讀取原子變數的值 static inline void atomic_add(int i, atomic_t *v) //原子變數v增加i static inline void atomic_sub(int i, atomic_t *v) //原子變數v減i static inline void atomic_inc(atomic_t *v) //原子變數值加1 static inline void atomic_dec(atomic_t *v) //原子變數值減1 ......
atomic_t use_cnt;
atomic_set(&use_cnt, 2);
atomic_add(4, &use_cnt);
atomic_inc(use_cnt);
(2)原子位操作:
在編寫程式碼時,有時會使用到對某一個暫存器或者變數設定某一位的操作,可以使用如下的介面:
unsigned long word = 0;
set_bit(0, &word); /*第0位被設定*/
set_bit(1, &word); /*第1位被設定*/
clear_bit(1, &word); /*第1位被清空*/
change_bit(0, &word); /*翻轉第0位*/
2、自旋鎖spin_lock:
(1)自旋鎖的特點如下:
a、忙等待、不允許睡眠、快速執行完成,可用於中斷服務程式中;
b、自旋鎖可以保證臨界區不受別的CPU和本CPU的搶佔程序打擾;
c、如果A執行單元首先獲得鎖,那麼當B進入同一個例程時將獲知自旋鎖已被持有,需等待A釋放後才能進入,
所以B只好原地打轉(自旋);
d、自旋鎖鎖定期間不能呼叫可能引起程序排程的函式,如:copy_from_user(),copy_to_user(), kmalloc(),msleep();
(2)自旋鎖的操作介面:
//定義於#include<linux/spinlock_types.h>
spinlock_t lock; //定義自旋鎖
spin_lock_init(&lock); //初始化自旋鎖
spin_lock(&lock); //如不能獲得鎖,原地打轉。
spin_trylock(&lock);//嘗試獲得,如能獲得鎖返回真,不能獲得返回假,不再原地打轉。
spin_unlock(&lock); //與spin_lock()和spin_trylock()配對使用。
spin_lock_irq()
spin_unlock_irq()
spin_lock_irqsave()
spin_unlock_irqrestore()
spin_lock_bh()
spin_unlock_bh()
(3)使用舉例:
spinlock_t lock; //定義自旋鎖 --全域性變數
spin_lock_init(&lock); //初始化自旋鎖 --初始化函式中
spin_lock(&lock); // 獲取自旋鎖 --成對在操作前後使用
//臨界區......
spin_unlock(&lock) //釋放自旋鎖
3、訊號量:
(1)訊號量的特點:
a、睡眠等待、可以睡眠、可以執行耗時長的程序;
b、共享資源允許被多個不同的程序同時訪問;
c、訊號量常用於暫時無法獲取的共享資源,如果獲取失敗則程序進入不可中斷的睡眠狀態,只能由釋放資源的程序來喚醒;
d、訊號量不能在中斷服務程式中使用,因為中斷服務程式是不允許程序睡眠的;
(2)訊號量的使用:
struct semaphore {
atomic_t count; //共享計數值
int sleepers; //等待當前訊號量進入睡眠的程序個數
wait_queue_head_t wait; // wait是當前訊號量的等待佇列
};
//count用於判斷是否可以獲取該訊號量:
//count大於0說明可以獲取訊號量;
//count小於等於0,不可以獲取訊號量;
操作介面如下:
static inline void down(struct semaphore * sem)
//獲取訊號量,獲取失敗則進入睡眠狀態
static inline void up(struct semaphore * sem)
//釋放訊號量,並喚醒等待佇列中的第一個程序
int down_interruptible(struct semaphore * sem);
// down_interruptible能被訊號打斷;
int down_trylock(struct semaphore * sem);
//該函式嘗試獲得訊號量sem,如果能夠立刻獲得,它就獲得該訊號量並返回0,否則,返回非0值。
如:
down(sem);
...臨界區...
up(sem);
4、mutex_lock互斥鎖:
互斥鎖主要用於實現核心中的互斥訪問功能。核心互斥鎖是在原子API之上實現的,但這對於核心使用者是不可見的。
對它的訪問必須遵循一些規則:同一時間只能有一個任務持有互斥鎖,而且只有這個任務可以對互斥鎖進行解鎖。互斥鎖
不能進行遞迴鎖定或解鎖。一個互斥鎖物件必須通過其API初始化,而不能使用memset或複製初始化。一個任務在持有互
斥鎖的時候是不能結束的。互斥鎖所使用的記憶體區域是不能被釋放的。使用中的互斥鎖是不能被重新初始化的。並且互斥
鎖不能用於中斷上下文。但是互斥鎖比當前的核心訊號量選項更快,並且更加緊湊。
(1)互斥體禁止多個執行緒同時進入受保護的程式碼“臨界區”(critical section),因此,在任意時刻,只有一個執行緒被允許
進入這樣的程式碼保護區。mutex實際上是count=1情況下的semaphore。
struct mutex {
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
struct task_struct *owner;
......
};
結構體成員說明:
atomic_t count;指示互斥鎖的狀態:
1 沒有上鎖,可以獲得;0 被鎖定,不能獲得,初始化為沒有上鎖;
spinlock_t wait_lock;
等待獲取互斥鎖中使用的自旋鎖,在獲取互斥鎖的過程中,操作會在自旋鎖的保護中進行,
初始化為為鎖定。
struct list_head wait_list;
等待互斥鎖的程序佇列。
(2)mutex的使用:
a、初始化
mutex_init(&mutex); //動態初始化互斥鎖
DEFINE_MUTEX(mutexname); //靜態定義和初始化互斥鎖
b、上鎖
void mutex_lock(struct mutex *lock);
//無法獲得鎖時,睡眠等待,不會被訊號中斷。
int mutex_trylock(struct mutex *lock);
//此函式是 mutex_lock()的非阻塞版本,成功返回1,失敗返回0
int mutex_lock_interruptible(struct mutex *lock);
//和mutex_lock()一樣,也是獲取互斥鎖。在獲得了互斥鎖或進入睡眠直
//到獲得互斥鎖之後會返回0。如果在等待獲取鎖的時候進入睡眠狀態收到一
//個訊號(被訊號打斷睡眠),則返回_EINIR。
c、解鎖
void mutex_unlock(struct mutex *lock);
5、讀寫鎖:
讀寫鎖實際是一種特殊的自旋鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,
寫者則需要對共享資源進行寫操作。這種鎖相對於自旋鎖而言,能提高併發性,因為在多處理器系統中,它允許同
時有多個讀者來訪問共享資源,最大可能的讀者數為實際的邏輯CPU數。寫者是排他性的,一個讀寫鎖同時只能有
一個寫者或多個讀者(與CPU數相關),但不能同時既有讀者又有寫者。
如果讀寫鎖當前沒有讀者,也沒有寫者,那麼寫者可以立刻獲得讀寫鎖,否則它必須自旋在那裡,直到沒有任
何寫者或讀者。如果讀寫鎖沒有寫者,那麼讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那裡,直到寫者釋放
該讀寫鎖。
(1)操作函式介面:
rwlock_init(x)
DEFINE_RWLOCK(x)
read_trylock(lock)
write_trylock(lock)
read_lock(lock)
write_lock(lock)
read_unlock(lock)
write_unlock(lock)
(2)總結:
A、讀寫鎖本質上就是一個計數器,初始化值為0x01000000,表示最多可以有0x01000000個讀者同時獲取讀鎖;
B、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1,若結果符號位為0時,獲取讀鎖成功;
C、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1。若結果符號位為1時,獲取讀鎖失敗,表示此時讀寫鎖被寫者
佔有,此時呼叫__read_lock_failed失敗處理函式,迴圈測試rw+1的值,直到結果的值大於等於1;
D、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果為0時,獲取寫鎖成功;
E、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果不為0時,獲取寫鎖失敗,
表示此時有讀者佔有讀寫鎖或有寫著佔有讀寫鎖,此時呼叫__write_lock_failed失敗處理函式,迴圈測試rw+0x01000000,
直到結果的值等於0x01000000;
F、通過對讀寫鎖的原始碼分析,可以看出讀寫鎖其實是帶計數的特殊自旋鎖,能同時被多個讀者佔有或一個寫者佔有,
但不能同時被讀者和寫者佔有。
6、讀寫訊號量:
(1)特點:
a、同一時刻最多有一個寫者(writer)獲得鎖;
b、同一時刻可以有多個讀者(reader)獲得鎖;
c、同一時刻寫者和讀者不能同時獲得鎖;
(2)相關結構和函式介面:
struct rw_semaphore {
/*讀/寫訊號量定義:
* - 如果activity為0,那麼沒有啟用的讀者或寫者。
* - 如果activity為+ve,那麼將有ve個啟用的讀者。
* - 如果activity為-1,那麼將有1個啟用的寫者。 */
__s32 activity; /*訊號量值*/
spinlock_t wait_lock; /*用於鎖等待佇列wait_list*/
struct list_head wait_list; /*如果非空,表示有程序等待該訊號量*/
};
void init_rwsem(struct rw_semaphore* rw_sem); //初始化讀寫訊號量
void down_read(struct rw_semaphore* rw_sem); //獲取讀訊號量
int down_read_trylock(struct rw_semaphore* rw_sem); //嘗試獲取讀訊號量
void up_read(struct rw_semaphore* rw_sem);
void down_write(struct rw_semaphore* rw_sem); //獲取寫訊號量
int down_write_trylock(struct rw_semaphore* rw_sem);//嘗試獲取寫訊號量
void up_write(struct rw_semaphore* rw_sem);
(3)使用方法:
rw_semaphore sem;
init_rwsem(&sem);
down_read(&sem);
...臨界區...
up_read(&sem);
down_write(&sem);
...臨界區...
up_write(&sem);
三、總結與拓展
1、針對上面的各種同步機制直接的差異,可以如下表格清晰的表明:
2、在解決穩定性相關的問題時,難免會出現一些死鎖的問題,可以新增一些debug巨集來配合除錯:
CONFIG_LOCK_STAT=y
CONFIG_DEBUG_LOCKDEP=y
CONFIG_PROVE_LOCKING=y
CONFIG_DEBUG_MUTEXES=y
CONFIG_DEBUG_LOCK_ALLOC=y
CONFIG_RWSEM_SPIN_ON_OWNER=y
如下有一個簡單的死鎖的例程:
static DEFINE_SPINLOCK(hack_spinA);
static DEFINE_SPINLOCK(hack_spinB);
void hack_spinBA(void)
{
printk("%s(),hack A and B\n",__FUNCTION__);
spin_lock(&hack_spinA);
spin_lock(&hack_spinB);
}
void hack_spinAB(void)
{
printk("%s(),hack A \n",__FUNCTION__);
spin_lock(&hack_spinA);
spin_lock(&hack_spinB);
}
static ssize_t hello_test_show(struct device *dev, struct device_attribute *attr, char *buf)
{
printk("%s() \n",__FUNCTION__);
hack_spinBA();
return 0;
}
static ssize_t hello_test_store(struct device *dev, struct device_attribute *attr, const char *buf, size_t count)
{
int test;
printk("hello_test %s,%d\n",__FUNCTION__,__LINE__);
test = 4;
hack_spinAB();
return printk("%s() test = %d\n",__FUNCTION__,test);
}
static DEVICE_ATTR(hello_test, 0664, hello_test_show, hello_test_store);
static int hello_test_probe(struct platform_device *pdev)
{
printk("%s()\n",__FUNCTION__);
device_create_file(&pdev->dev, &dev_attr_hello_test);
return 0;
}
上面的例子中,先cat對應的節點會先後拿住hack_spinA和hack_spinB的兩把鎖,但並沒有去釋放這兩把鎖,
所以再對應的相應節點做echo操作時,會出現一直無法拿住這把鎖,等待30S後會觸發HWT的重啟,看重啟的DB
檔案可以發現:
/***********在58S時通過cat 執行到了hello_test_show函式,從而拿住了鎖********/
[ 58.453863] (1)[2577:cat]Dump cpuinfo
[ 58.456057] (1)[2577:cat]hello_test_show()
[ 58.456093] (1)[2577:cat]hack_spinBA(),hack A and B
[ 58.457426] (1)[2577:cat]note: cat[2577] exited with preempt_count 2
[ 58.458884] (1)[2577:cat]
[ 58.458920] (1)[2577:cat]=====================================
[ 58.458931] (1)[2577:cat][ BUG: cat/2577 still has locks held! ]
[ 58.458944] (1)[2577:cat]4.4.146+ #5 Tainted: G W O
[ 58.458955] (1)[2577:cat]-------------------------------------
[ 58.458965] (1)[2577:cat]lockdep: [Caution!] cat/2577 is runable state
[ 58.458976] (1)[2577:cat]lockdep: 2 locks held by cat/2577:
[ 58.458988] #0: (hack_spinA){......}
[ 58.459015] lockdep: , at: (1)[2577:cat][<c0845e8c>] hack_spinBA+0x24/0x3c
[ 58.459049] #1: (hack_spinB){......}
[ 58.459074] lockdep: , at: (1)[2577:cat][<c0845e94>] hack_spinBA+0x2c/0x3c
[ 58.459098] (1)[2577:cat]
/**********在64S是通過echo指令呼叫到hello_test_store再次去拿鎖,導致死鎖******/
[ 64.083878] (2)[1906:sh]hello_test hello_test_store,41
[ 64.083914] (2)[1906:sh]hack_spinAB(),hack A
/**********在94S時出現 HWT 重啟***************/
[ 94.092084] -(2)[1906:sh][<c0836a40>] (aee_wdt_atf_info) from [<c0837428>] (aee_wdt_atf_entry+0x180/0x1b8)
[ 94.093571] -(2)[1906:sh][<c08372a8>] (aee_wdt_atf_entry) from [<c0152bfc>] (preempt_count_sub+0xe4/0x100)
[ 94.095055] -(2)[1906:sh][<c019694c>] (do_raw_spin_lock) from [<c0c6f5f8>] (_raw_spin_lock+0x48/0x50)
[ 94.096548] -(2)[1906:sh][<c0c6f5b0>] (_raw_spin_lock) from [<c0845ef4>] (hack_spinAB+0x24/0x3c)
[ 94.097666] -(2)[1906:sh][<c0845ed0>] (hack_spinAB) from [<c0845f30>] (hello_test_store+0x24/0x44)
[ 94.098785] -(2)[1906:sh][<c0845f0c>] (hello_test_store) from [<c04c56cc>] (dev_attr_store+0x20/0x2c)
再去看CPU的喂狗資訊:kick=0x0000000b,check=0x0000000f,
可以發現是因為CPU2死鎖導致沒有及時喂狗,所以觸發HWT重啟,看CPU2的堆疊也是掛載sh的程序上:
cpu#2: Online
.nr_running : 5
.load
runnable tasks:
task PID tree-key switches prio wait-time sum-exec sum-sleep
---------------------------------------------------
DispSync 470 0.000000 1054 97 0.000000 157.709688 0.000000 /
kworker/2:2 1083 56925.416956 264 120 119.050538 21.742080 35541.090392 /
R sh 1906 84650.355638 115 120 14.202615 28671.890146 21283.409357 /
作者:frank_zyp
您的支援是對博主最大的鼓勵,感謝您的認真閱讀。
本文無所謂版權,歡迎轉載。