用原子操作實現無鎖程式設計
阿新 • • 發佈:2019-02-18
假設我們要維護一個全域性的執行緒安全的 int 型別變數 count, 下面這兩行程式碼都是很危險的:
count ++;
count += n;
我們知道, 高階語言中的一條語句, 並不是一個原子操作. 比如一個最簡單的自增操作就分為三步:
1. 從快取取到暫存器
2. 在暫存器加1
3. 存入快取。
多個執行緒訪問同一塊記憶體時, 需要加鎖來保證訪問操作是互斥的.
所以, 我們可以在操作 count 的時候加一個互斥鎖. 如下面的程式碼:
pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&count_lock);
count++;
pthread_mutex_unlock(&count_lock);
另一個辦法就是, 讓 count++ 和 count+=n 這樣的語句變成原子操作. 一個原子操作必然是執行緒安全的. 有兩種使用原子操作的方式:
1. 使用 gcc 的原子操作
2. 使用 c++11中STL中的 stomic 類的函式
在這裡我只介紹 gcc 裡的原子操作, 這些函式分成以下幾組: type __sync_fetch_and_add (type *ptr, type value, ...) type __sync_fetch_and_sub (type *ptr, type value, ...) type __sync_fetch_and_or (type *ptr, type value, ...) type __sync_fetch_and_and (type *ptr, type value, ...) type __sync_fetch_and_xor (type *ptr, type value, ...) type __sync_fetch_and_nand (type *ptr, type value, ...) 返回更新前的值 type __sync_add_and_fetch (type *ptr, type value, ...) type __sync_sub_and_fetch (type *ptr, type value, ...) type __sync_or_and_fetch (type *ptr, type value, ...) type __sync_and_and_fetch (type *ptr, type value, ...) type __sync_xor_and_fetch (type *ptr, type value, ...) type __sync_nand_and_fetch (type *ptr, type value, ...) 返回更新後的值 bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
__sync_fetch_and_add(&count, 1);//返回0, count現在等於1, 類似 count ++
count 初始值為0, 或者這麼寫
__sync_add_and_fetch(&count, 1);//返回1, count現在等於1, 類似 ++ count
原子操作也可以用來實現互斥鎖:
int a = 0; #define LOCK(a) while (__sync_lock_test_and_set(&a,1)) {sched_yield();} #define UNLOCK(a) __sync_lock_release(&a); sched_yield()這個函式可以使用另一個級別等於或高於當前執行緒的執行緒先執行。如果沒有符合條件的執行緒,那麼這個函式將會立刻返回然後繼續執行當前執行緒的程式。 如果去掉 sched_yield(), 這個鎖就會一直自旋. 下面我們利用原子操作來實現一個無鎖併發堆疊; struct Node{ void* data; Node* next Node(void* d):data(d),next(NULL){} }; class Stack{ public: Stack():top(NULL){} void Push(void* d); void* Pop(); private: Node *top; }; void Stack::Push(void* d){ Node* n = new Node(d); for (;;){ n->next = top; if (__sync_bool_compare_and_swap(&top, n->next, n)){ break; } } } 壓棧操作首先建立了一個新節點,它的 next 指標指向堆疊的頂部。然後用原子操作把新的節點複製到 top 位置。 從多個執行緒的角度來看,完全可能有兩個或更多執行緒同時試圖把資料壓入堆疊。假設執行緒 A 試圖把 pA 壓入堆疊,執行緒 B 試圖壓入 pB,執行緒 A 先獲得了時間片。在 n->next = top 指令結束之後,排程程式暫停了執行緒 A。現在,執行緒 B 獲得了時間片,它能夠完成原子操作,把 pB 壓入堆疊後結束。接下來,執行緒 A 恢復執行,顯然對於這個執行緒 *top 和 n->next 不匹配,因為執行緒 B 修改了 top 位置的內容。因此,程式碼回到迴圈的開頭,指向正確的 top 指標(執行緒 B 修改後的),呼叫原子操作,把 pA 壓入堆疊後結束。 void* Stack::Pop(){ for (;;){ Node* n = top; if (n == NULL){ return NULL; } if (top != NULL && __sync_bool_compare_and_swap(&top, n, n->next)){ void* p = n->data; delete n; return p; } } }出棧操作的原理和壓棧類似. 即使執行緒 B 線上程 A 試圖彈出資料的同時修改了堆疊頂,也可以確保不會跳過堆疊中的元素。