1. 程式人生 > >深入解析ConcurrentHashMap:感受併發程式設計智慧

深入解析ConcurrentHashMap:感受併發程式設計智慧

> - 如果有一個整型變數count,多個執行緒併發讓count自增1,你會怎麼設計? > - 你知道如何讓多個執行緒協作完成一件事件嗎? ## 前言 很高興遇見你~ ConcurrentHashMap是個老生常談的集合類了,我們都知道多執行緒環境下不能直接使用HashMap,而需要使用ConcurrentHashMap,但有沒有了解過ConcurrentHashMap到底是如何實現執行緒安全的呢?他到底跟傳統的Hashtable和SynchronizeMap(沒聽過SynchronizeMap?他就是Collections.synchronizeMap方法返回的物件)到底好在哪? ConcurrentHashMap建立在HashMap的基礎上實現了執行緒安全,關於HashMap讀者可以參考這篇文章:[深入剖析HashMap](https://juejin.cn/post/6902793228026642446),從散列表的三大要素:雜湊函式、雜湊衝突、擴容方案、以及執行緒安全展開詳解HashMap的設計。關於HashMap的內容本文不再贅述,讀者若對HashMap的底層設計不瞭解,一定要先去閱讀前面的文章。ConcurrentHashMap中蘊含的併發程式設計智慧是非常值得我們學習的,正如文章開頭的兩個問題,你會如何解決呢?可能會直接上鎖或用更高效能的CAS,但ConcurrentHashMap給了我們更不一樣的解決方案。 本文的主要內容是講解ConcurrentHashMap中的併發設計,重點分析ConcurrentHashMap的四個方法原始碼:`putVal`、`initTable`、`addCount`、`transfer`。分析每個方法前會使用圖解介紹ConcurrentHashMap的核心思路。原始碼中我加了非常詳細的註釋,有時間仍建議讀者閱讀完原始碼,ConcurrentHashMap的併發智慧,都蘊含在原始碼中。 那麼我們開始吧~ ## CAS與自旋鎖 CAS是ConcurrentHashMap中的一個重點,也是ConcurrentHashMap提升效能的根基所在。在閱讀原始碼中,可以發現CAS無處不在。在介紹ConcurrentHashMap前,必須先介紹一下這兩個重點。 Java中的運算並不是原子操作,如`count++`可分為: 1. 獲取count副本count_ 2. 對count_進行自增 3. 把count_賦值給count 如果在第一步之後,count被其他的執行緒修改了,第三步的賦值會直接覆蓋掉其他執行緒的修改。synchronize可以解決這個問題,但上鎖為重量級操作,嚴重影響效能,CAS是更好的解決方案。 CAS的思路並不複雜。還是上面的例子:當我們需要對變數count進行自增時,我們可以認為沒有發生併發衝突,先儲存一個count副本,再對count進行自增,然後把副本和count本身進行比較,如果兩者相同,則證明沒有發生併發衝突,修改count的值;如果不同,則說明count在我們自增的過程中被修改了,把上述整個過程重新來一遍,直到修改成功為止,如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/627ded5a97984637b347b6227af62b6e~tplv-k3u1fbpfcp-zoom-1.image) 那,如果我們在判斷count==count_之後,count被修改了怎麼辦?比較賦值的操作作業系統會保證的原子性,保證不會出現這種情況。在java中常見的CAS方法有: ```java // 比較並替換 U.compareAndSwapInt(); U.compareAndSwapLong(); U.compareAndSwapObject(); ``` 在後續的原始碼中,我們會經常看到他們。通過這種思路,我們不需要給count變數上鎖。但如果**併發度過高,處理時間過長,則會導致某些執行緒一直在迴圈自旋,浪費cpu資源**。 自旋鎖是利用CAS而設計的一種應用層面的鎖。如下程式碼: ```java // 0代表鎖釋放,1代表鎖被某個執行緒拿走了 int lock = 0; while(true){ if(lock==0){ int lock_ ; if(U.compareAndSwapInt(this,lock_,0,1)){ ... // 獲取鎖後的邏輯處理 // 最後釋放鎖 lock = 0; break; } } } ``` 上面就是很經典自旋鎖設計。判斷鎖是否被其他執行緒擁有,若沒有則嘗試使用CAS獲得鎖;前兩步失敗都會重新迴圈再次嘗試直到獲得鎖。最後邏輯處理完成要令`lock=0`來釋放鎖。衝突時間短的併發情景下這種方法可以大大提升效率。 CAS和自旋鎖在ConcurrentHashMap應用地非常廣泛,在原始碼中我們會經常看到他們的身影。同時這也是ConcurrentHashMap的設計核心所在。 ## ConcurrentHashMap的併發策略概述 Hashtable與SynchronizeMap採取的併發策略是對整個陣列物件加鎖,導致效能及其低下。jdk1.7之前,ConcurrentHashMap採用的是**鎖分段**策略來優化效能,如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/68acc68c3b5348488820c33775898196~tplv-k3u1fbpfcp-zoom-1.image) 相當於把整個陣列,拆分成多個小陣列。每次操作只需要鎖住操作的小陣列即可,不同的segment之間不互相影響,提高了效能。jdk1.8之後,對整個策略進行了重構:鎖的不是segment,而是節點,如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0f797e7579914354aeb4315d2f2e065e~tplv-k3u1fbpfcp-zoom-1.image) 鎖的粒度進一步被降低,併發的效率也提高了。jdk1.8做得優化不只是細化鎖粒度,還帶來了CAS+synchronize的設計。那麼下面,我們針對ConcurrentHashMap的常見方法:新增、刪除、擴容、初始化等進行詳解他的設計思路。 ## 新增資料:putVal() ConcurrentHashMap新增資料時,採取了CAS+synchronize結合策略。首先會判斷該節點是否為null,如果為null,嘗試使用CAS新增節點;如果新增失敗,說明發生了併發衝突,再對節點進行上鎖並插入資料。在併發較低的情景下無需加鎖,可以顯著提高效能。同時只會CAS嘗試一次,也不會造成執行緒長時間等待浪費cpu時間的情況。 ConcurrentHashMap的put方法**整體**流程如下(並不是全部流程): ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a3cd949ae55948d4bff42f345a108f63~tplv-k3u1fbpfcp-zoom-1.image) 1. 首先會判斷陣列是否已經初始化,如若未初始化,會先去初始化陣列; 2. 如果當前要插入的節點為null,嘗試使用CAS插入資料; 3. 如果不為null,則判斷節點hash值是否為-1;-1表示陣列正在擴容,會先去協助擴容,再回來繼續插入資料。(協助擴容後面會講) 4. 最後會執行上鎖,並插入資料,最後判斷是否需要返回舊值;如果不是覆蓋舊值,需要更新map中的節點數,也就是圖中的addCount方法。 ConcurrentHashMap是基於HashMap改造的,其中的插入資料、hash演算法和HashMap都大同小異,這裡不再贅述。思路清晰之後,下面我們看原始碼分析: ```java final V putVal(K key, V value, boolean onlyIfAbsent) { // 不允許插入空值或空鍵 // 允許value空值會導致get方法返回null時有兩種情況: // 1. 找不到對應的key2. 找到了但是value為null; // 當get方法返回null時無法判斷是哪種情況,在併發環境下containsKey方法已不再可靠, // 需要返回null來表示查詢不到資料。允許key空值需要額外的邏輯處理,佔用了陣列空間,且並沒有多大的實用價值。 // HashMap支援鍵和值為null,但基於以上原因,ConcurrentHashMap是不支援空鍵值。 if (key == null || value == null) throw new NullPointerException(); // 高低位異或擾動hashcode,和HashMap類似 // 但有一點點不同,後面會講,這裡可以簡單認為一樣的就可以 int hash = spread(key.hashCode()); // bincount表示連結串列的節點數 int binCount = 0; // 嘗試多種方法迴圈處理,後續會有很多這種設計 for (Node[] tab = table;;) { Node f; int n, i, fh; // 情況一:如果陣列為空則進行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 情況二:目標下標物件為null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 重點:採用CAS進行插入 if (casTabAt(tab, i, null,new Node(hash, key, value, null))) break; } // 情況三:陣列正在擴容,幫忙遷移資料到新的陣列 // 同時會新陣列,下次迴圈就是插入到新的陣列 // 關於擴容的內容後面再講,這裡理解為正在擴容即可 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 情況四:直接對節點進行加鎖,插入資料 // 下面程式碼很多,但邏輯和HashMap插入資料大同小異 // 因為已經上鎖,不涉及併發安全設計 else { V oldVal = null; // 同步加鎖 synchronized (f) { // 重複檢查一下剛剛獲取的物件有沒有發生變化 if (tabAt(tab, i) == f) { // 連結串列處理情況 if (fh >= 0) { binCount = 1; // 迴圈連結串列 for (Node e = f;; ++binCount) { K ek; // 找到相同的則記錄舊值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 判斷是否需要更新數值 if (!onlyIfAbsent) e.val = value; break; } Node pred = e; // 若未找到則插在連結串列尾 if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } // 紅黑樹處理情況 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } // 判斷是否需要轉化為紅黑樹,和返回舊數值 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 總數+1;這是一個非常硬核的設計 // 這是ConcurrentHashMap設計中的一個重點,後面我們詳細說 addCount(1L, binCount); return null; } // 這個方法和HashMap static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } ``` 我們注意到原始碼中有兩個關鍵方法:初始化陣列的`initTable()`,修改map中節點總數的`addCount`。這兩個方法是如何實現執行緒安全的呢,我們繼續分析。 ## 初始化陣列:initTable() 初始化操作的重點是:**保證多個執行緒併發呼叫此方法,只能有一個執行緒成功**。ConcurrentHashMap採取了CAS+自旋的方法來解決併發問題,**整體**流程如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/553d76a939dc4ab8acbd4b0642b58d4a~tplv-k3u1fbpfcp-zoom-1.image) 1. 首先會判斷陣列是否為null,如果否說明另一個執行緒初始化結束了,直接返回該陣列; 2. 第二步判斷是否正在初始化,如果是會讓出cpu執行時間,當前執行緒自旋等待 3. 如果陣列為null,且沒有另外的執行緒正在初始化,那麼會嘗試獲取自旋鎖,獲取成功則進行初始化,獲取失敗則表示發生了併發衝突,繼續迴圈判斷。 ConcurrentHashMap並沒有直接採用上鎖的方式,而是採用CAS+自旋鎖的方式,提高了效能。自旋鎖保證了只有一個執行緒能真正初始化陣列,同時又無需承擔synchronize的高昂代價,一舉兩得。在看原始碼分析之前,我們先來了解一下ConcurrentHashMap中一個關鍵的變數:**sizeCtl** 。 `sizeCtl`預設為0,在正常情況下,他表示ConcurrentHashMap的閾值,是一個正數。當陣列正在擴容時,他的值為-1,表示當前正在初始化,其他執行緒只需要判斷`sizeCtl==-1` ,就知道當前陣列正在初始化。但當ConcurrentHashMap正在擴容時,sizeCtl是一個表示當前有多少個執行緒正在協助擴容的**負數** ,我們下面講到擴容時再分析。我們直接來看`initTable()`的原始碼分析: ```java private final Node[] initTable() { Node[] tab; int sc; // 這裡的迴圈是採用自旋的方式而不是上鎖來初始化 // 首先會判斷陣列是否為null或長度為0 // 沒有在建構函式中進行初始化,主要是涉及到懶載入的問題 while ((tab = table) == null || tab.length == 0) { // sizeCtl是一個非常關鍵的變數; // 預設為0,-1表示正在初始化,<-1表示有多少個執行緒正在幫助擴容,>0表示閾值 if ((sc = sizeCtl) < 0) Thread.yield(); // 讓出cpu執行時間 // 通過CAS設定sc為-1,表示獲得自選鎖 // 其他執行緒則無法進入初始化,進行自選等待 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 重複檢查是否為空 if ((tab = table) == null || tab.length == 0) { int n = (sc >
0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; table = tab = nt; // 設定sc為閾值,n>>>2表示1/4*n,也就相當於0.75n sc = n - (n >>> 2); } } finally { // 把sc賦值給sizeCtl sizeCtl = sc; } break; } } // 最後返回tab陣列 return tab; } ``` 下面我們繼續看一下`addCount()`方法如何實現併發安全。 ## 修改節點總數:addCount() addCount方法的目標很簡單,就是把ConcurrentHashMap的節點總數進行+1,也就是我在文章開頭提出的問題。ConcurrentHashMap的作者設計了一套非常嚴謹的架構來保證併發安全與高效能。 ConcurrentHashMap並不是一個單獨的size變數,他把size進行拆分,如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b7267df275974d75a550c6faad05b8ad~tplv-k3u1fbpfcp-zoom-1.image) 這樣ConcurrentHashMap的節點數size就等於這些拆分開的size1、size2...的總和。這樣拆分有什麼好處呢?好處就是每個執行緒可以單獨修改對應的變數。如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/770d690951e44c44aaee8a223b05a587~tplv-k3u1fbpfcp-zoom-1.image) 兩個執行緒可以同時進行自增操作,且完全沒有任何的效能消耗,是不是一個非常神奇的思路?而當需要獲取節點總數時,只需要把全部加起來即可。在ConcurrentHashMap中每個size被用一個CounterCell物件包裝著,CounterCell類很簡單: ```java static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } ``` 僅僅只是對value值使用volatile關鍵字進行修飾。不知道volatile關鍵字?可以參考這篇文章[一文搞懂 | Java中volatile關鍵字](https://juejin.cn/post/6894579052577816584),簡單來說就是保證當前執行緒對value的修改其他執行緒馬上可以知道。ConcurrentHashMap使用一個數組來儲存CounterCell,如下: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/88ec562d32ac43cba1007bcd1f25e1d1~tplv-k3u1fbpfcp-zoom-1.image) 那麼每個執行緒如何分配到對應的自己的CounterCell呢?ConcurrentHashMap中採用了類似HashMap的思路,獲取執行緒隨機數,再對這個隨機數進行取模得到對應的CounterCell。獲取到對應的CounterCell之後,當前執行緒會嘗試使用CAS進行修改,如果修改失敗,則重新獲取執行緒隨機數,換一個CounterCell再來一次,直到修改成功。 以上就是addCount方法的核心思路,但原始碼的設計會複雜一點,還必須考慮CounterCell陣列的初始化、CounterCell物件的建立、CounterCell陣列的擴容。ConcurrentHashMap還保留了一個basecount,每個執行緒會首先使用CAS嘗試修改basecount,如果修改失敗,才會下發到counterCell陣列中。整體的流程如下: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d26f6a46b3a24b61a54cb5a0d8f4e6fc~tplv-k3u1fbpfcp-zoom-1.image) 1. 當前執行緒首先會使用CAS修改basecount的值,修改失敗則進入陣列分配CounterCell修改; 2. 判斷CounterCell陣列是否為空, 1. 如果CounterCell陣列為空,則初始化陣列 2. 如果CounterCell陣列不為空,使用執行緒隨機數找到下標 1. 如果該下標的的counterCell物件還沒初始化,則先建立一個CounterCell,這一步在圖中我沒有標出來。建立了CounterCell之後還需要考慮是否需要陣列擴容 2. 如果counterCell物件不為null,使用CAS嘗試修改,失敗則重新來一次 3. 如果上面兩種情況都不滿足,則會回去再嘗試CAS修改一下basecount 看起來好像挺複雜,但只要抓住size變數分割成多個CounterCell這個核心概念即可,其他的步驟都是細節完善。我們可以看到整個思路完全沒有提到synchronize加鎖,ConcurrentHashMap的作者採用CAS+自旋鎖代替了synchronize,這使得在高併發情況下提升了非常大的效能。思路清晰之後,我們看原始碼也就簡單一些了。那接下來就 read the fucking code: ```java private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 如果陣列不為空 或者 陣列為空且直接更新basecount失敗 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 表示沒發生競爭 boolean uncontended = true; // 這裡有以下情況會進入fullAddCount方法: // 1. 陣列為null且直接修改basecount失敗 // 2. hash後的陣列下標CounterCell物件為null // 3. CAS修改CounterCell物件失敗 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 該方法保證完成更新,重點方法!! fullAddCount(x, uncontended); return; } // 如果長度<=1不需要擴容(說實話我覺得這裡有點奇怪) if (check <= 1) return; s = sumCount(); } if (check >= 0) { // 擴容相關邏輯,下面再講 } } ``` 前面原始碼嘗試直接修改basecount失敗後,就會進入fullAddCount方法: ```java private final void fullAddCount(long x, boolean wasUncontended) { int h; // 如果當前執行緒隨機數為0,強制初始化一個執行緒隨機數 // 這個隨機數的作用就類似於hashcode,不過他不需要被查詢 // 下面每次迴圈都重新獲取一個隨機數,不會讓執行緒都堵在同一個地方 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); // wasUncontended表示沒有競爭 // 如果為false表示之前CAS修改CounterCell失敗,需要重新獲取執行緒隨機數 wasUncontended = true; } // 直譯為碰撞,如果他為true,則表示需要進行擴容 boolean collide = false; // 下面分為三種大的情況: // 1. 陣列不為null,對應的子情況為CAS更新CounterCell失敗或者countCell物件為null // 2. 陣列為null,表示之前CAS更新baseCount失敗,需要初始化陣列 // 3. 第二步獲取不到鎖,再次嘗試CAS更新baseCount for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 第一種情況:陣列不為null if ((as = counterCells) != null && (n = as.length) > 0) { // 對應下標的CounterCell為null的情況 if ((a = as[(n - 1) & h]) == null) { // 判斷當前鎖是否被佔用 // cellsBusy是一個自旋鎖,0表示沒被佔用 if (cellsBusy == 0) { // 建立CounterCell物件 CounterCell r = new CounterCell(x); // 嘗試獲取鎖來新增一個新的CounterCell物件 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { CounterCell[] rs; int m, j; // recheck一次是否為null if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; // created=true表示建立成功 created = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 建立成功也就是+1成功,直接返回 if (created) break; // 拿到鎖後發現已經有別的執行緒插入資料了 // 繼續迴圈,重來一次 continue; } } // 到達這裡說明想建立一個物件,但是鎖被佔用 collide = false; } // 之前直接CAS改變CounterCell失敗,重新獲取執行緒隨機數,再迴圈一次 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 嘗試對CounterCell進行CAS else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 如果發生過擴容或者長度已經達到虛擬機器最大可以核心數,直接認為無碰撞 // 因為已經無法再擴容了 // 所以併發執行緒數的理論最高值就是NCPU else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale // 如果上面都是false,說明發生了衝突,需要進行擴容 else if (!collide) collide = true; // 獲取自旋鎖,並進行擴容 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale // 擴大陣列為原來的2倍 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } collide = false; // 繼續迴圈 continue; } // 這一步是重新hash,找下一個CounterCell物件 // 上面每一步失敗都會來到這裡獲取一個新的隨機數 h = ThreadLocalRandom.advanceProbe(h); } // 第二種情況:陣列為null,嘗試獲取鎖來初始化陣列 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // recheck判斷陣列是否為null if (counterCells == as) { // 初始化陣列 CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 如果初始化完成,直接跳出迴圈, // 因為初始化過程中也包括了新建CounterCell物件 if (init) break; } // 第三種情況:陣列為null,但是拿不到鎖,意味著別的執行緒在新建陣列,嘗試直接更新baseCount else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) // 更新成功直接返回 break; } } ``` 原始碼的整體思路跟我們前面講的是差不多的,細節上使用了很多的CAS+自旋鎖來保證執行緒安全。上面的註釋非常詳細,這裡就不再贅述。當初閱讀原始碼看到這裡,不得不佩服ConcurrentHashMap作者,我們可能覺得一個CAS+synchronize就解決了,但是他卻想出了多執行緒同時更新的思路,配合CAS和自旋鎖,在高併發環境下極大提高了效能。 如果說把一個變數拆分成多個子變數,利用多執行緒協作是一個很神奇的思路,那麼多個執行緒同時協作完成擴容操作會不會更加神奇?ConcurrentHashMap不僅避開了併發的效能消耗,甚至利用上了併發的優勢,多個執行緒一起幫忙完成一件事。那接下來就來看看ConcurrentHashMap的擴容方案。 ## 擴容方案:transfer() 在講擴容之前,需要補充兩個知識點:siezeCtl和ForwardingNode。 sizeCtl在前面提到過,預設值為0,一般情況下表示ConcurrentHashMap的閾值,陣列初始化時值為-1,當陣列擴容時,表示為參與擴容的執行緒數。ConcurrentHashMap在擴容時把sizeCtl設定為一個很小的負數,並記住這個負數。執行緒參與擴容,該負數+1,執行緒退出該負數-1,這樣就可以記住執行緒數了。一個變數維護四個狀態,再次佩服ConcurrentHashMap的作者。 那這個負數設定為多少呢?有一個演算法。看擴容時sizeCtl的初始化程式碼: ```java int rs = resizeStamp(n);// 這裡n表示陣列的長度 sizeCtl = rs << RESIZE_STAMP_SHIFT +2 ; // RESIZE_STAMP_SHIFT是一個常量,值為16 static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } ``` 我們一點一點來看這個演算法。 - `Integer.numberOfLeadingZeros(n)`這個方法表示獲取n最高位1前面0的數目,如8的32位二進位制為`00000000 0000000 00000000 00001000`。那麼返回就是28,前面有28個0。 - `RESIZE_STAMP_BITS-1`值為15,`1<>>RESIZE_STAMP_BITS == rs`就可以知道當前是否在擴容了。 然後再來看看`ForwardingNode`。看名字就知道他是一個節點類,他的作用是標記當前節點已經遷移完成。如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/fc02b3aec7c848149872539ef203c704~tplv-k3u1fbpfcp-zoom-1.image) ConcurrentHashMap會從後往前遍歷並遷移,已經遷移完成的節點會被賦值為ForwardingNode,表示該節點下的所有資料已經遷移完成。ForwardingNode和普通的節點相似,但他的hash值為`MOVED`,也就是-1。還記得前面putVal嗎?在插入的時候會判斷當前節點是否是ForwardingNode,如果是則先幫忙遷移;否則如果正在擴容,說明擴容工作還沒到達當前下標,那麼可以直接插入。 瞭解完sizeCtl和ForwardingNode,那麼就來看看ConcurrentHashMap的擴容方案。ConcurrentHashMap的擴容是多個執行緒協同工作的,提高了效率,如下圖: ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/fe487512ad0640a8882d7d86e4beaadc~tplv-k3u1fbpfcp-zoom-1.image) ConcurrentHashMap把整個陣列進行分段,每個執行緒負責一段。bound表示該執行緒範圍的下限,i表示當前正在遷移的下標。每一個遷移完成的節點都會被賦值ForwardingNode,表示遷移完成。stride表示執行緒遷移的“步幅”,當執行緒完成範圍內的任務後,就會繼續往前看看還有沒有需要遷移的,transferIndex就是記錄下個需要遷移的下標;當transferIndex==0時則表示不需要幫忙了。這就是ConcurrentHashMap擴容方案的核心思路了 。保證執行緒安全的思路和前面介紹的方法大同小異,都是通過 CAS+自旋鎖+synchronize來實現的。 另外ConcurrentHashMap遷移連結串列與二叉樹的思路與HashMap略有不同,這裡就不展開講了,瞭解了HashMap看ConcurrentHashMap的原始碼很容易理解他的思路,也是大同小異。擴容方案就不打算畫整體流程圖了,只要瞭解核心思路,其他都是細節的邏輯控制。我們直接來看原始碼分析。 首先要看到addCount方法,這個方法我們前面介紹過他自增的邏輯,但是下半部分擴容的邏輯我們沒有介紹,現在來看一下: ```java private final void addCount(long x, int check) { ... // 總數+1邏輯 // 這部分的邏輯主要是判斷是否需要擴容 // 同時保證只有一個執行緒能夠建立新的陣列 // 其他的執行緒只能輔助遷移資料 if (check >= 0) { Node[] tab, nt; int n, sc; // 當長度達到閾值且長度並未達到最大值時進行下一步擴容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 這個數配合後續的sizeCtr計算 // 他的格式是第16位肯定為1,低15位表示n前面連續的0個數,我們前面介紹過 int rs = resizeStamp(n); // 小於0表示正在擴容或者正在初始化,否則進入下一步搶佔鎖進行建立新陣列 if (sc < 0) { // 如果正在遷移右移16位後一定等於rs // ( sc == rs + 1 ||sc == rs + MAX_RESIZERS)這兩個條件我認為不可能為true // 有興趣可以點選下方網站檢視 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 // nextTable==null說明下個數組還未建立 // transferIndex<=0說明遷移已經夠完成了 // 符合以上情況的重新迴圈自旋 if ((sc >
>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 幫忙遷移,sc+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 搶佔鎖進行擴容 // 對rs檢驗碼進行左移16位再+2,這部分我們在上面介紹過 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 搶佔自旋鎖成功,進行擴容 transfer(tab, null); // 更新節點總數,繼續迴圈 s = sumCount(); } } } ``` 上面的方法重點是利用sizeCtl充當自旋鎖,保證只有一個現場能建立新的陣列,而其他的執行緒只能協助遷移陣列。那麼下面的方法就是擴容方案的重點方法: ```java // 這裡的兩個引數:tab表示舊陣列,nextTab表示新陣列 // 建立新陣列的執行緒nextTab==null,其他的執行緒nextTab等於第一個執行緒建立的陣列 private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; // stride表示每次前進的步幅,最低是16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果新的陣列還未建立,則建立新陣列 // 只有一個執行緒能進行建立陣列 if (nextTab == null) { try { @SuppressWarnings("unchecked") // 擴充套件為原陣列的兩倍 Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) { // 擴容失敗出現OOM,直接把閾值改成最大值 sizeCtl = Integer.MAX_VALUE; return; } // 更改concurrentHashMap的內部變數nextTable nextTable = nextTab; // 遷移的起始值為陣列長度 transferIndex = n; } int nextn = nextTab.length; // 標誌節點,每個遷移完成的陣列下標都會設定為這個節點 ForwardingNode fwd = new ForwardingNode(nextTab); // advance表示當前執行緒是否要前進 // finish表示遷移是否結束 // 官方的註釋表示在賦值為true之前,必須再重新掃描一次確保遷移完成,後面會講到 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // i表示當前執行緒遷移資料的下標,bound表示下限,從後往前遷移 for (int i = 0, bound = 0;;) { Node f; int fh; // 這個迴圈主要是判斷是否需要前進,如果需要則CAS更改下個bound和i while (advance) { int nextIndex, nextBound; // 如果還未到達下限或者已經結束了,advance=false if (--i >= bound || finishing) advance = false; // 每一輪迴圈更新transferIndex的下標 // 如果下一個下標是0,表示已經無需繼續前進 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 利用CAS更改bound和i繼續前進遷移資料 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex >
stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // i已經達到邊界,說明當前執行緒的任務已經完成,無需繼續前進 // 如果是第一個執行緒需要更新table引用 // 協助的執行緒需要將sizeCtl減一再退出 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 如果已經更新完成,則更新table引用 if (finishing) { nextTable = null; table = nextTab; // 同時更新sizeCtl為閾值 sizeCtl = (n << 1) - (n >>> 1); return; } // 執行緒完成自己的遷移任務,將sizeCtl減一 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 這裡sc-2不等於校驗碼,說明此執行緒不是最後一個執行緒,還有其他執行緒正在擴容 // 那麼就直接返回,他任務已經完成了 // 最後一個執行緒需要重新把整個陣列再掃描一次,看看有沒有遺留的 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // finish設定為true表示已經完成 // 這裡把i設定為n,重新把整個陣列掃描一次 finishing = advance = true; i = n; // recheck before commit } } // 如果當前節點為null,表示遷移完成,設定為標誌節點 else if ((f = tabAt(tab, i)) == null) // 這裡的設定有可能會失敗,所以不能直接設定advance為true,需要再迴圈 advance = casTabAt(tab, i, null, fwd); // 當前節點是ForwardingNode,表示遷移完成,繼續前進 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 給頭節點加鎖,進行遷移 // 加鎖後下面的內容就不涉及併發控制細節了,就是純粹的資料遷移 // 思路和HashMap差不多,但也有一些不同,多了一個lastRun // 讀者可以閱讀一下下面原始碼,這部分比較容易理解 synchronized (f) { // 上鎖之後再判斷一次看該節點是否還是原來那個節點 // 如果不是則重新迴圈 if (tabAt(tab, i) == f) { Node ln, hn; // hash值大於等於0表示該節點是普通連結串列節點 if (fh >= 0) { int runBit = fh & n; Node lastRun = f; // ConcurrentHashMap並不是直接把整個連結串列分為兩個 // 而是先把尾部遷移到相同位置的一段先拿出來 // 例如該節點遷移後的位置可能為 1或5 ,而連結串列的情況是: // 1 -> 5 -> 1 -> 5 -> 5 -> 5 // 那麼concurrentHashMap會先把最後的三個5拿出來,lastRun指標指向倒數第三個5 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 判斷尾部整體遷移到哪個位置 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 這個node節點是改造過的 // 相當於使用頭插法插入到連結串列中 // 這裡的頭插法不須擔心連結串列環,因為已經加鎖了 if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } // 連結串列構造完成,把連結串列賦值給陣列 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 設定標誌物件,表示遷移完成 setTabAt(tab, i, fwd); advance = true; } // 樹節點的處理,和連結串列思路相同,不過他沒有lastRun,直接分為兩個連結串列,採用尾插法 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } ``` 擴容是一個相對重量級的操作,他需要建立一個新的陣列再把原來的節點一個個搬過去,在高併發環境下,如果直接對整個表上鎖,會有很多執行緒被阻塞。而ConcurrentHashMap的設計使得多個執行緒可協同完成擴容操作,甚至擴容的同時還可以進行資料的讀取與插入,極大提高了效率。和前面的拆分size變數有異曲同工之妙:**利用多執行緒協同工作來提高效率** 。 關於擴容還有另外一個方法:`helpTransfer`。顧名思義,就是幫忙擴容,在putVal方法中,遇到ForwardingNode物件會呼叫此方法。看完前面的原始碼,這部分的原始碼就簡單多了,no bb,show the code: ```java final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; // 判斷當前節點為ForwardingNode,且已經建立新的陣列 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0表示還在擴容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 校驗是否已經擴容完成或者已經推進到0,則不需要幫忙擴容 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 嘗試讓讓sc+1並幫忙擴容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } // 返回擴容之後的陣列 return nextTab; } // 若陣列尚未初始化或節點非ForwardingNode,返回原陣列 return table; } ``` 到此擴容方案的原始碼就分析完畢了。擴容方案的思路雖然簡單,但是需要有大量的邏輯控制來保證執行緒安全,所以原始碼量也非常多。關於ConcurrentHashMap的核心方法已經都分析完畢了,其他的如`remove`、`replace`等思路都和上面講過的大同小異,讀者可自行閱讀原始碼。 ## 最後 到這裡,關於concurrentHashMap的內容就基本講完了。以後跟面試官吹水,就不只是一句ConcurrentHashMap是安全的就沒有下文了。ConcurrentHashMap優秀的CAS+自旋鎖+synchronize併發設計,是整個框架的重點所在。 看完ConcurrentHashMap的原始碼有什麼用?當然是面試要問啊!《java程式設計思想》中提到,對於併發問題,如果不是專家,老老實實上個鎖,不要整這些花裡胡哨的。從ConcurrentHashMap的原始碼我們可以得知併發的問題,遠遠沒有我們想的那麼簡單,他是一個非常複雜的問題。學習ConcurrentHashMap,也並不是要學他寫一樣的程式碼,除了面試,我想更重要的一點是感受程式設計的智慧。ConcurrentHashMap作者神奇的設計、嚴謹的程式碼,讓我們得以擁有在併發環境下安全且高效能的ConcurrentHashMap可以使用。他的思想是,如果能在實際實踐中運用到一點點,都是莫大的收穫了。 現在,文章開頭的兩個問題,有答案了嗎? 希望文章對你有幫助~ > 全文到此,原創不易,覺得有幫助可以點贊收藏評論轉發。 > 筆者才疏學淺,有任何想法歡迎評論區交流指正。 > 如需轉載請評論區或私信交流。 > > 另外歡迎光臨筆者的個人部落格:[傳送門](https://qwerhuan.gitee.io)