1. 程式人生 > 其它 >基於無鎖的C#併發佇列實現

基於無鎖的C#併發佇列實現

https://www.cnblogs.com/liaofan/archive/2008/11/20/1337888.html

https://blog.csdn.net/kuangben2000/article/details/105219688

非原創,轉自 以上

最近開始學習無鎖程式設計,和傳統的基於Lock的演算法相比,無鎖程式設計具有其獨特的優點,Angel Lucifer的關於無鎖程式設計一文對此有詳細的描述。

無鎖程式設計的目標是在不使用Lock的前提下保證併發過程中共享資料的一致性,其主要的實現基礎是CAS操作,也就是compare_and_swap,通過處理器提供的指令,可以原子地更新共享資料,並同時監測其他執行緒的干擾,.Net中的對應實現是InterLocked.CompareExchange函式。

既然不使用Lock,那在無鎖程式設計中要時刻注意的是,程式碼可能在任意語句中被中斷。如果是單個變數,我們可以使用 InterLocked.XXX 保證操作的原子性,但是如果有多個操作要完成的話,簡單地組合 InterLocked.XXX 是遠遠不夠的。通常的原則是對函式中用到的共享變數,先在程式碼開始處用區域性變數儲存它的內容,在後面更新共享變數時,使用前述變數來判斷其是否發生了改變,如果共享變數發生了改變,那麼我們可能需要重試,或者在某些可能的情況下,當前執行緒可以"幫助"其他更新中的執行緒完成更新。

從上面可以總結出無鎖演算法的兩個基本特徵:

1. 無鎖演算法總是包含一個迴圈結構,以保證更新失敗後重試

2. 無鎖演算法在更新共享變數時,總是使用CAS和原始值進行比較,以保證沒有衝突

下面是按照Michael-Scott演算法實現的併發佇列,其中的Dequeue演算法在IBM的非阻塞演算法一文中有詳細介紹。程式碼如下:


1publicclassConcurrentLinkedQueue<T>
2{
3privateclassNode<K>
4{
5internalKItem;
6internalNode<K>Next;
7
8publicNode(Kitem,Node<K>next)
9{
10this.Item=item;
11this.Next=next;
12}
13}
14
15privateNode<T>_head;
16privateNode<T>_tail;
17
18publicConcurrentLinkedQueue()
19{
20_head=newNode<T>(default(T),null);
21_tail=_head;
22}
23
24publicboolIsEmpty
25{
26get{return(_head.Next==null);}
27}
28
29publicvoidEnqueue(Titem)
30{
31Node<T>newNode=newNode<T>(item,null);
32while(true)
33{
34Node<T>curTail=_tail;
35Node<T>residue=curTail.Next;
36
37//判斷_tail是否被其他process改變
38if(curTail==_tail)
39{
40//A有其他process執行C成功,_tail應該指向新的節點
41if(residue==null)
42{
43//C如果其他process改變了tail.next節點,需要重新取新的tail節點
44if(Interlocked.CompareExchange<Node<T>>(
45refcurTail.Next,newNode,residue)==residue)
46{
47//D嘗試修改tail
48Interlocked.CompareExchange<Node<T>>(ref_tail,newNode,curTail);
49return;
50}
51}
52else
53{
54//B幫助其他執行緒完成D操作
55Interlocked.CompareExchange<Node<T>>(ref_tail,residue,curTail);
56}
57}
58}
59}
60
61publicboolTryDequeue(outTresult)
62{
63Node<T>curHead;
64Node<T>curTail;
65Node<T>next;
66do
67{
68curHead=_head;
69curTail=_tail;
70next=curHead.Next;
71if(curHead==_head)
72{
73if(next==null)//Queue為空
74{
75result=default(T);
76returnfalse;
77}
78if(curHead==curTail)//Queue處於Enqueue第一個node的過程中
79{
80//嘗試幫助其他Process完成操作
81Interlocked.CompareExchange<Node<T>>(ref_tail,next,curTail);
82}
83else
84{
85//取next.Item必須放到CAS之前
86result=next.Item;
87//如果_head沒有發生改變,則將_head指向next並退出
88if(Interlocked.CompareExchange<Node<T>>(ref_head,
89next,curHead)==curHead)
90break;
91}
92}
93}
94while(true);
95returntrue;
96}
97}
98

根據自己的測試(雙核CPU),在輕度和中度爭用情況下,無鎖演算法比基於鎖的演算法效能好很多,在爭用非常嚴重的情況下(100個併發執行緒以上/每CPU),基於鎖的演算法效能開始顯示出優勢,因為一旦發生爭用,基於鎖的演算法會立刻切換到其他執行緒,而無鎖演算法會進入下一次迴圈,導致CPU的佔用。但是如此嚴重的爭用在實際中並不多見,並且可以採用SpinWait的方法加以改進。基於鎖的演算法在測試中曾經出現過類似死鎖的現象,無鎖演算法則完全沒有出過類似問題,另外,處理器核心越多,基於鎖的演算法效率越差。

從上面的演算法實現中,可以體會到無鎖演算法的優勢:在併發的多個執行緒中,總是有執行緒能夠推進,演算法總能在有限的迴圈次數內完成,並且在某些衝突的情況下,一個執行緒可以“幫助”其他執行緒完成被中斷的工作,這些對提高吞吐量都有很大的作用。