1. 程式人生 > >無鎖程式設計(CAS)

無鎖程式設計(CAS)

CAS操作

所謂CAS指Compare and Set(或Compare and Swap)。現在幾乎所有CPU指令都支援CAS,如X86的CMPXCHG彙編指令。CAS通常被視為無鎖(lock free)資料結構的基礎。CAS的C語言描述如下:

int compare_and_swap(int* reg,int oldv,int newv){int old_reg_v =*reg;if(old_reg_v == oldv){*reg = newv;}return old_reg_v;}bool compare_and_swap(int* accum,int* dest,int newv){if
(*accum ==*dest){*dest = newv;returntrue;}returnfalse;}

返回bool的好處在於呼叫者可以知道是否成功更新。與CAS類似的其他原子操作如:Fetch and Add(+1操作)、Test-and-set(寫值到指定記憶體位置並返回舊值,彙編指令BST)、Test and Test-and-set。

無鎖佇列的連結串列實現。入佇列:

push(x){
    q =newRecord();
    q->value = x;
    q->next= NULL;do{
        p = tail;/* fetch tail pointer */
}while(!CAS(p->next, NULL, q)); CAS(tail, p, q);/* set tail point to q (the new record) */}

當while迴圈結束時,q已經加入佇列成為最後一個元素,但tail指標還沒有更新,最後只需將tail指向q即可。分析可能存在的競爭:當thread1將q加入佇列,但還未更新tail時,切換到thread2執行,此時thread2獲取了tail的值,但tail->next != NULL,因此CAS失敗,thread2繼續在while迴圈,直到thread1更新tail值後(tail->next == NULL),thread2才能跳出while迴圈。

這裡有一個潛在問題:如果thread1在把q加入佇列後,更新tail前停止運行了,其他執行緒將進入死迴圈。為防止這種異常,可以在do-while迴圈中,直接搜尋tail指標:

push(x){
    q =newRecord();
    q->value = x;
    q->next= NULL;

    oldp = p = tail;do{/* fetch tail pointer directly */while(p->next) 
            p = p->next;}while(!CAS(p->next, NULL, q));/* TODO: bug here! */
    CAS(tail, oldp, q);/* set tail point to q (the new record) */}

出佇列:

pop(){do{
        p = head;if(!p->next){returnfalse;}}while(!CAS(head, p, p->next));return p->next->value;}

注意出佇列的操作物件是header->next而不是head本身,這樣設計的目的是防止連結串列中只有一個元素,head和tail指向同一節點的問題。

ABA問題

ABA問題:

  1. P1讀取指定記憶體的值為A
  2. P1被掛起P2執行
  3. P2將指定記憶體的值從A修改為B,再改回A。
  4. 再次排程到P1
  5. P1發現指定記憶體的值沒有變,於是繼續執行。

lock free演算法中容易出現ABA問題,特別是用指標的實現。下面有一個很好的比喻:

你拿著一個裝滿錢的手提箱在機場,迎面走來一個辣妹,然後她很曖昧地挑逗著你,並趁你不注意時把你的手提箱掉包。辣妹之前就準備了多個跟你同款的手提包,當她拿到你手提包時,把錢掏空,並從手頭所有的皮箱中(包括剛被掏空那個)任選一個掉包給你。如果你在皮箱上留了記號,大部分情況下,還是能覺察到被調包了;然而如果她恰好選擇“物歸原主”,光看皮箱你是無法作出判斷的。

Sutter介紹了什麼叫ordered atomic變數(lock-free-safe):原子性(Atomicity)表示對其他所有讀、寫執行緒而言,當前執行緒的讀寫操作是原子的,要麼成功、要麼失敗,不會處於一箇中間狀態(all-or-nothing)。原子性變數通常是指標(C/C++)或物件引用(Java、.NET)或整型變數,這些變數通常都等同於CPU的一個字長(word)。有序(order)表示讀、寫操作都按原始碼次序進行,編譯器、CPU、快取都會遵循原始碼的次序。所謂CAS,可以這樣理解:

variable.compare_exchange(expect_value, new_value);

如果variable的值等於expect_value,就用new_value替換它。我們常把它放到if中:

if(variable.compare_exchange(x,y)){}

Ordered atomic變數在C++11中就是atomic。如果語言沒有提供Ordered atomic變數,可以自己模擬:使用CPU字長的變數;要滿足有序(order),則用系統提供的API(如Win32的InterlockedCompareExchange)或記憶體屏障(memory fences/barriers;如Linux的mb)。

下面Sutter給了一段程式碼:實現無鎖佇列。這裡假設只有1個生產者和1個消費者。佇列類(連結串列實現):

templateclassLockFreeQueue{private:structNode{Node(T v): value(v),next(nullptr){}
        T value;Node*next;};Node* first;/* for producer only */
    atomic<Node*> divider,last;/* shared */

接下來是構造(構造一個dummy指標)、解構函式(釋放未消費的節點):

public:LockFreeQueue(){/* add dummy seperator */
        first = divider =last=newNode(T());}~LockFreeQueue(){while(first !=nullptr){/* release list */Node* t = first;
            first = t->next;delete t;}}

生產者函式(只被生產者執行緒呼叫):

/* Produce is called by producer thread only */voidProduce(const T& t){last->next=newNode(t);/* append new item; private for producer */last=last->next;/* update last; publish it */while(first != divider){/* trim unused nodes */Node* t = first;
            first = first->next;delete t;}}

生產者函式第1行:last->next = new Node(t); 這裡新元素只有生產者可見,因為還沒更新last指標。

生產者函式第2行:last = last->next; 原子性地更新last指標。

生產者函式接下來是通過迴圈清除沒有用的節點。不必擔心divider在consumer中被修改,因為我們總能讀取到最新的divider值。

消費者函式(只被消費者執行緒呼叫):

/* Consume is called by consumer thread only */boolConsume(T& result){if(divider !=last){
            result = divider->next->value;
            divider = divider->next;returntrue;}returnfalse;}};

再回頭看佇列的成員:first、divider、last。生產者獨佔first,讀divider、寫last;消費者寫divider、讀last,因此它們的型別分別為:Node*、atomic<Node*>、atomic<Node*>。

多執行緒記憶體模型