1. 程式人生 > >[一起讀原始碼]走進C#併發佇列ConcurrentQueue的內部世界

[一起讀原始碼]走進C#併發佇列ConcurrentQueue的內部世界

決定從這篇文章開始,開一個讀原始碼系列,不限制平臺語言或工具,任何自己感興趣的都會寫。前幾天碰到一個小問題又讀了一遍ConcurrentQueue的原始碼,那就拿C#中比較常用的併發佇列ConcurrentQueue作為開篇來聊一聊它的實現原理。 話不多說,直奔主題。 > 要提前說明下的是,本文解析的原始碼是基於.NET Framework 4.8版本,地址是:https://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs 本來是打算用.NET Core版本的,但是找了一下竟然沒找到:https://github.com/dotnet/runtime/tree/master/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent 不知道是我找錯位置了還是咋回事,有知道的大佬告知一下。不過我覺得實現原理應該類似吧,後面找到了我對比一下,不同的話再寫一篇來分析。
### 帶著問題出發 如果是自己實現一個簡單的佇列功能,我們該如何設計它的儲存結構呢?一般來說有這兩種方式:陣列或者連結串列,先來簡單分析下。 我們都知道,陣列是固定空間的集合,意味著初始化的時候要指定陣列大小,但是佇列的長度是隨時變化的,超出陣列大小了怎麼辦?這時候就必須要對陣列進行擴容。問題又來了,擴容要擴多少呢,少了不夠用多了浪費記憶體空間。與之相反的,連結串列是動態空間型別的資料結構,元素之間通過指標相連,不需要提前分配空間,需要多少分配多少。但隨之而來的問題是,大量的出隊入隊操作伴隨著大量物件的建立銷燬,GC的壓力又變得非常大。 事實上,在C#的普通佇列`Queue`型別中選擇使用陣列進行實現,它實現了一套擴容機制,這裡不再詳細描述,有興趣的直接看[原始碼](https://referencesource.microsoft.com/#System/compmod/system/collections/generic/queue.cs),比較簡單。 回到主題,要實現一個高效能的執行緒安全佇列,我們試著回答以下問題: - 儲存結構是怎樣的 - 如何初始化(初始容量給多少比較好?) - 常用操作(入隊出隊)如何實現 - 執行緒安全是如何保證的
### 儲存結構 通過原始碼可以看到`ConcurrentQueue`採用了陣列+連結串列的組合模式,充分吸收了2種結構的優點。 具體來說,它的總體結構是一個連結串列,連結串列的每個節點是一個包含陣列的特殊物件,我們稱之為**Segment**(段或節,原話是`a queue is a linked list of small arrays, each node is called a segment.`),它裡面的陣列是儲存真實資料的地方,容量固定大小是32,每一個Segment有指向下一個Segment的的指標,以此形成連結串列結構。而佇列中維護了2個特殊的指標,他們分別指向佇列的首段(head segment)和尾段(tail segment),他們對入隊和出隊有著重要的作用。用一張圖來解釋佇列的內部結構: ![construction](https://imgkr.cn-bj.ufileos.com/f5a591f9-d964-4add-8be4-9df15ec663e0.png) >
嗯,畫圖畫到這裡突然聯想到,搞成雙向連結串列的話是不是就神似B+樹的葉子節點?技術就是這麼奇妙~ 段的核心定義為: ```c# /// /// private class for ConcurrentQueue. /// 連結串列節點(段) /// private class Segment { //實際儲存資料的容器 internal volatile T[] m_array; //儲存對應位置資料的狀態,當資料的對應狀態位標記為true時該資料才是有效的 internal volatile VolatileBool[] m_state; //下一段的指標 private volatile Segment m_next; //當前段在佇列中的索引 internal readonly long m_index; //兩個位置指標 private volatile int m_low; private volatile int m_high; //所屬的佇列例項 private volatile ConcurrentQueue m_source; } ``` 佇列的核心定義為: ```c# /// /// 執行緒安全的先進先出集合, /// public class ConcurrentQueue : IProducerConsumerCollection, IReadOnlyCollection { //首段 [NonSerialized] private volatile Segment m_head; //尾段 [NonSerialized] private volatile Segment m_tail; //每一段的大小 private const int SEGMENT_SIZE = 32; //擷取快照的運算元量 [NonSerialized] internal volatile int m_numSnapshotTakers = 0; } ```
### 常規操作 先從初始化一個佇列開始看起。 #### 建立佇列例項 與普通`Queue`不同的是,`ConcurrentQueue`不再支援初始化時指定佇列大小(capacity),僅僅提供一個無參建構函式和一個`IEnumerable`引數的建構函式。 ```c# /// /// Initializes a new instance of the class. /// public ConcurrentQueue() { m_head = m_tail = new Segment(0, this); } ``` 無參建構函式很簡單,建立了一個Segment例項並把首尾指標都指向它,此時佇列只包含一個Segment,它的索引是0,佇列容量是32。 繼續看一下Segment是如何被初始化的: ```c# /// /// Create and initialize a segment with the specified index. /// internal Segment(long index, ConcurrentQueue source) { m_array = new T[SEGMENT_SIZE]; m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false m_high = -1; Contract.Assert(index >= 0); m_index = index; m_source = source; } ``` Segment只提供了一個建構函式,接受的引數分別是佇列索引和佇列例項,它建立了一個長度為32的陣列,並建立了與之對應的狀態陣列,然後初始化了位置指標(m_low=0,m_high=-1,此時表示一個空的Segment)。 到這裡,一個併發佇列就建立好了。 > 使用集合建立佇列的過程和上面類似,只是多了兩個步驟:入隊和擴容,下面會重點描述這兩部分所以這裡不再過多介紹。
#### 元素入隊 先亮出原始碼: ```c# /// /// Adds an object to the end of the . /// /// The object to add to the end of the . The value can be a null reference /// (Nothing in Visual Basic) for reference types. /// public void Enqueue(T item) { SpinWait spin = new SpinWait(); while (true) { Segment tail = m_tail; if (tail.TryAppend(item)) return; spin.SpinOnce(); } } ``` 通過原始碼可以看到,入隊操作是在隊尾(m_tail)進行的,它嘗試在最後一個Segment中追加指定的元素,如果成功了就直接返回,失敗的話就自旋等待,直到成功為止。那什麼情況下會失敗呢?這就要繼續看看是如何追加元素的: ```c# internal bool TryAppend(T value) { //先判斷一下高位指標有沒有達到陣列邊界(也就是陣列是否裝滿了) if (m_high >= SEGMENT_SIZE - 1) { return false; } int newhigh = SEGMENT_SIZE; try { } finally { //使用原子操作讓高位指標加1 newhigh = Interlocked.Increment(ref m_high); //如果陣列還有空位 if (newhigh <= SEGMENT_SIZE - 1) { //把資料放到陣列中,同時更新狀態 m_array[newhigh] = value; m_state[newhigh].m_value = true; } //陣列滿了要觸發擴容 if (newhigh == SEGMENT_SIZE - 1) { Grow(); } } return newhigh <= SEGMENT_SIZE - 1; } ``` **所以,只有當尾段m_tail裝滿的情況下追加元素才會失敗,這時候必須要等待下一個段產生,也就是擴容(細細品一下Grow這個詞真的很妙),自旋就是在等擴容完成才能有地方放資料。而在儲存資料的時候,通過原子自增操作保證了同一個位置只會有一個數據被寫入,從而實現了執行緒安全。** > 注意:這裡的裝滿並不是指陣列每個位置都有資料,而是指最後一個位置已被使用。 繼續看一下擴容是怎麼一個過程: ```c# /// /// Create a new segment and append to the current one /// Update the m_tail pointer /// This method is called when there is no contention /// internal void Grow() { //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow m_next = newSegment; Contract.Assert(m_source.m_tail == this); m_source.m_tail = m_next; } ``` **在普通佇列中,擴容是通過建立一個更大的陣列然後把資料拷貝過去實現擴容的,這個操作比較耗時。而在併發佇列中就非常簡單了,首先建立一個新Segment,然後把當前Segment的next指向它,最後掛到佇列的末尾去就可以了,全部是指標操作非常高效。**而且從程式碼註釋中可以看到,這裡不會出現執行緒競爭的情況,因為其他執行緒都因為位置不夠被阻塞都在自旋等待中。
#### 元素出隊 還是先亮出原始碼: ```c# public bool TryDequeue(out T result) { while (!IsEmpty) { Segment head = m_head; if (head.TryRemove(out result)) return true; //since method IsEmpty spins, we don't need to spin in the while loop } result = default(T); return false; } ``` 可以看到只有在佇列不為空(IsEmpty==false)的情況下才會嘗試出隊操作,而出隊是在首段上進行操作的。關於如何判斷佇列是否為空總結就一句話:**當首段m_head不包含任何資料且沒有下一段的時候佇列才為空**,詳細的判斷過程原始碼註釋中寫的很清楚,限於篇幅不詳細介紹。 出隊的本質是從首段中移除低位指標所指向的元素,看一下具體實現步驟: ```c# internal bool TryRemove(out T result) { SpinWait spin = new SpinWait(); int lowLocal = Low, highLocal = High; //判斷當前段是否為空 while (lowLocal <= highLocal) { //判斷低位指標位置是否可以移除 if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) { SpinWait spinLocal = new SpinWait(); //判斷元素是否有效 while (!m_state[lowLocal].m_value) { spinLocal.SpinOnce(); } //取出元素 result = m_array[lowLocal]; //釋放引用關係 if (m_source.m_numSnapshotTakers <= 0) { m_array[lowLocal] = default(T); } //判斷當前段的元素是否全部被移除了,要丟棄它 if (lowLocal + 1 >= SEGMENT_SIZE) { spinLocal = new SpinWait(); while (m_next == null) { spinLocal.SpinOnce(); } Contract.Assert(m_source.m_head == this); m_source.m_head = m_next; } return true; } else { //執行緒競爭失敗,自旋等待並重置 spin.SpinOnce(); lowLocal = Low; highLocal = High; } }//end of while result = default(T); return false; } ``` **首先,只有當前Segment不為空的情況下才嘗試移除元素,否則就直接返回false。然後通過一個原子操作`Interlocked.CompareExchange`判斷當前低位指標上是否有其他執行緒同時也在移除,如果有那就進入自旋等待,沒有的話就從這個位置取出元素並把低位指標往前推進一位。如果當前佇列沒有正在進行擷取快照的操作,那取出元素後還要把這個位置給釋放掉。當這個Segment的所有元素都被移除掉了,這時候要把它丟棄,簡單來說就是讓佇列的首段指標指向它的下一段即可,丟棄的這一段等著GC來收拾它。** 這裡稍微提一下Interlocked.CompareExchange,它的意思是比較和交換,也就是更為大家所熟悉的CAS(Compare-and-Swap),它主要做了以下2件事情: - 比較m_low和lowLocal的值是否相等 - 如果相等則m_low=lowLocal+1,如果不相等就什麼都不做,不管是否相等,始終返回m_low的原始值 整個操作是原子性的,對CPU而言就是一條指令,這樣就可以保證當前位置只有一個執行緒執行出隊操作。 > 還有一個`TryPeek()`方法和出隊類似,它是從隊首獲取一個元素但是無需移除該元素,可以看做Dequeue的簡化版,不再詳細介紹。
#### 獲取佇列中元素的數量 與普通`Queue`不同的是,`ConcurrentQueue`並沒有維護一個表示佇列中元素個數的計數器,那就意味著要得到這個數量必須實時去計算。我們看一下計算過程: ```c# public int Count { get { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { return tailHigh - headLow + 1; } int count = SEGMENT_SIZE - headLow; count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1)); count += tailHigh + 1; return count; } } ``` 大致思路是,先計算(GetHeadTailPositions)出首段的低位指標和尾段的高位指標,這中間的總長度就是我們要的數量,然後分成3節依次累加每一個Segment包含的元素個數得到最終的佇列長度,可以看到這是一個開銷比較大的操作。 **正因為如此,微軟官方推薦使用`IsEmpty`屬性來判斷佇列是否為空,而不是使用佇列長度`Count==0`來判斷,使用`ConcurrentStack`也是一樣。**
#### 擷取快照(take snapshot) 所謂的take snapshot就是指一些格式轉換的操作,例如`ToArray()`、`ToList()`、`GetEnumerator()`這種型別的方法。在前面佇列的核心定義中我們提到有一個`m_numSnapshotTakers`欄位,這時候就派上用場了。下面以比較典型的`ToList()`原始碼舉例說明: ```c# private List ToList() { // Increments the number of active snapshot takers. This increment must happen before the snapshot is // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. Interlocked.Increment(ref m_numSnapshotTakers); List list = new List(); try { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { head.AddToList(list, headLow, tailHigh); } else { head.AddToList(list, headLow, SEGMENT_SIZE - 1); Segment curr = head.Next; while (curr != tail) { curr.AddToList(list, 0, SEGMENT_SIZE - 1); curr = curr.Next; } tail.AddToList(list, 0, tailHigh); } } finally { // This Decrement must happen after copying is over. Interlocked.Decrement(ref m_numSnapshotTakers); } return list; } ``` 可以看到,ToList的邏輯和Count非常相似,都是先計算出兩個首尾位置指標,然後把佇列分為3節依次遍歷處理,最大的不同之處在於方法的開頭和結尾分別對`m_numSnapshotTakers`做了一個原子操作。 在方法的第一行,使用`Interlocked.Increment`做了一次遞增,這時候表示佇列正在進行一次擷取快照操作,在處理完後又在finally中用`Interlocked.Decrement`做了一次遞減表示當前操作已完成,這樣確保了在進行快照時不被出隊影響。感覺這塊很難描述的特別好,所以保留了原始的英文註釋,大家慢慢體會。 到這裡,基本把ConcurrentQueue的核心說清楚了。
### 總結一下 回到文章開頭提出的幾個問題,現在應該有了很清晰的答案: - 儲存結構 -- 採用陣列和連結串列的組合形式 - 如何初始化 -- 建立固定大小的段,無需指定初始容量 - 常用操作如何實現 -- 尾段入隊,首段出隊 - 執行緒安全問題 -- 使用SpinWait自旋等待和原子操作實現 以上所述均是個人理解,如果有錯誤的地方還請不吝指正,以免誤導他人。 推薦相關閱讀,篇篇都是乾貨:https://www.cnblogs.com/lucifer1982/category/126755.html