[.net 多執行緒]ConcurrentBag原始碼分析
阿新 • • 發佈:2018-11-15
ConcurrentBag根據操作執行緒,對不同執行緒分配不同的佇列進行資料操作。這樣,每個佇列只有一個執行緒在操作,不會發生併發問題。其內部實現運用了net4.0新加入的ThreadLocal執行緒本地儲存功能。各個佇列間通過連結串列維護。
其內部結構如下:
1、獲取執行緒本地佇列:
1 /// <summary> 2 /// 獲取當前執行緒的佇列 3 /// </summary> 4 /// <param name="forceCreate">如果執行緒沒有持有佇列,是否新建</param> 5 /// <returns></returns>獲取當前執行緒持有的佇列6 private ThreadLocalList<T> GetThreadList(bool forceCreate) 7 { 8 //嘗試獲取執行緒本地佇列列表(參考ThreadLocal),此處的m_locals不同執行緒持有不同例項 9 //如果獲取為空,則說明執行緒是第一次執行此函式,需要分配一個佇列 10 ThreadLocalList<T> unownedList = this.m_locals.Value; 11 if (unownedList != null) 12 { 13 return unownedList;14 } 15 if (forceCreate) 16 { 17 //獲取當前本地佇列鎖,防止在凍結佇列時產生衝突(參考FreezeBag函式) 18 object globalListsLock = this.GlobalListsLock; 19 lock (globalListsLock) 20 { 21 //獲取本地佇列 22 //如果沒有建立過佇列,則建立一個新的佇列;否則儘量分配已有的執行緒終止的佇列 23 if (this.m_headList == null) 24 { 25 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 26 this.m_headList = unownedList; 27 this.m_tailList = unownedList; 28 } 29 else 30 { 31 //獲取無主佇列,不分配新佇列 32 unownedList = this.GetUnownedList(); 33 if (unownedList == null) 34 { 35 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 36 this.m_tailList.m_nextList = unownedList; 37 this.m_tailList = unownedList; 38 } 39 } 40 this.m_locals.Value = unownedList; 41 return unownedList; 42 } 43 } 44 return null; 45 }
2、獲取無主佇列
1 /// <summary> 2 /// 獲取無主佇列 3 /// 如果當前佇列的持有執行緒已經終止,則為無主佇列 4 /// </summary> 5 /// <returns></returns> 6 private ThreadLocalList<T> GetUnownedList() 7 { 8 for (ThreadLocalList<T> list = this.m_headList; list != null; list = list.m_nextList) 9 { 10 if (list.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) 11 { 12 list.m_ownerThread = Thread.CurrentThread; 13 return list; 14 } 15 } 16 return null; 17 }獲取無主佇列
3、插入操作程式碼分析
1 /// <summary> 2 /// 向Bag新增元素 3 /// </summary> 4 /// <param name="item"></param> 5 6 [__DynamicallyInvokable] 7 public void Add(T item) 8 { 9 //獲取當前執行緒持有的佇列 10 ThreadLocalList<T> threadList = this.GetThreadList(true); 11 //向當前持有佇列新增資料 12 this.AddInternal(threadList, item); 13 } 14 15 /// <summary> 16 /// 向佇列新增資料 17 /// </summary> 18 /// <param name="list"></param> 19 /// <param name="item"></param> 20 private void AddInternal(ThreadLocalList<T> list, T item) 21 { 22 bool lockTaken = false; 23 try 24 { 25 //CAS原子操作,設定標誌位,與Steal和Freeze實現互斥 26 Interlocked.Exchange(ref list.m_currentOp, 1); 27 //如果m_needSync,則說明已經發起凍結操作,需要加鎖保證執行緒安全 28 if ((list.Count < 2) || this.m_needSync) 29 { 30 list.m_currentOp = 0; 31 Monitor.Enter(list, ref lockTaken); 32 } 33 list.Add(item, lockTaken); 34 } 35 finally 36 { 37 list.m_currentOp = 0; 38 if (lockTaken) 39 { 40 Monitor.Exit(list); 41 } 42 } 43 }插入操作
4、凍結Bag函式
1 /// <summary> 2 /// 凍結Bag,不能進行增,刪,獲取操作 3 /// </summary> 4 /// <param name="lockTaken"></param> 5 private void FreezeBag(ref bool lockTaken) 6 { 7 //獲取當前執行緒list鎖 8 Monitor.Enter(this.GlobalListsLock, ref lockTaken); 9 //設定同步標誌位,增,刪,獲取操作識別此標誌位,只有獲取鎖才能執行 10 this.m_needSync = true; 11 //獲取所有list的鎖 12 this.AcquireAllLocks(); 13 //等待所有操作執行完成 14 this.WaitAllOperations(); 15 }凍結bag
5、轉化成陣列
1 /// <summary> 2 /// 轉化為陣列 3 /// </summary> 4 /// <returns></returns> 5 [__DynamicallyInvokable] 6 public T[] ToArray() 7 { 8 T[] localArray; 9 //沒有資料返回空陣列 10 if (this.m_headList == null) 11 { 12 return new T[0]; 13 } 14 bool lockTaken = false; 15 try 16 { 17 //凍結bag 18 this.FreezeBag(ref lockTaken); 19 //轉化成List後直接轉成Array 20 localArray = this.ToList().ToArray(); 21 } 22 finally 23 { 24 this.UnfreezeBag(lockTaken); 25 } 26 return localArray; 27 } 28 29 /// <summary> 30 /// 轉化成list 31 /// </summary> 32 /// <returns></returns> 33 private List<T> ToList() 34 { 35 List<T> list = new List<T>(); 36 //獲取所有list,遍歷生成副本 37 for (ThreadLocalList<T> list2 = this.m_headList; list2 != null; list2 = list2.m_nextList) 38 { 39 for (Node<T> node = list2.m_head; node != null; node = node.m_next) 40 { 41 list.Add(node.m_value); 42 } 43 } 44 return list; 45 }轉化成陣列