C++ 無鎖佇列 ABA
實驗環境:vs2013 新建一個無stdafx.h預編譯頭的控制檯程式,然後複製以下程式碼
1、連結串列實現無鎖佇列
2、陣列實現無鎖佇列
1、連結串列
注意: Enqueue函式中有使用new分配記憶體,本人在windows下使用VS2013編譯,這裡的new是執行緒安全的?並且如果換成LINUX或者其它系統的其它編譯器,則可能由於new庫不支援多執行緒而導致new不是執行緒安全的。因此,移植到其它系統時可以把在new前後加鎖或者把new動作放在Enqueue函式外面,函式引數加一個引數即可!!
#include <stdio.h> #include <process.h> #include <windows.h> #include <vector> #include <iostream> using namespace std; #define CAS(a,b,c) (InterlockedCompareExchangePointer((PVOID*)a,(PVOID)c,(PVOID)b) == b) struct Node { void *data; Node *next; }; struct Queue { Node *head; Node *tail; }; //初始化:最開始需要有一個空的節點 void InitQueue(Queue *&queue) { Node *node = new Node; node->next = nullptr; queue = new Queue; queue->head = queue->tail = node; } void UnInitQueue(Queue *queue) { if(queue != nullptr) { Node* node = queue->head; Node* next = nullptr; while(node != nullptr) { next = node->next; delete node; node = next; } delete queue; } } void Enqueue(Queue *queue,void *data) { Node *node, *tail, *next; node = new Node; node->data = data; node->next = nullptr; while (true) { tail = queue->tail; next = tail->next; if (queue->tail != tail) continue; if (next != nullptr) { CAS(&queue->tail, tail, next); continue; } if (CAS(&tail->next, nullptr, node)) break; } CAS(&queue->tail, tail, node); } void* Dequeue(Queue *queue) { Node *head, *tail, *next; void* data = nullptr; while (true) { head = queue->head; tail = queue->tail; next = head->next; if (queue->head != head) continue; if (next == nullptr) return nullptr; if (head == tail) { CAS(&queue->tail,tail,next); continue; } //多消費者,next記憶體可能已經釋放,next->data程式可能崩潰,單消費者,則不可能存在已經釋放的情況 data = next->data; //存在ABA問題 if (CAS(&queue->head, head, next)) break; } delete head; return data; } /////////////////////////////////////測試程式碼/////////////////////////////////////////////////// struct Test { int id; //執行緒索引 int num; //每個執行緒生產元素個數 Queue *queue; HANDLE event; int count;//每個消費者執行緒消費的個數 }; unsigned int __stdcall EnqueueThread(void *data) { Test *test = (Test*)data; Queue *queue = test->queue; int num = test->num; int start = num * (test->id+1); int end = start - num; ::SetEvent(test->event); while (start > end) { int *a = new int; *a = start; Enqueue(queue, a); start--; } return 0; } unsigned int __stdcall DequeueThread(void *data) { Test *test = (Test*)data; Queue *queue = test->queue; int id = test->id; int &count = test->count; ::SetEvent(test->event); int null_num = 0,num = 0; while (true) { int * ptr = (int*)Dequeue(queue); if (ptr == nullptr) null_num++; else { delete ptr;ptr = nullptr; num++; } if (null_num == 5000000) break; } Sleep(100); cout << "該消費者執行緒索引=" << id << ",退出時消費元素個數為 num=" << num << " null_num=" << null_num << endl; count = num; return 0; } void test() { Queue *queue = nullptr; InitQueue(queue); HANDLE hThreadEvent = ::CreateEvent(NULL, 0, 0, NULL); Test test; test.queue = queue; test.event = hThreadEvent; test.num = 10000; //單個生產者執行緒生產的元素數量 test.count = 0; int i = 0; int num1 = 2; //生產執行緒數 int num2 = 3; //消費執行緒數 cout << "共有" << num1 << "個生產者執行緒, " << " 共生產出元素個數為:" << num1*test.num << endl; for (; i < num1; i++) { Test t = test; t.id = i; _beginthreadex(NULL, 0, EnqueueThread, &t, 0, NULL); ::WaitForSingleObject(hThreadEvent, INFINITE); ::ResetEvent(hThreadEvent); } cout << "共有" << num2 << "個消費者執行緒" << endl; vector<Test*> vecTest; for (int j = i, k = 0; j < i + num2; j++, k++) { Test *t = new Test; *t = test; t->id = j; vecTest.push_back(t); _beginthreadex(NULL, 0, DequeueThread, vecTest[k], 0, NULL); ::WaitForSingleObject(hThreadEvent, INFINITE); ::ResetEvent(hThreadEvent); } cout << "稍等..." << endl; Sleep(test.num / 5); int sum = 0; for (int j = 0; j < vecTest.size(); j++) { sum += vecTest[j]->count; } cout << "所有消費者執行緒共消耗的元素為:" << sum << endl; cout << endl; cout << "請檢視生成的元素是否剛好等於消耗的元素!!!如果相等調整null_num值再試試!" << endl; //釋放資源 for (auto it = vecTest.begin(); it != vecTest.end(); it ++) { if (NULL != *it) { delete *it; *it = NULL; } } vecTest.clear(); UnInitQueue(queue); CloseHandle(hThreadEvent); } int main() { //當程式崩潰時,即是遇到了ABA問題,增大i的值,更有機會出現ABA //發生ABA可導致佇列資料丟失等,此時程式不一定崩潰,但在本程式中 //程式崩潰一定是遇到了ABA int num = 1000; for (int i = 0; i < num; i++) { cout << "--------------共"<<num<<"次迴圈---------第【 " << i + 1 << " 】次迴圈.-------------------" << endl; cout << endl; test(); cout << endl; } Sleep(30000); system("pause"); return 0; }
ABA問題:
舉個更生活化的例子:
土豪拿了一個裝滿錢的 Hermes 黑色錢包去酒吧喝酒,將錢包放到吧檯上後,轉頭和旁邊的朋友聊天,小偷趁土豪轉頭之際拿起錢包,將錢包裡的錢取出來並放入餐巾紙保持錢包厚度,然後放回原處,小偷很有職業道德,只偷錢不偷身份證,土豪轉過頭後發現錢包還在,並且還是他熟悉的 Hermes 黑色錢包,厚度也沒什麼變化,所以土豪認為什麼都沒發生,繼續和朋友聊天,直到結賬時發現錢包中的錢已經被調包成餐巾紙。
所以,我覺得 ABA 問題還可以被俗稱為 "調包問題"。那麼怎麼解決 "調包問題" 呢?土豪開始想辦法了。
土豪想的第一個辦法是,找根繩子將錢包綁在手臂上,要開啟錢包就得先把繩子割斷,割繩子就會被發現。這種做法實際上就是
土豪想的另一個辦法是,在錢包上安個顯示屏,每次開啟錢包顯示屏上的數字都會 +1,這樣當土豪在轉頭之前可以先記錄下顯示屏上的數字,在轉過頭後可以確認數字是否有變化,也就知道錢包是否被開啟過。這種做法實際上就是 x86/64 架構中 Double-Word CAS Tagging 所做的工作。
土豪還擔心小偷下次會不會買一個一模一樣的錢包,直接調包整個錢包,這樣連銀行卡和身份證都丟了怎麼辦,土豪決定買一個宇宙獨一無二的錢包,除非把它給燒了,否則就不會再有相同的錢包出現。這種做法實際上就是 Garbage Collection (GC)
經過以上測試,大多數時候都正常,但偶爾會遇到ABA問題,這也是無鎖佇列的難點所在,因此以上程式碼時候問題的。
解決辦法:
1、 初始化的時候分配若干記憶體,即記憶體池,入隊時不需要new,而是從記憶體池中取出一塊記憶體,由於出隊時不會delete掉記憶體,所以不會出現ABA問題,但是記憶體池的大小應該多大?這也是需要考慮的問
題
2、使用Hazard指標,即自己實現一個垃圾回收的機制
3、
使用double-CAS(雙保險的CAS),例如,在32位系統上,我們要檢查64位的內容
1)一次用CAS檢查雙倍長度的值,前半部是指標,後半部分是一個計數器。
2)只有這兩個都一樣,才算通過檢查,要吧賦新的值。並把計數器累加1。
這樣一來,ABA發生時,雖然值一樣,但是計數器就不一樣(但是在32位的系統上,這個計數器會溢位回來又從1開始的,這還是會有ABA的問題)
當然,我們這個佇列的問題就是不想讓那個記憶體重用,這樣明確的業務問題比較好解決,論文《Implementing Lock-Free Queues》給出一這麼一個方法——使用結點記憶體引用計數refcnt!
SafeRead(q)
loop:
p q^:next
if p = NULL then
return p
Fetch&Add(p^:refct; 1)
if p = q^:next then
return p
else
Release(p)
goto loop
end
其中的 Fetch&Add和Release分是是加引用計數和減引用計數,都是原子操作,這樣就可以阻止記憶體被回收了。
更多知識可參考
http://www.cnblogs.com/catch/p/3176636.html
http://www.kuqin.com/algorithm/20120907/330193.html
http://www.lxway.com/4462682141.htm
http://blog.csdn.net/pongba/article/details/589864
http://blog.csdn.net/oygg2008/article/details/4524094
解決ABA問題,程式碼請見下一篇文章 點選開啟連結
2、用陣列實現無鎖佇列
使用陣列來實現佇列是很常見的方法,因為沒有記憶體的分部和釋放,一切都會變得簡單,實現的思路如下:
1)陣列佇列應該是一個ring buffer形式的陣列(環形陣列)
2)陣列的元素應該有三個可能的值:HEAD,TAIL,EMPTY(當然,還有實際的資料)
3)陣列一開始全部初始化成EMPTY,有兩個相鄰的元素要初始化成HEAD和TAIL,這代表空佇列。
4)EnQueue操作。假設資料x要入佇列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明佇列滿了。
5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),並把x返回。同樣需要注意,如果x是TAIL,則說明佇列為空。
演算法的一個關鍵是——如何定位HEAD或TAIL?
1)我們可以宣告兩個計數器,一個用來計數EnQueue的次數,一個用來計數DeQueue的次數。
2)這兩個計算器使用使用Fetch&ADD來進行原子累加,在EnQueue或DeQueue完成的時候累加就好了。
3)累加後求個模什麼的就可以知道TAIL和HEAD的位置了。
如下圖所示: