1. 程式人生 > >C++ 無鎖佇列 ABA

C++ 無鎖佇列 ABA

實驗環境:vs2013  新建一個無stdafx.h預編譯頭的控制檯程式,然後複製以下程式碼

1、連結串列實現無鎖佇列

2、陣列實現無鎖佇列

1、連結串列

注意: Enqueue函式中有使用new分配記憶體,本人在windows下使用VS2013編譯,這裡的new是執行緒安全的?並且如果換成LINUX或者其它系統的其它編譯器,則可能由於new庫不支援多執行緒而導致new不是執行緒安全的。因此,移植到其它系統時可以把在new前後加鎖或者把new動作放在Enqueue函式外面,函式引數加一個引數即可!!

#include <stdio.h>
#include <process.h>
#include <windows.h>
#include <vector>
#include <iostream>
using namespace std;

#define CAS(a,b,c) (InterlockedCompareExchangePointer((PVOID*)a,(PVOID)c,(PVOID)b) == b)

struct Node
{
	void *data;
	Node *next;
};
struct Queue
{
	Node *head;
	Node *tail;
};

//初始化:最開始需要有一個空的節點
void InitQueue(Queue *&queue)
{
	Node *node = new Node;
	node->next = nullptr;
	queue = new Queue;
	queue->head = queue->tail = node;
}

void UnInitQueue(Queue *queue)
{
	if(queue != nullptr)
	{
		Node* node = queue->head;
		Node* next = nullptr;
		while(node != nullptr)
		{
			next = node->next;
			delete node;
			node = next;	
		}
		delete queue;
	}
}

void Enqueue(Queue *queue,void *data)
{
	Node *node, *tail, *next;
	node = new Node;
	node->data = data;
	node->next = nullptr;

	while (true)
	{
		tail = queue->tail;
		next = tail->next;

		if (queue->tail != tail)
			continue;

		if (next != nullptr)
		{
			CAS(&queue->tail, tail, next);
			continue;
		}

		if (CAS(&tail->next, nullptr, node))
			break;
	}
	CAS(&queue->tail, tail, node);
}

void* Dequeue(Queue *queue)
{
	Node *head, *tail, *next;
	void* data = nullptr;

	while (true)
	{
		head = queue->head;
		tail = queue->tail;
		next = head->next;

		if (queue->head != head)
			continue;

		if (next == nullptr)
			return nullptr;

		if (head == tail)
		{
			CAS(&queue->tail,tail,next);
			continue;
		}
		//多消費者,next記憶體可能已經釋放,next->data程式可能崩潰,單消費者,則不可能存在已經釋放的情況
		data = next->data;
		//存在ABA問題
		if (CAS(&queue->head, head, next))
			break;
	}
	delete head;
	return data;
}
/////////////////////////////////////測試程式碼///////////////////////////////////////////////////
struct Test
{
	int id;   //執行緒索引
	int num;  //每個執行緒生產元素個數
	Queue *queue;
	HANDLE event;
	int count;//每個消費者執行緒消費的個數
};
unsigned int __stdcall EnqueueThread(void *data)
{
	Test *test = (Test*)data;
	Queue *queue = test->queue;
	int num = test->num;
	int start = num * (test->id+1);
	int end = start - num;
	::SetEvent(test->event);

	while (start > end)
	{
		int *a = new int;
		*a = start;
		Enqueue(queue, a);
		start--;
	}
	return 0;
}

unsigned int __stdcall DequeueThread(void *data)
{
	Test *test = (Test*)data;
	Queue *queue = test->queue;
	int id = test->id;
	int &count = test->count;
	::SetEvent(test->event);
	int null_num = 0,num = 0;
	while (true)
	{
		int * ptr = (int*)Dequeue(queue);
		if (ptr == nullptr)
			null_num++;
		else
		{
			delete ptr;ptr = nullptr;
			num++;
		}
		if (null_num == 5000000) break;
	}
	Sleep(100);
	cout << "該消費者執行緒索引=" << id << ",退出時消費元素個數為 num=" << num << " null_num=" << null_num << endl;
	count = num;
	return 0;
}

void test()
{
	Queue *queue = nullptr;
	InitQueue(queue);
	HANDLE hThreadEvent = ::CreateEvent(NULL, 0, 0, NULL);

	Test test;
	test.queue = queue;
	test.event = hThreadEvent;
	test.num = 10000;    //單個生產者執行緒生產的元素數量
	test.count = 0;

	int i = 0;
	int num1 = 2;  //生產執行緒數
	int num2 = 3;  //消費執行緒數
	cout << "共有" << num1 << "個生產者執行緒, " << " 共生產出元素個數為:" << num1*test.num << endl;
	for (; i < num1; i++)
	{
		Test t = test;
		t.id = i;
		_beginthreadex(NULL, 0, EnqueueThread, &t, 0, NULL);
		::WaitForSingleObject(hThreadEvent, INFINITE);
		::ResetEvent(hThreadEvent);
	}

	cout << "共有" << num2 << "個消費者執行緒" << endl;
	vector<Test*> vecTest;
	for (int j = i, k = 0; j < i + num2; j++, k++)
	{
		Test *t = new Test;
		*t = test;
		t->id = j;
		vecTest.push_back(t);
		_beginthreadex(NULL, 0, DequeueThread, vecTest[k], 0, NULL);
		::WaitForSingleObject(hThreadEvent, INFINITE);
		::ResetEvent(hThreadEvent);
	}

	cout << "稍等..." << endl;
	Sleep(test.num / 5);
	int sum = 0;
	for (int j = 0; j < vecTest.size(); j++)
	{
		sum += vecTest[j]->count;
	}
	cout << "所有消費者執行緒共消耗的元素為:" << sum << endl;
	cout << endl;
	cout << "請檢視生成的元素是否剛好等於消耗的元素!!!如果相等調整null_num值再試試!" << endl;

	//釋放資源
	for (auto it = vecTest.begin(); it != vecTest.end(); it ++) 
	{
		if (NULL != *it) 
		{
			delete *it; 
			*it = NULL;
		}
	}
	vecTest.clear();
	UnInitQueue(queue);
	CloseHandle(hThreadEvent);
}

int main()
{
	//當程式崩潰時,即是遇到了ABA問題,增大i的值,更有機會出現ABA
	//發生ABA可導致佇列資料丟失等,此時程式不一定崩潰,但在本程式中
	//程式崩潰一定是遇到了ABA
	int num = 1000;
	for (int i = 0; i < num; i++)
	{
		cout << "--------------共"<<num<<"次迴圈---------第【 " << i + 1 << " 】次迴圈.-------------------" << endl;
		cout << endl;
		test();
		cout << endl;
	}


	Sleep(30000);
	system("pause");
	return 0;
}

ABA問題:

舉個更生活化的例子:

土豪拿了一個裝滿錢的 Hermes 黑色錢包去酒吧喝酒,將錢包放到吧檯上後,轉頭和旁邊的朋友聊天,小偷趁土豪轉頭之際拿起錢包,將錢包裡的錢取出來並放入餐巾紙保持錢包厚度,然後放回原處,小偷很有職業道德,只偷錢不偷身份證,土豪轉過頭後發現錢包還在,並且還是他熟悉的 Hermes 黑色錢包,厚度也沒什麼變化,所以土豪認為什麼都沒發生,繼續和朋友聊天,直到結賬時發現錢包中的錢已經被調包成餐巾紙。

所以,我覺得 ABA 問題還可以被俗稱為 "調包問題"。那麼怎麼解決 "調包問題" 呢?土豪開始想辦法了。

土豪想的第一個辦法是,找根繩子將錢包綁在手臂上,要開啟錢包就得先把繩子割斷,割繩子就會被發現。這種做法實際上就是 

Load-Link/Store-Conditional (LL/SC) 架構中所做的工作。

土豪想的另一個辦法是,在錢包上安個顯示屏,每次開啟錢包顯示屏上的數字都會 +1,這樣當土豪在轉頭之前可以先記錄下顯示屏上的數字,在轉過頭後可以確認數字是否有變化,也就知道錢包是否被開啟過。這種做法實際上就是 x86/64 架構中 Double-Word CAS Tagging 所做的工作。

土豪還擔心小偷下次會不會買一個一模一樣的錢包,直接調包整個錢包,這樣連銀行卡和身份證都丟了怎麼辦,土豪決定買一個宇宙獨一無二的錢包,除非把它給燒了,否則就不會再有相同的錢包出現。這種做法實際上就是 Garbage Collection (GC)

 所做的工作。



經過以上測試,大多數時候都正常,但偶爾會遇到ABA問題,這也是無鎖佇列的難點所在,因此以上程式碼時候問題的。

解決辦法:

1、 初始化的時候分配若干記憶體,即記憶體池,入隊時不需要new,而是從記憶體池中取出一塊記憶體,由於出隊時不會delete掉記憶體,所以不會出現ABA問題,但是記憶體池的大小應該多大?這也是需要考慮的問

2、使用Hazard指標,即自己實現一個垃圾回收的機制

3、

使用double-CAS(雙保險的CAS),例如,在32位系統上,我們要檢查64位的內容

1)一次用CAS檢查雙倍長度的值,前半部是指標,後半部分是一個計數器。

2)只有這兩個都一樣,才算通過檢查,要吧賦新的值。並把計數器累加1。

這樣一來,ABA發生時,雖然值一樣,但是計數器就不一樣(但是在32位的系統上,這個計數器會溢位回來又從1開始的,這還是會有ABA的問題)

當然,我們這個佇列的問題就是不想讓那個記憶體重用,這樣明確的業務問題比較好解決,論文《Implementing Lock-Free Queues》給出一這麼一個方法——使用結點記憶體引用計數refcnt

SafeRead(q)
loop:
p q^:next
if p = NULL then
return p
Fetch&Add(p^:refct; 1)
if p = q^:next then
return p
else
Release(p)
goto loop
end


其中的 Fetch&Add和Release分是是加引用計數和減引用計數,都是原子操作,這樣就可以阻止記憶體被回收了。

更多知識可參考

http://www.cnblogs.com/catch/p/3176636.html

http://www.kuqin.com/algorithm/20120907/330193.html

http://www.lxway.com/4462682141.htm

http://blog.csdn.net/pongba/article/details/589864

http://blog.csdn.net/oygg2008/article/details/4524094

解決ABA問題,程式碼請見下一篇文章 點選開啟連結

2、用陣列實現無鎖佇列

使用陣列來實現佇列是很常見的方法,因為沒有記憶體的分部和釋放,一切都會變得簡單,實現的思路如下:

1)陣列佇列應該是一個ring buffer形式的陣列(環形陣列)

2)陣列的元素應該有三個可能的值:HEAD,TAIL,EMPTY(當然,還有實際的資料)

3)陣列一開始全部初始化成EMPTY,有兩個相鄰的元素要初始化成HEAD和TAIL,這代表空佇列。

4)EnQueue操作。假設資料x要入佇列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明佇列滿了。

5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),並把x返回。同樣需要注意,如果x是TAIL,則說明佇列為空。

演算法的一個關鍵是——如何定位HEAD或TAIL?

1)我們可以宣告兩個計數器,一個用來計數EnQueue的次數,一個用來計數DeQueue的次數。

2)這兩個計算器使用使用Fetch&ADD來進行原子累加,在EnQueue或DeQueue完成的時候累加就好了。

3)累加後求個模什麼的就可以知道TAIL和HEAD的位置了。

如下圖所示: