1. 程式人生 > >linux裝置驅動:併發和競態

linux裝置驅動:併發和競態

綜述

首先什麼是併發與竟態呢?併發(concurrency)指的是多個執行單元同時、並行被執行。而併發的執行單元對共享資源(硬體資源和軟體上的全域性、靜態變數)的訪問則容易導致競態(race conditions)。因此再設計自己的驅動程式時,第一個要記住的原則是,只要可能,避免資源的共享。這種思想最明顯的應用就是避免使用全域性變數。競態通常作為可能導致併發和竟態的情況有:

  1. SMP(Symmetric Multi-Processing),對稱多處理結構。SMP是一種緊耦合、共享儲存的系統模型,它的特點是多個CPU使用共同的系統匯流排,因此可訪問共同的外設和儲存器。
  2. 中斷。中斷可 打斷正在執行的程序,若中斷處理程式訪問程序正在訪問的資源,則競態也會發生。中斷也可能被新的更高優先順序的中斷打斷,因此,多箇中斷之間也可能引起併發而導致競態。
  3. 核心程序的搶佔。linux是可搶佔的,所以一個核心程序可能被另一個高優先順序的核心程序搶佔。如果兩個程序共同訪問共享資源,就會出現竟態。

以上三種情況只有SMP是真正意義上的並行,而其他都是巨集觀上的並行,微觀上的序列。但其都會引發對臨界共享區的競爭問題。而解決競態問題的途徑是保證對共享資源的互斥訪問,即一個執行單元在訪問共享資源的時候,其他的執行單元被禁止訪問。那麼linux核心中如何做到對對共享資源的互斥訪問呢?在linux驅動程式設計中,常用的解決併發與竟態的手段有訊號量與互斥鎖,Completions 機制,自旋鎖(spin lock),以及一些其他的不使用鎖的實現方式。下面一一介紹。

訊號量和互斥體

一個訊號量本質是上是一個整數值,進入臨界區的程序呼叫P, 如果訊號量的值大於零, 則這個值減一, 而程序可以繼續。相反,如果訊號量的值為零(或更小),程序必須等待,直到其他人釋放該訊號量。對訊號量的解鎖呼叫V,該函式增加訊號量的值,並在必要時喚醒等待的程序。當訊號量的初始值為1時, 就變成了互斥鎖。這種訊號量在任何給定時刻只能有單個程序或執行緒擁有。
訊號量的典型使用形式:

//宣告訊號量
struct semaphore sem;
//初始化訊號量
void sema_init(struct semaphore *sem, int val);

//以下簡便的宣告和初始化一個互斥體
DECLEAR_MUTEX(NAME); //訊號量name 被初始化為1 DECLEAR_MUTEX_LOCKED(NAME); //訊號量name 被初始化為0 //常用操作 DECLEAR_MUTEX(test_sem); down(&test_sem); // 獲取訊號量,減小訊號量的值 .... //臨界區 .... up(&test_sem); //釋放訊號量, 增加訊號量的值

常見的down 操作還有

//down() 等待訊號量進入休眠,不能被外部訊號打斷,而down_interruptible() 進入休眠的程序能被訊號打斷
int down_interruptible(struct semaphore *sem);
//嘗試獲得訊號量,若立即獲得,他就獲得訊號量並返回0,**否則,他不會導致呼叫者休眠,返回非0值,可在中斷上下文使用**。
int down_trylock(struct semaphore *sem);

Completions 機制

  1. 什麼是completions機制?
    在核心程式設計中常有這樣的場景,在當前執行緒中建立一個執行緒,並且等待它完成之後再繼續執行。通常可以用訊號量來解決它,也可以用completion機制來解決。
  2. 為什麼用completions ,它比訊號量好在哪?
    使用completion比使用訊號量簡單。使用completion可以一次性喚醒所有等待程序,而用訊號量會比較麻煩。
//介面方式建立completion
DECLARE_COMPLETION(my_completion);

//動態初始化
struct completion my_completion;
/*....*/
init_comp;etion(&my_completion);

//等待一個completion被喚醒, 非中斷的等待如果程式碼呼叫了wait_for_completion() 且沒有人會完成該任務,則產生一個不可殺死的程序
void wait_for_completion(struct completion *c);


// 喚醒completion
void complete(struct completion *c);
void complete_all(struct completion *c);

自旋鎖

自旋鎖可在不能休眠的程式碼中使用,當鎖被其他人獲得,則程式碼進入忙迴圈並重複檢查這個鎖,知道該鎖可用。自旋鎖典型的應用在中斷處理函式中。使用於自旋鎖的核心規則是:1.任何擁有自旋鎖的程式碼都必須是原子的,他不能休眠 2.必須在可能的最短時間內擁有

// 定義自旋鎖
spinlock_t my_lock;

//初始化自旋鎖
void spin_lock_init(spinlock_t *lock);

//進入臨界區,呼叫下面函式獲得鎖
void spin_lock(spinlock_t *lock);

//釋放鎖
void spin_unlock(spinlock_t *lock);

//非阻塞的自旋鎖的操作,沒有獲取鎖,返回零, 不在自旋
int spin_tyrlock(spinlock_t *lock);

還有另外一種情形: 我們的驅動正在執行,並且已經獲得一個鎖,這個鎖控制著對裝置的訪問,在擁有這個鎖的時候,裝置產生一箇中斷,它導致中斷處理例程被呼叫。而中斷處理例程在訪問裝置之前,也要獲得這個鎖。這個時候中斷例程自旋,非中斷程式碼將沒有機會來釋放這個鎖。為防止這種影響,提供了以下api

void spin_lock_irqsave(spinlock_t *lock, unsigned long falgs);
void spin_lock_irq(spinlock_t *lock); //禁用本地處理器的中斷
void spin_lock_bh(spinlock_t *lock)   //禁用軟體中斷

除了鎖之外的辦法

免鎖演算法

經常用於免鎖的生產者/消費者任務的資料結構之一是迴圈緩衝區。在linux核心中就有一個通用的無鎖的環形緩衝實現,具體內容參考

原子變數與位操作

原子操作指的是在執行過程中不會被別的程式碼路徑所中斷的操作。原子變數與位操作都是原子操作。以下是其相關操作介紹。



// 設定原子變數的值  
void atomic_set(atomic_t *v, int i);  // 設定原子變數的值為i  
atomic_t v = ATOMIC_INIT(0);  // 定義原子變數v,並初始化為0  

// 獲取原子變數的值  
atomic_read(atomic_t *v);  // 返回原子變數的值  

// 原子變數加/減  
void atomic_add(int i, atomic_t *v);  // 原子變數加i  
void atomic_sub(int i, atomic_t *v);  // 原子變數減i  

// 原子變數自增/自減  
void atomic_inc(atomic_t *v);  // 原子變數增加1  
void atomic_dec(atomic_t *v);  // 原子變數減少1  

// 操作並測試:對原子變數進行自增、自減和減操作後(沒有加)測試其是否為0,為0則返回true,否則返回false  
int atomic_inc_and_test(atomic_t *v);  
int atomic_dec_and_test(atomic_t *v);  
int atomic_sub_and_test(int i, atomic_t *v);  

// 操作並返回: 對原子變數進行加/減和自增/自減操作,並返回新的值  
int atomic_add_return(int i, atomic_t *v);  
int atomic_sub_return(int i, atomic_t *v);  
int atomic_inc_return(atomic_t *v);  
int atomic_dec_return(atomic_t *v);  
  位原子操作:  
// 設定位  
void set_bit(nr, void *addr);  // 設定addr地址的第nr位,即將位寫1  

// 清除位  
void clear_bit(nr, void *addr);  // 清除addr地址的第nr位,即將位寫0  

// 改變位  
void change_bit(nr, void *addr);  // 對addr地址的第nr位取反  

// 測試位  
test_bit(nr, void *addr); // 返回addr地址的第nr位  

// 測試並操作:等同於執行test_bit(nr, void *addr)後再執行xxx_bit(nr, void *addr)  
int test_and_set_bit(nr, void *addr);  
int test_and_clear_bit(nr, void *addr);  
int test_and_change_bit(nr, void *addr);  

seqlock(順序鎖)

使用seqlock鎖,讀執行單元不會被寫執行單元阻塞,即讀執行單元可以在寫執行單元對被seqlock鎖保護的共享資源進行寫操作時仍然可以繼續讀,而不必等待寫執行單元完成寫操作,寫執行單元也不需要等待所有讀執行單元完成讀操作才去進行寫操作。寫執行單元之間仍是互斥的。若讀操作期間,發生了寫操作,必須重新讀取資料。seqlock鎖必須要求被保護的共享資源不含有指標。

// 獲得順序鎖  
void write_seqlock(seqlock_t *sl);  
int write_tryseqlock(seqlock_t *sl);  
write_seqlock_irqsave(lock, flags)  
write_seqlock_irq(lock)  
write_seqlock_bh()  

// 釋放順序鎖  
void write_sequnlock(seqlock_t *sl);  
write_sequnlock_irqrestore(lock, flags)  
write_sequnlock_irq(lock)  
write_sequnlock_bh()  

// 寫執行單元使用順序鎖的模式如下:  
write_seqlock(&seqlock_a);  
...  // 寫操作程式碼塊  
write_sequnlock(&seqlock_a);  
  讀執行單元操作:  
// 讀開始:返回順序鎖sl當前順序號  
unsigned read_seqbegin(const seqlock_t *sl);  
read_seqbegin_irqsave(lock, flags)  

// 重讀:讀執行單元在訪問完被順序鎖sl保護的共享資源後需要呼叫該函式來檢查,在讀訪問期間是否有寫操作。若有寫操作,重讀  
int read_seqretry(const seqlock_t *sl, unsigned iv);  
read_seqretry_irqrestore(lock, iv, flags)  

// 讀執行單元使用順序鎖的模式如下:  
do{  
    seqnum = read_seqbegin(&seqlock_a);  
    // 讀操作程式碼塊   
    ...  
}while(read_seqretry(&seqlock_a, seqnum));  

讀取-拷貝-更新(RCU)

讀取-拷貝-更新(RCU) 是一個高階的互斥方法,在合適的時候可以取得非常高的效率。RCU可以看作讀寫鎖的高效能版本,相比讀寫鎖,RCU的優點在於既允許多個讀執行單元同時訪問被保護的資料,又允許多個讀執行單元和多個寫執行單元同時訪問被保護的資料。但是RCU不能替代讀寫鎖,因為如果寫比較多時,對讀執行單元的效能提高不能彌補寫執行單元導致的損失。由於平時應用較少,所以不做多說。

小結

以上就是linux驅動程式設計中涉及的併發與競態的內容,下面做一個簡單的小結。

現在的處理器基本上都是SMP型別的,而且在新的核心版本中,基本上都支援搶佔式的操作,在linux中很多程式都是可重入的,要保護這些資料,就得使用不同的鎖機制。而鎖機制的基本操作過程其實大同小異的,宣告變數,上鎖,執行臨界區程式碼,然後再解鎖。不同點在於,可以重入的限制不同,有的可以無限制重入,有的只允許異種操作重入,而有的是不允許重入操作的,有的可以在可睡眠程式碼中使用,有的不可以在可睡眠程式碼中使用。而在考慮不同的鎖機制的使用時,也要考慮CPU處理的效率問題,對於不同的程式碼長度,不同的程式碼執行時間,選擇一個好的鎖對CPU的良好使用有很大的影響,否則將造成浪費。