1. 程式人生 > >記憶體管理十 linux核心併發與同步機制

記憶體管理十 linux核心併發與同步機制

一、臨界資源:

  臨界區是指訪問或操作共享資源的程式碼段,這些資源無法同時被多個執行執行緒訪問,為了避免臨界區的併發

訪問,需要保證臨界區的原子性,臨界區不能有多個併發源同時執行,原子性保護的是資源和資料,包括靜態區域性

變數、全域性變數、共享的資料結構、Buffer快取等各種資源資料,產生併發訪問的併發源主要有如下:

  • 中斷和異常:中斷髮生後,中斷執行程式和被中斷的程序之間可能產生併發訪問;
  • 軟中斷和tasklet:軟中斷或者tasklet隨時可能會被排程執行,從而打斷當前正在執行的程序上下文;
  • 核心搶佔:程序排程器支援搶佔特性,會導致程序和程序之間的併發訪問;
  • 多處理器併發執行:多個處理器的上可以同時執行多個相同或者不同的程序。

二、同步機制:

  上面已經介紹了臨界資源的相關內容,linux核心中的同步機制就是為了保證臨界資源不產生併發訪問的處理

方法,當然核心中針對不同的場景有不同的同步機制,如:原子操作、自旋鎖、訊號量、Mutex互斥體、讀寫鎖、

RCU鎖等機制一面會一一介紹,並比較各種機制之間的差別。

1、原子操作:

(1)原子變數操作:

  原子操作是指保證指令以原子的方式執行,執行的過程中不會被打斷。linux核心提供了atomic_t型別的原子

變數,變數的定義如下:

typedef struct {
	int counter;
} atomic_t;

  原子變數的常見的操作介面和用法如下:

#define ATOMIC_INIT(i)	{ (i) }  //定義一個原子變數並初始化為i
#define atomic_read(v)	READ_ONCE((v)->counter)  //讀取原子變數的值
static inline void atomic_add(int i, atomic_t *v)  //原子變數v增加i
static inline void atomic_sub(int i, atomic_t *v)  //原子變數v減i
static inline void atomic_inc(atomic_t *v)  //原子變數值加1
static inline void atomic_dec(atomic_t *v)  //原子變數值減1
......
atomic_t use_cnt;
atomic_set(&use_cnt, 2);
atomic_add(4, &use_cnt);
atomic_inc(use_cnt);

(2)原子位操作:

  在編寫程式碼時,有時會使用到對某一個暫存器或者變數設定某一位的操作,可以使用如下的介面:

unsigned long word = 0;
set_bit(0, &word); /*第0位被設定*/
set_bit(1, &word); /*第1位被設定*/
clear_bit(1, &word); /*第1位被清空*/
change_bit(0, &word); /*翻轉第0位*/

2、自旋鎖spin_lock:

(1)自旋鎖的特點如下:

a、忙等待、不允許睡眠、快速執行完成,可用於中斷服務程式中;

b、自旋鎖可以保證臨界區不受別的CPU和本CPU的搶佔程序打擾;

c、如果A執行單元首先獲得鎖,那麼當B進入同一個例程時將獲知自旋鎖已被持有,需等待A釋放後才能進入,

  所以B只好原地打轉(自旋);

d、自旋鎖鎖定期間不能呼叫可能引起程序排程的函式,如:copy_from_user(),copy_to_user(), kmalloc(),msleep();

(2)自旋鎖的操作介面:

//定義於#include<linux/spinlock_types.h>
spinlock_t lock;    //定義自旋鎖
spin_lock_init(&lock);    //初始化自旋鎖
spin_lock(&lock);    //如不能獲得鎖,原地打轉。
spin_trylock(&lock);//嘗試獲得,如能獲得鎖返回真,不能獲得返回假,不再原地打轉。
spin_unlock(&lock);    //與spin_lock()和spin_trylock()配對使用。
spin_lock_irq()
spin_unlock_irq()
spin_lock_irqsave()
spin_unlock_irqrestore()
spin_lock_bh()
spin_unlock_bh()

(3)使用舉例:

spinlock_t lock;     //定義自旋鎖  --全域性變數
spin_lock_init(&lock); //初始化自旋鎖    --初始化函式中
spin_lock(&lock);    // 獲取自旋鎖    --成對在操作前後使用
	//臨界區......
spin_unlock(&lock)    //釋放自旋鎖

3、訊號量:

(1)訊號量的特點:

a、睡眠等待、可以睡眠、可以執行耗時長的程序;

b、共享資源允許被多個不同的程序同時訪問;

c、訊號量常用於暫時無法獲取的共享資源,如果獲取失敗則程序進入不可中斷的睡眠狀態,只能由釋放資源的程序來喚醒;

d、訊號量不能在中斷服務程式中使用,因為中斷服務程式是不允許程序睡眠的;

(2)訊號量的使用:

struct semaphore {
    atomic_t count;         //共享計數值
    int sleepers;           //等待當前訊號量進入睡眠的程序個數
    wait_queue_head_t wait; // wait是當前訊號量的等待佇列
};

//count用於判斷是否可以獲取該訊號量:
//count大於0說明可以獲取訊號量;
//count小於等於0,不可以獲取訊號量;

  操作介面如下:

static inline void down(struct semaphore * sem)
//獲取訊號量,獲取失敗則進入睡眠狀態
static inline void up(struct semaphore * sem)
//釋放訊號量,並喚醒等待佇列中的第一個程序
int down_interruptible(struct semaphore * sem);
// down_interruptible能被訊號打斷;
int down_trylock(struct semaphore * sem);
//該函式嘗試獲得訊號量sem,如果能夠立刻獲得,它就獲得該訊號量並返回0,否則,返回非0值。

如:
down(sem);
...臨界區...
up(sem);

4、mutex_lock互斥鎖:

  互斥鎖主要用於實現核心中的互斥訪問功能。核心互斥鎖是在原子API之上實現的,但這對於核心使用者是不可見的。

對它的訪問必須遵循一些規則:同一時間只能有一個任務持有互斥鎖,而且只有這個任務可以對互斥鎖進行解鎖。互斥鎖

不能進行遞迴鎖定或解鎖。一個互斥鎖物件必須通過其API初始化,而不能使用memset或複製初始化。一個任務在持有互

斥鎖的時候是不能結束的。互斥鎖所使用的記憶體區域是不能被釋放的。使用中的互斥鎖是不能被重新初始化的。並且互斥

鎖不能用於中斷上下文。但是互斥鎖比當前的核心訊號量選項更快,並且更加緊湊。

(1)互斥體禁止多個執行緒同時進入受保護的程式碼“臨界區”(critical section),因此,在任意時刻,只有一個執行緒被允許

進入這樣的程式碼保護區。mutex實際上是count=1情況下的semaphore。

struct mutex {  
    atomic_t        count;  
    spinlock_t      wait_lock;  
    struct list_head    wait_list;  
    struct task_struct  *owner;
		 ...... 
};

  結構體成員說明:
     atomic_t count;指示互斥鎖的狀態:

     1 沒有上鎖,可以獲得;0 被鎖定,不能獲得,初始化為沒有上鎖;
   spinlock_t wait_lock;
     等待獲取互斥鎖中使用的自旋鎖,在獲取互斥鎖的過程中,操作會在自旋鎖的保護中進行,

     初始化為為鎖定。
   struct list_head wait_list;
      等待互斥鎖的程序佇列。

(2)mutex的使用:

a、初始化

mutex_init(&mutex); //動態初始化互斥鎖
DEFINE_MUTEX(mutexname); //靜態定義和初始化互斥鎖

b、上鎖

void mutex_lock(struct mutex *lock);
    //無法獲得鎖時,睡眠等待,不會被訊號中斷。

int mutex_trylock(struct mutex *lock);
   //此函式是 mutex_lock()的非阻塞版本,成功返回1,失敗返回0

int mutex_lock_interruptible(struct mutex *lock);
   //和mutex_lock()一樣,也是獲取互斥鎖。在獲得了互斥鎖或進入睡眠直
  //到獲得互斥鎖之後會返回0。如果在等待獲取鎖的時候進入睡眠狀態收到一
    //個訊號(被訊號打斷睡眠),則返回_EINIR。

c、解鎖
  void mutex_unlock(struct mutex *lock);

5、讀寫鎖:

  讀寫鎖實際是一種特殊的自旋鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,

寫者則需要對共享資源進行寫操作。這種鎖相對於自旋鎖而言,能提高併發性,因為在多處理器系統中,它允許同

時有多個讀者來訪問共享資源,最大可能的讀者數為實際的邏輯CPU數。寫者是排他性的,一個讀寫鎖同時只能有

一個寫者或多個讀者(與CPU數相關),但不能同時既有讀者又有寫者。

  如果讀寫鎖當前沒有讀者,也沒有寫者,那麼寫者可以立刻獲得讀寫鎖,否則它必須自旋在那裡,直到沒有任

何寫者或讀者。如果讀寫鎖沒有寫者,那麼讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那裡,直到寫者釋放

該讀寫鎖。

(1)操作函式介面:

rwlock_init(x)  
DEFINE_RWLOCK(x) 
read_trylock(lock)
write_trylock(lock)
read_lock(lock)
write_lock(lock)
read_unlock(lock)
write_unlock(lock)

(2)總結:

A、讀寫鎖本質上就是一個計數器,初始化值為0x01000000,表示最多可以有0x01000000個讀者同時獲取讀鎖;

B、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1,若結果符號位為0時,獲取讀鎖成功;

C、獲取讀鎖時,rw計數減1,判斷結果的符號位是否為1。若結果符號位為1時,獲取讀鎖失敗,表示此時讀寫鎖被寫者

佔有,此時呼叫__read_lock_failed失敗處理函式,迴圈測試rw+1的值,直到結果的值大於等於1;

D、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果為0時,獲取寫鎖成功;

E、獲取寫鎖時,rw計數減RW_LOCK_BIAS_STR,即rw-0x01000000,判斷結果是否為0。若結果不為0時,獲取寫鎖失敗,

表示此時有讀者佔有讀寫鎖或有寫著佔有讀寫鎖,此時呼叫__write_lock_failed失敗處理函式,迴圈測試rw+0x01000000,

直到結果的值等於0x01000000;

F、通過對讀寫鎖的原始碼分析,可以看出讀寫鎖其實是帶計數的特殊自旋鎖,能同時被多個讀者佔有或一個寫者佔有,

但不能同時被讀者和寫者佔有。

6、讀寫訊號量:

(1)特點:

  a、同一時刻最多有一個寫者(writer)獲得鎖;

  b、同一時刻可以有多個讀者(reader)獲得鎖;

  c、同一時刻寫者和讀者不能同時獲得鎖;

(2)相關結構和函式介面:

struct rw_semaphore {
     /*讀/寫訊號量定義:
     * - 如果activity為0,那麼沒有啟用的讀者或寫者。
     * - 如果activity為+ve,那麼將有ve個啟用的讀者。
     * - 如果activity為-1,那麼將有1個啟用的寫者。 */
     __s32			activity;   /*訊號量值*/
     spinlock_t		wait_lock;   /*用於鎖等待佇列wait_list*/
     struct list_head wait_list; /*如果非空,表示有程序等待該訊號量*/
};
void init_rwsem(struct rw_semaphore* rw_sem); //初始化讀寫訊號量

void down_read(struct rw_semaphore* rw_sem); //獲取讀訊號量
int down_read_trylock(struct rw_semaphore* rw_sem); //嘗試獲取讀訊號量
void up_read(struct rw_semaphore* rw_sem);             

void down_write(struct rw_semaphore* rw_sem); //獲取寫訊號量
int down_write_trylock(struct rw_semaphore* rw_sem);//嘗試獲取寫訊號量
void up_write(struct rw_semaphore* rw_sem);       

(3)使用方法:

	rw_semaphore sem;   
	init_rwsem(&sem);   

	down_read(&sem);    
	...臨界區...          
	up_read(&sem);     

	down_write(&sem);   
	...臨界區...             
	up_write(&sem);    

三、總結與拓展

1、針對上面的各種同步機制直接的差異,可以如下表格清晰的表明:

  

2、在解決穩定性相關的問題時,難免會出現一些死鎖的問題,可以新增一些debug巨集來配合除錯:

CONFIG_LOCK_STAT=y
CONFIG_DEBUG_LOCKDEP=y
CONFIG_PROVE_LOCKING=y
CONFIG_DEBUG_MUTEXES=y
CONFIG_DEBUG_LOCK_ALLOC=y
CONFIG_RWSEM_SPIN_ON_OWNER=y

  如下有一個簡單的死鎖的例程:

static DEFINE_SPINLOCK(hack_spinA);
static DEFINE_SPINLOCK(hack_spinB);
void hack_spinBA(void)
{
    printk("%s(),hack A and B\n",__FUNCTION__);
    spin_lock(&hack_spinA);
    spin_lock(&hack_spinB);
}
void hack_spinAB(void)
{
    printk("%s(),hack A \n",__FUNCTION__);
    spin_lock(&hack_spinA);
    spin_lock(&hack_spinB);
}
static ssize_t hello_test_show(struct device *dev, struct device_attribute *attr, char *buf)
{
    printk("%s() \n",__FUNCTION__);
    hack_spinBA();
    return 0;
}
static ssize_t hello_test_store(struct device *dev, struct device_attribute *attr, const char *buf, size_t count)
{
    int test;
    printk("hello_test %s,%d\n",__FUNCTION__,__LINE__);
    
    test = 4;
    hack_spinAB();
    return printk("%s() test = %d\n",__FUNCTION__,test);
}
static DEVICE_ATTR(hello_test, 0664, hello_test_show, hello_test_store);

static int hello_test_probe(struct platform_device *pdev)
{
	printk("%s()\n",__FUNCTION__);
	
    device_create_file(&pdev->dev, &dev_attr_hello_test);
    
	return 0;
}

  上面的例子中,先cat對應的節點會先後拿住hack_spinA和hack_spinB的兩把鎖,但並沒有去釋放這兩把鎖,

所以再對應的相應節點做echo操作時,會出現一直無法拿住這把鎖,等待30S後會觸發HWT的重啟,看重啟的DB

檔案可以發現:

/***********在58S時通過cat 執行到了hello_test_show函式,從而拿住了鎖********/
[   58.453863]  (1)[2577:cat]Dump cpuinfo
[   58.456057]  (1)[2577:cat]hello_test_show() 
[   58.456093]  (1)[2577:cat]hack_spinBA(),hack A and B
[   58.457426]  (1)[2577:cat]note: cat[2577] exited with preempt_count 2
[   58.458884]  (1)[2577:cat]
[   58.458920]  (1)[2577:cat]=====================================
[   58.458931]  (1)[2577:cat][ BUG: cat/2577 still has locks held! ]
[   58.458944]  (1)[2577:cat]4.4.146+ #5 Tainted: G        W  O   
[   58.458955]  (1)[2577:cat]-------------------------------------
[   58.458965]  (1)[2577:cat]lockdep: [Caution!] cat/2577 is runable state
[   58.458976]  (1)[2577:cat]lockdep: 2 locks held by cat/2577:
[   58.458988]  #0:  (hack_spinA){......}
[   58.459015] lockdep: , at:  (1)[2577:cat][<c0845e8c>] hack_spinBA+0x24/0x3c
[   58.459049]  #1:  (hack_spinB){......}
[   58.459074] lockdep: , at:  (1)[2577:cat][<c0845e94>] hack_spinBA+0x2c/0x3c
[   58.459098]  (1)[2577:cat]
/**********在64S是通過echo指令呼叫到hello_test_store再次去拿鎖,導致死鎖******/
[   64.083878]  (2)[1906:sh]hello_test hello_test_store,41
[   64.083914]  (2)[1906:sh]hack_spinAB(),hack A 

/**********在94S時出現 HWT 重啟***************/
[   94.092084] -(2)[1906:sh][<c0836a40>] (aee_wdt_atf_info) from [<c0837428>] (aee_wdt_atf_entry+0x180/0x1b8)
[   94.093571] -(2)[1906:sh][<c08372a8>] (aee_wdt_atf_entry) from [<c0152bfc>] (preempt_count_sub+0xe4/0x100)
[   94.095055] -(2)[1906:sh][<c019694c>] (do_raw_spin_lock) from [<c0c6f5f8>] (_raw_spin_lock+0x48/0x50)
[   94.096548] -(2)[1906:sh][<c0c6f5b0>] (_raw_spin_lock) from [<c0845ef4>] (hack_spinAB+0x24/0x3c)
[   94.097666] -(2)[1906:sh][<c0845ed0>] (hack_spinAB) from [<c0845f30>] (hello_test_store+0x24/0x44)
[   94.098785] -(2)[1906:sh][<c0845f0c>] (hello_test_store) from [<c04c56cc>] (dev_attr_store+0x20/0x2c)

  再去看CPU的喂狗資訊:kick=0x0000000b,check=0x0000000f,

  可以發現是因為CPU2死鎖導致沒有及時喂狗,所以觸發HWT重啟,看CPU2的堆疊也是掛載sh的程序上:

cpu#2: Online
  .nr_running                    : 5
  .load  
runnable tasks:
            task   PID         tree-key  switches  prio     wait-time             sum-exec        sum-sleep
---------------------------------------------------
         DispSync   470         0.000000      1054    97         0.000000       157.709688         0.000000 /
      kworker/2:2  1083     56925.416956       264   120       119.050538        21.742080     35541.090392 /
R              sh  1906     84650.355638       115   120        14.202615     28671.890146     21283.409357 /

 

作者:frank_zyp
您的支援是對博主最大的鼓勵,感謝您的認真閱讀。
本文無所謂版權,歡迎轉載。