1. 程式人生 > >Linux驅動開發-併發控制

Linux驅動開發-併發控制

1.併發與競態

 併發(Concurrency)是指多個單元同時、並行被執行,而併發執行單元對共享資源(硬體資源和軟體上的全域性變數,靜態變數等)的訪問很容易導致競態(Race Conditions)

競爭狀態的分類:

對稱多處理器(SMP)的多個CPU SMP是一種緊耦合、共享儲存的系統型別,因為多個CPU同時共享系統匯流排,因此可以訪問共同的外設和儲存器
單CPU內程序與搶佔它的程序 linux2.6以後支援核心搶佔排程,一個程序在核心執行的時候可能耗完了自己的時間片,也可能被另一個高優先順序程序打斷,程序和搶佔它的程序訪問共享資源,競態發生
中斷(硬中斷、軟中斷、Tasklet、底半部)與程序之間 中斷打斷程序,中斷打斷中斷(中斷程式訪問程序或另一箇中斷正在訪問的資源,則競態發生)

競態的解決方法是:

保證對共享資源的互斥訪問(即一個執行單元在訪問共享資源時,其他的執行單元被禁止訪問)訪問共享資源的程式碼區域被稱為臨界區(critical sections),臨界區需要被以某種互斥機制加以保護。
Linux常見互斥機制:中斷遮蔽、原子操作、自旋鎖和訊號量、互斥體

2.編譯亂序和執行亂序

程式在執行時記憶體實際的訪問順序和程式程式碼編寫的訪問順序不一定一致,這就是記憶體亂序訪問。記憶體亂序訪問行為出現的理由是為了提升程式執行時的效能。記憶體亂序訪問主要發生在兩個階段:

  1. 編譯時,編譯器優化導致記憶體亂序訪問(指令重排)
  2. 執行時,多 CPU 間互動引起記憶體亂序訪問

防止編譯亂序:

未加屏障 加編譯屏障

// test.cpp

int x, y, r;

void f()

{

x = r;

y = 1;

}

int x, y, r;

void f()

{

x = r;

__asm__ __volatile__("" ::: "memory");

y = 1;

}

  1. g++ -S test.cpp    沒有亂序
  2. g++ -O2 -S test.cpp  使用優化選項變異,亂序(y記憶體訪問在x之前了)
編譯後,對於x的記憶體訪問必定在y賦值之前

執行亂序:(主要表現在多CPU上)

如果是單核CPU,執行程式時碰到依賴點(如f=1;while(f==0);//會等待f=1執行完,再執行while),會等待,因此程式設計師感受不到亂序;

但是,這個依賴點等待對於其他核是不可見的,例如:

CPU0:

while(f==0);//wait
printf(x);

CPU1:

x=42;
f=1;
  1. 在CPU0中,x的列印依賴於while迴圈的結束,但是CPU1並不知道這一依賴;
  2. 因此在CPU1中對於x,f的賦值是亂序的(先賦值x或者先賦值f都是有可能的);
  3. 所以CPU0列印的x資訊並不一定是42!

執行亂序的解決方法:

概括的講:ISB>DSB>DMB

屏障指令 功能 應用 解釋
DMB(Data memory barrier) 資料記憶體屏障:DMB可以繼續執行之後的指令,只要這條指令不是記憶體訪問指令;

core0:write A;DMB;write B

core1:Load B;Load A

寫入A完成後才能寫入B,因此載入B的值正確是,A的值也必然正確
DSB(Data Synchronization Barrier) 資料同步指令:等待DSB之前的所有指令完成(包括指令前的所有快取,跳轉預測,TLB維護操作)
ISB(Instruction Synchronization Barrier) 指令同步屏障:Flush流水線,使指令之後執行的指令都是從快取或記憶體中獲得的

3.原子操作

3.1原子操作原理

要了解原子操作,首先需要了解LDREX,STREX指令,

首先我們看一下LDR和STR的含義:

  • LDR   ---   Load from memory into a register(從記憶體中載入資料,存到暫存器)
  • STR   ---   Store from a register into memory(暫存器中資料儲存到記憶體裡)

字尾EX其實是Exclusive(獨佔的);

LDREX和STREX總結:

  • LDREX將記憶體中的某個資料拷貝到暫存器中,並設定該記憶體地址為獨佔 ;
  • STREX試圖將暫存器資料寫回記憶體,即更新該值,若更新時檢測到 該記憶體為獨佔,則更新成功,並去掉獨佔標誌,否則更新失敗,對一個暫存器寫入1(下面例子中的result);
  • 通過這兩個指令的配合,能檢測程式碼段中該記憶體是否被併發訪問過,如果被訪問過 ,STREX就會失敗!!!

之後我們可以來看原子操作的原始碼:(以atomic_add()atomic_add_return()為例)

//保證原子操作的輸入引數都是atomic結構體,因此可以對原子操作進行計數
typedef struct {
    int counter;
} atomic_t;
#if __LINUX_ARM_ARCH__ >= 6 ----------------------(1)
static inline void atomic_add(int i, atomic_t *v)
{
    unsigned long tmp;
    int result;
//prefetchw : 將counter的值讀入記憶體中
    prefetchw(&v->counter); -------------------------(2)
    __asm__ __volatile__("@ atomic_add\n" ------------------(3)
"1:    ldrex    %0, [%3]\n" --------------------------(4)
"    add    %0, %0, %4\n" --------------------------(5)
"    strex    %1, %0, [%3]\n" -------------------------(6)
"    teq    %1, #0\n" -----------------------------(7)
"    bne    1b"
    : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) ---對應%0,%1,%2
    : "r" (&v->counter), "Ir" (i) -------------對應%3,%4
    : "cc");
}

#else

#ifdef CONFIG_SMP
#error SMP not supported on pre-ARMv6 CPUs
#endif

static inline int atomic_add_return(int i, atomic_t *v)
{
    unsigned long flags;
    int val;

    raw_local_irq_save(flags);
    val = v->counter;
    v->counter = val += i;
    raw_local_irq_restore(flags);

    return val;
}
#define atomic_add(i, v)    (void) atomic_add_return(i, v)

#endif

最終要的是下面的程式碼的含義

 __asm__ __volatile__("@ atomic_add\n"                    //__asm_ _volatile()表示下面的彙編程式碼編譯不要優化;@表示該行是註釋           
"1:    ldrex    %0, [%3]\n"                                                //%3就是"r",%0就是"=&r",從記憶體中讀(&v-counter),並存到另一個暫存器中
"    add    %0, %0, %4\n"                                                //%0暫存器記錄的是v-counter的值,這裡 的操作是暫存器值+1
"    strex    %1, %0, [%3]\n"                                   //將暫存器值寫回記憶體中,即更新記憶體中v->counter值,如果成功%1暫存器值為0,否則為1
"    teq    %1, #0\n"                                                 //比較%1的值是不是0,若不是,跳回第一步重新執行
"    bne    1b"                                                      

備註:

%3就是input operand list中的"r" (&v->counter),r是限制符(constraint),用來告訴編譯器gcc,去選擇一個通用暫存器儲存該運算元吧。%0對應output openrand list中的"=&r" (result),=表示該運算元是write only的,&表示該運算元是一個earlyclobber operand,具體是什麼意思呢?編譯器在處理嵌入式彙編的時候,傾向使用盡可能少的暫存器,如果output operand沒有&修飾的話,彙編指令中的input和output運算元會使用同樣一個暫存器。因此,&確保了%3和%0使用不同的暫存器。

(5)完成步驟(4)後,%0這個output運算元已經被賦值為atomic_t變數的old value,毫無疑問,這裡的操作是要給old value加上i。這裡%4對應"Ir" (i),這裡“I”這個限制符對應ARM平臺,表示這是一個有特定限制的立即數,該數必須是0~255之間的一個整數通過rotation的操作得到的一個32bit的立即數。這是和ARM的data-processing instructions如何解析立即數有關的。每個指令32個bit,其中12個bit被用來表示立即數,其中8個bit是真正的資料,4個bit用來表示如何rotation。更詳細的內容請參考ARM ARM文件。

(6)這一步將修改後的new value儲存在atomic_t變數中。是否能夠正確的操作的狀態標記儲存在%1運算元中,也就是"=&r" (tmp)。

(7)檢查memory update的操作是否正確完成,如果OK,皆大歡喜,如果發生了問題(有其他的核心路徑插入),那麼需要跳轉到lable 1那裡,從新進行一次read-modify-write的操作

原子操作原始碼總結:

ldrex(記憶體讀資料到暫存器)---->atomic.counter操作(+,-,等)------>strex(嘗試寫資料到記憶體中)

但是:如果原子操作過程中,如果發生過併發的訪問,那麼strex會執行失敗 ,跳回ldrex重新執行 !!!!

3.2 整型原子操作和位原子操作

整型原子操作:

Function Name Explain
void atomic_set(atomic *v,int i) 設定原子變數v->counter為1
atomic v=ATOMIC_INIT(0) 定義原子變數v,並初始化v->counter為0
atomic_read(atomic *v) 返回原子變數的值
void atomic_add(int i,atomic *v) 原子變數值加i
void atomic_sub(int i,atomic *v)  原子變數值減i
void atomic_inc(atomic *v) 原子變數值自增
void atomic_dec(atomic *v) 原子變數值自減

int atomic_inc_and_test(atomic *v)

int atomic_dec_and_test(atomic *v)

int atomic_sub_and_test(int i,atomic *v)

操作(自增,自減,減)並返回,測試操作是否為0;

為0返回true

否則返回false

void atomic_add_return(int i,atomic *v)

void atomic_sub_return(int i,atomic *v)

void atomic_inc_return(int i,atomic *v)

void atomic_dec_return(int i,atomic *v)

操作並返回新的值;

先返回測試值,再操作!!!!!

位原子操作:

void set_bit(nr,void *addr) 設定addr地址的第nr位(該位寫1)
void clear_bit(nr,void *addr) 清除addr地址的第nr位(該位寫0)
void change_bit(nr,void *addr)

addr地址的第nr位取反

test_bit(nr,void *addr) 測試addr地址的第nr位,返回值

int test_and_set_bit(nr,void *addr)

int test_and_clear_bit(nr,void *addr)

int test_and_change_bit(nr,void *addr)

測試並操作位,返回測試值

3.3.例子:使用原子變數使裝置只能被一個程序開啟

static atomic_t xxx_available=ATOMIC_INIT(1); //定義原子變數xxx_available,初值為1
static int xxx_open(struct inode *inode,struct file *filep)
{
....
    if(!atomic_dec_and_test(&xxx_available))
    {
        atomic_inc(&xxx_available);
        return -EBUSY;                        //已經開啟
    }
....
    return 0;
}
static int xxx_release(struct inode *inode,struct file *filep)
{
    atomic_inc(&xxx_available);            //釋放裝置
    return 0;
}

假設裝置未開啟,則xxx_available=1;

開啟裝置,執行xxx_open,執行atomic_dec_and_test(),先返回值1,再執行xxx_avaiable--;此時xxx_available=0,裝置開啟;

另一個程序也想開啟裝置(二次開啟),執行xxx_open,執行atomic_dec_and_test(),因為裝置已經開啟,返回xxx_available值0,在讓xxx_available=-1,if條件滿足,執行xxx_available++,返回EBUSY;

釋放裝置,xxx_available=0+1;

4.自旋鎖