ConcurrentHashMap原始碼解析 JDK8
一、簡介
上篇文章詳細介紹了HashMap的原始碼及原理,本文趁熱打鐵繼續分析ConcurrentHashMap的原理。
首先在看本文之前,希望對HashMap有一個詳細的瞭解。不然看直接看ConcurrentHashMap的原始碼還是有些費勁的。
相信對HashMap,HashTable有一定了解,應該知道HashMap是不具備執行緒安全性的,在resize時會丟資料(JDK8),而HashTable雖然保證了執行緒安全性,但是其是通過給每個方法加Synchronized關鍵字達到的同步目的。但是都知道Synchronized在競爭激烈的多執行緒併發環境中,在效能上的表現是非常不如人意的。那在高併發環境中HashMap如何保證執行緒安全而又不浪費太多效能呢?答案就是Java J.U.C併發包中的ConcurrentHashMap。
依然開局一張圖。JDK8中的ConcurrentHashMap資料結構。
呃呵,和HashMap的結構是一樣的,沒錯在資料結構層面,ConcurrentHashMap和HashMap是完全一樣的。有了這個基礎繼續往下看。
二、歷史版本
ConcurrentHashMap的歷史版本大致分界線在JDK8。也就是可以分為JDK8和JDK8以前版本。
資料結構的區別
在JDK8之前HashMap沒有引入紅黑樹,同樣的ConcurrentHashMap也沒有引入紅黑樹。而且ConcurrentHashMap採用的是分段陣列的底層資料結構。
在JDK7中的資料結構。
從上圖我們不難看出其在資料結構方面的差別。
鎖的區別
JDK7中為了提高併發效能採用了這種分段的設計。所以在JDK7中ConcurrentHashMap採用的是分段鎖,也就是在每個Segment上加ReentrantLock實現的執行緒安全線。關於ReetrantLock後面有時間會介紹,大致來說ReetrantLoack是比Synchronized更細粒度的一種鎖。使用得當的話其效能要比Synchronized表現要好,但是如果實現不得當容易造成死鎖。
這種基於Segment和ReetrantLock的設計相對HashTable來說大大提高了併發效能。也就是說多個執行緒可以併發的操作多個Segment,而HashTable是通過給每個方法加Synchronized即將多執行緒序列而實現的。所以在一定程度上提高了併發效能。但是這種效能的提升表現相對JDK8來說顯得不值一提。
如果說JDK7 ConcurrentHashMap相對HashTable來說是序列到多個執行緒併發的改進。而JDK8則是通過比Segment更細粒度的併發控制大大提高了其並發表現。
JDK8中ConcurrentHashMap採用的是CAS+Synchronized鎖並且鎖粒度是每一個桶。簡單來說JDK7中鎖的粒度是Segment,JDK8鎖粒度細化到了桶級別。可想而知鎖粒度是大大提到了。輔之以程式碼的優化,JDK8中的ConcurrentHashMap在效能上的表現非常優秀。
簡單總結一下,從HashTable到JDK7 ConcurrentHashMap再到JDK8 ConcurrentHashMap。是從同步到併發再到高併發的進步。
三、基礎知識
3.1、常量
//正在擴容,對應fwd型別的節點的hash static final int MOVED = -1; // hash for forwarding nodes //當前陣列 transient volatile Node<K,V>[] table; //擴容時用到的,擴容後的陣列。 private transient volatile Node<K,V>[] nextTable; //1,大於零,表示size * 0.75。 //2,等於-1,表示正在初始化。 //3,-(n + 1),表示正在執行擴容的執行緒其只表示基數,而不是真正的數量,需要計算得出的哦 private transient volatile int sizeCtl;
3.2、Unsafe類方法
1 @SuppressWarnings("unchecked") //transient volatile Node<K,V>[] table; tab變數確實是volatile 2 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//獲取table中索引 i 處的元素。 3 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//如果tab是volatile變數,則該方法保證其可見性。 4 } 5 6 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//通過CAS設定table索引為 i 處的元素。 7 Node<K,V> c, Node<K,V> v) { 8 return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); 9 } 10 //transient volatile Node<K,V>[] table; tab變數確實是volatile 11 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 處的元素。 12 U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//如果tab是volatile變數,則該方法保證其可見性。 13 }
我們不難看出 以上三個方法都是呼叫的Unsafe(U)類中的方法,Unsafe類中定義了大量對記憶體的操作方法,是native的,不建議開發者直接使用。
tabAt和setTabAt最終呼叫的兩個方法分別是 U.getObjectVolatile()和U.putObjectVolatile 顧名思義其是通過volatile保證的tab的可見性(Volatile只保證可見性不保證原子性哦)。前提是tab變數是Volatile修飾的變數。我們通過呼叫棧,最紅可以看到其實tab就是ConcurrentHashMap中的table。而這個變數是這麼定義的。
transient volatile Node<K,V>[] table;
可見其確實是Volatile修飾的變數。
再看
casTabAt方法,這個就是CAS方法了。
CAS:Compare and Swap三個單詞的縮寫,即:比較交換的意思。CAS在Java中又稱之為樂觀鎖即我們總認為是沒有鎖的。
while(true){ CAS(); }
一般的通過上述用法達到自旋的目的。CAS一般通過自旋達到自旋鎖的目的,即認為沒有鎖,失敗重試,這種思路。更多內容請自行百度。CAS很重要哦。
四、put過程原始碼
1 public V put(K key, V value) { 2 return putVal(key, value, false); 3 } 4 5 /** Implementation for put and putIfAbsent */ 6 final V putVal(K key, V value, boolean onlyIfAbsent) { 7 if (key == null || value == null) throw new NullPointerException(); 8 int hash = spread(key.hashCode());//hash,對hashcode再雜湊 9 int binCount = 0; 10 for (Node<K,V>[] tab = table;;) {//迭代桶陣列,自旋 11 Node<K,V> f; int n, i, fh; 12 if (tab == null || (n = tab.length) == 0)//懶載入。如果為空,則進行初始化 13 tab = initTable();//初始化桶陣列 14 //(n - 1) & hash)計算下標,取值,為空即無hash碰撞 15 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 16 if (casTabAt(tab, i, null, 17 new Node<K,V>(hash, key, value, null)))//通過cas插入新值 18 break; // no lock when adding to empty bin 19 } 20 //判斷是否正在擴容。如果正在擴容,當前執行緒幫助進行擴容。 21 //每個執行緒只能同時負責一個桶上的資料遷移,並且不影響其它桶的put和get操作。 22 //(很牛逼的思路,能這麼做建立在更細粒度的鎖基礎上) 23 else if ((fh = f.hash) == MOVED) 24 tab = helpTransfer(tab, f); 25 else {//put5,存在hash碰撞 26 V oldVal = null; 27 //此處,f在上面已經被賦值,f為當前下標桶的首元素。對連結串列來說是連結串列頭對紅黑樹來說是紅黑樹的頭元素。 28 synchronized (f) { 29 //再次檢查當前節點是否有變化,有變化進入下一輪自旋 30 //為什麼再次檢查?因為不能保證,當前執行緒到這裡,有沒有其他執行緒對該節點進行修改 31 if (tabAt(tab, i) == f) { 32 if (fh >= 0) {//當前桶為連結串列 33 binCount = 1; 34 for (Node<K,V> e = f;; ++binCount) {//迭代連結串列節點 35 K ek; 36 if (e.hash == hash &&//key相同,覆蓋(onlyIfAbsent有什麼用?) 37 ((ek = e.key) == key || 38 (ek != null && key.equals(ek)))) { 39 oldVal = e.val; 40 if (!onlyIfAbsent) 41 e.val = value; 42 break; 43 } 44 Node<K,V> pred = e; 45 //找到連結串列尾部,插入新節點。(什麼這裡不用CAS?因為這在同步程式碼塊裡面) 46 if ((e = e.next) == null) { 47 pred.next = new Node<K,V>(hash, key, 48 value, null); 49 break; 50 } 51 } 52 } 53 else if (f instanceof TreeBin) {//當前桶為紅黑樹 54 Node<K,V> p; 55 binCount = 2; 56 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 57 value)) != null) {//想紅黑樹插入新節點 58 oldVal = p.val; 59 if (!onlyIfAbsent) 60 p.val = value; 61 } 62 } 63 } 64 } 65 if (binCount != 0) { 66 //樹化。binCount > 8,進行樹化,連結串列轉紅黑樹 67 if (binCount >= TREEIFY_THRESHOLD) 68 //如果容量 < 64則直接進行擴容;不轉紅黑樹。 69 //(你想想,假如容量為16,你就插入了9個元素,巧了,都在同一個桶裡面, 70 //如果這時進行樹化,時間複雜度會增加,效能下降,不如直接進行擴容,空間換時間) 71 treeifyBin(tab, i); 72 if (oldVal != null) 73 return oldVal; 74 break; 75 } 76 } 77 } 78 addCount(1L, binCount);//擴容。addCount內部會進行判斷要不要擴容 79 return null; 80 }
總結以上過程
1,懶載入,未初始化則初始化table 2,hash,hashcode再雜湊,並計算下標 3,無碰撞,通過CAS插入 4,有碰撞 4.1、如果正在擴容,協助其它執行緒去擴容 4.2、如果是連結串列,插入連結串列 4.3、如果是紅黑樹,插入紅黑樹 4.4、如果連結串列長度超過8,樹化 4.5,如果key已經存在,覆蓋舊值 5,需要擴容,則擴容
相比HashMap過程多了一個協助擴容。
以上原始碼需要注意的是
1 for (Node<K,V>[] tab = table;;) {//迭代桶陣列,自旋 2 3 }
這是一個自旋的過程,如果CAS修改失敗會進入下一輪自旋。很久以前看這段原始碼的時候,我總是在想,CAS失敗了不就丟資料了嗎?所以這個自旋,也稱之為自旋鎖會保證資料一定能插入成功。
說說上面鎖競爭的情況,以上過程我們不難發現對table的修改都是通過CAS操作實現的。比如下面這行程式碼,如果已經有執行緒正在操作 i 位置的元素,則意味著本輪自旋將會失敗,繼續自旋,當其他執行緒修改完成,本執行緒再次執行到tabAt以為是Volatile操作,其他執行緒的修改對本執行緒立即可見(詳見Volatile關鍵字記憶體語義的內容)。本執行緒通過tabAt發現該處已經存在元素,即發生碰撞,繼續往下執行。
1 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 2 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//通過cas插入新值 3 break; // no lock when adding to empty bin 4 }
執行緒的排程需要作業系統從使用者態轉為核心態,這是非常重量級的操作。CAS+自旋組成的自旋鎖保證了執行緒不會進入阻塞態。
然後繼續往下看
synchronized (f) { //再次檢查當前節點是否有變化,有變化進入下一輪自旋 //為什麼再次檢查?因為不能保證,當前執行緒執行到這裡,有沒有其他執行緒對該節點進行修改 if (tabAt(tab, i) == f) {
先看這行程式碼 synchronized (f) 這個f是一個桶的頭元素。也就是說在JDK8中synchronized鎖僅僅只鎖鏈表頭或者紅黑樹的頭(其實就是鎖一個桶,因為要訪問連結串列或者紅黑樹總要從頭開始訪問吧)
再看 if (tabAt(tab, i) == f) {} 其實就是雙重檢測(參考單例的雙重檢測),為什麼要再檢查一遍呢?因為不能保證當前執行緒執行到這裡,有沒有其他執行緒已經對該節點進行了修改。
initTable()
1 private final Node<K,V>[] initTable() { 2 Node<K,V>[] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 // 賦值sc。並當sizeCtl == -1 即當前有執行緒正在執行初始化 5 if ((sc = sizeCtl) < 0) 6 //yield()暫停當前正在執行的執行緒,執行其他執行緒 7 //(這是一個通知,但是這是不一定會讓當前執行緒停止,要取決於執行緒排程器) 8 //就是我想讓出資源,但是這只是一廂情願的事情,執行緒排程器會考慮你的方法,但是不一定採納。 9 Thread.yield(); 10 //修改 sizeCtl 的值為 -1。 SIZECTL 為 sizeCtl 的記憶體地址。 11 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 12 try { 13 //執行初始化過程 14 if ((tab = table) == null || tab.length == 0) { 15 //sc在上面已經賦值,=原來 sizeCtl的值。是非討厭JDK原始碼這種賦值方式。 16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 17 @SuppressWarnings("unchecked") 18 //建立一個sc長度的table。 19 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 20 table = tab = nt; 21 sc = n - (n >>> 2); 22 } 23 } finally { 24 //初始化完成, sizeCtl重新賦值為當前陣列的長度。 25 sizeCtl = sc; 26 } 27 break; 28 } 29 } 30 return tab; 31 }
以上過程,同樣是通過CAS實現的初始化控制,保證只有一個執行緒去執行初始化。
helpTransfer(tab, f);方法我們後面介紹完擴容再說。
看完以上put過程,我們能發現,JDK8通過CAS+自旋鎖將鎖的粒度控制在每一個桶上,相對於JDK7中Segment鎖,鎖粒度提高了很多。並且CAS+自旋鎖保證了不會出現執行緒的切花這種重量級的操作。
五、擴容
1 //tab舊桶陣列,nextTab新桶陣列 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 3 int n = tab.length, stride; 4 //控制併發數,控制CPU的資源 5 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 6 stride = MIN_TRANSFER_STRIDE; // subdivide range 7 if (nextTab == null) { // initiating//新陣列為空,則初始化新陣列 8 try { 9 @SuppressWarnings("unchecked") 10 //擴容為原來的兩倍 n << 1 11 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 12 nextTab = nt; 13 } catch (Throwable ex) { // try to cope with OOME 14 sizeCtl = Integer.MAX_VALUE; 15 return; 16 } 17 nextTable = nextTab; 18 transferIndex = n; 19 } 20 int nextn = nextTab.length; 21 //在這裡面進行new Node將node.hash置為-1。表示該桶正在進行移動。 22 //(這裡很重要的一點是,只鎖表頭,所以只需要將連結串列(或者紅黑樹)頭結點.hash置為-1即可) 23 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 24 //advance是控制是否繼續進行移動的條件,當advance == false,表示正在移動一個桶。 25 //true表示可以繼續進行下一個桶的移動 26 boolean advance = true; 27 boolean finishing = false; // to ensure sweep before committing nextTab 28 for (int i = 0, bound = 0;;) {//自旋 29 Node<K,V> f; int fh; 30 while (advance) {//start 31 int nextIndex, nextBound; 32 //當前桶是不是已經移動完了 33 if (--i >= bound || finishing) 34 advance = false; 35 //兩個停止移動的條件。移動完了。(這個是真正停止的條件。下面那個條件會進行一次檢查) 36 else if ((nextIndex = transferIndex) <= 0) { 37 i = -1; 38 advance = false; 39 } 40 else if (U.compareAndSwapInt 41 (this, TRANSFERINDEX, nextIndex, 42 nextBound = (nextIndex > stride ? 43 nextIndex - stride : 0))) { 44 bound = nextBound; 45 i = nextIndex - 1; 46 advance = false; 47 } 48 } 49 if (i < 0 || i >= n || i + n >= nextn) { 50 int sc; 51 if (finishing) {//結束擴容 52 nextTable = null; 53 table = nextTab; 54 sizeCtl = (n << 1) - (n >>> 1); 55 return; 56 } 57 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 58 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 59 return; 60 finishing = advance = true; 61 i = n; // recheck before commit 再次檢查一遍,防止有桶中還有資料沒移動。 62 } 63 }//end 從start到end可看可不看就是條件控制,包括結束條件的控制,移動進度的控制等。 64 //該桶沒資料 65 else if ((f = tabAt(tab, i)) == null) 66 //將oldtab中的該桶設定為fwd節點,hash=-1 67 advance = casTabAt(tab, i, null, fwd); 68 //已經移動過的桶其hash=-1 69 else if ((fh = f.hash) == MOVED) 70 advance = true; // already processed 71 else { 72 synchronized (f) {//上鎖 73 if (tabAt(tab, i) == f) { 74 //ln新連結串列,不需要移動的節點重新組組織成的連結串列。 75 //hn新連結串列,需要移動的節點重新組織成的連結串列 76 Node<K,V> ln, hn; 77 if (fh >= 0) {//連結串列 78 int runBit = fh & n; 79 Node<K,V> lastRun = f; 80 //start 81 //從start,到end之間。不看也行。實在費腦子。其實這段程式碼寫的有點讓人費解 82 //主要是不認真看不知道作者的意圖。本意是這樣的。判斷是不是可以從某個節點n開始 83 //後面的節點是不是都是和節點n一樣,移動的目標桶一樣的。 84 //如果是一樣的,則後面的這些節點就不用移動了,只需要移動n節點即可。 85 //(注意連結串列的引用,next指標就把後面的都帶過去了) 86 //想一個極端情況,如果在這裡迭代後發現,所有節點,擴容後資料移動的目標桶都是一樣的。 87 //則只需要移動頭結點即可。不用重新拼接連結串列了。 88 for (Node<K,V> p = f.next; p != null; p = p.next) { 89 int b = p.hash & n; 90 if (b != runBit) { 91 runBit = b; 92 lastRun = p; 93 } 94 } 95 if (runBit == 0) {// runBit== 0 表示該節點不需要移動 96 ln = lastRun; 97 hn = null; 98 } 99 else { 100 hn = lastRun; 101 ln = null; 102 }//end 103 for (Node<K,V> p = f; p != lastRun; p = p.next) { 104 int ph = p.hash; K pk = p.key; V pv = p.val; 105 if ((ph & n) == 0) 106 ln = new Node<K,V>(ph, pk, pv, ln); 107 else 108 hn = new Node<K,V>(ph, pk, pv, hn); 109 } 110 setTabAt(nextTab, i, ln); 111 setTabAt(nextTab, i + n, hn); 112 setTabAt(tab, i, fwd); 113 advance = true; 114 } 115 else if (f instanceof TreeBin) {//紅黑樹 116 TreeBin<K,V> t = (TreeBin<K,V>)f; 117 TreeNode<K,V> lo = null, loTail = null; 118 TreeNode<K,V> hi = null, hiTail = null; 119 int lc = 0, hc = 0; 120 for (Node<K,V> e = t.first; e != null; e = e.next) { 121 int h = e.hash; 122 TreeNode<K,V> p = new TreeNode<K,V> 123 (h, e.key, e.val, null, null); 124 if ((h & n) == 0) { 125 if ((p.prev = loTail) == null) 126 lo = p; 127 else 128 loTail.next = p; 129 loTail = p; 130 ++lc; 131 } 132 else { 133 if ((p.prev = hiTail) == null) 134 hi = p; 135 else 136 hiTail.next = p; 137 hiTail = p; 138 ++hc; 139 } 140 } 141 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 142 (hc != 0) ? new TreeBin<K,V>(lo) : t; 143 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 144 (lc != 0) ? new TreeBin<K,V>(hi) : t; 145 setTabAt(nextTab, i, ln); 146 setTabAt(nextTab, i + n, hn); 147 setTabAt(tab, i, fwd); 148 advance = true; 149 } 150 } 151 } 152 } 153 } 154 }
5.1、擴容前準備階段
ForwardingNode
1 static final class ForwardingNode<K,V> extends Node<K,V> { 2 final Node<K,V>[] nextTable; 3 ForwardingNode(Node<K,V>[] tab) { 4 super(MOVED, null, null, null); 5 this.nextTable = tab; 6 } 7 }
看一下這個內部類,其實呢其就是一個起到標識作用的節點,該節點看上面程式碼可知,該節點最主要的特點就是hash=MOVED=-1。hash=-1的節點在ConcurrentHashMap中表示該桶是被擴容過程遷移過的桶。然後當前執行緒判斷如果該桶已經被遷移。無論put還是get都去新的陣列中操作。還有一點很重要,還可以通過ForwardingNode中 nextTable獲取到新的陣列。
1 //該桶沒資料 2 else if ((f = tabAt(tab, i)) == null) 3 //將oldtab中的該桶設定為fwd節點,hash=-1 4 advance = casTabAt(tab, i, null, fwd);
看上面程式碼,先判斷該桶還有沒有資料。沒資料不用遷移,等同於已經遷移完了。其他執行緒put會直接put到新的陣列中。
1 //已經移動過的桶其hash=-1; 2 else if ((fh = f.hash) == MOVED) 3 advance = true; // already processed
如果該桶已經移動則跳過。
到此我們能看出什麼?主要是已經移動完的設定成fwd節點,其它執行緒看到該桶已經移動,則會到新的table中操作。如果未移動,還直接操作當前table,因為就算put,待會處理到該桶,一樣移動到新桶,也沒啥影響。如果是正在移動的接下來會看到加了Synchronized鎖,保證只有一個執行緒能操作當前桶。簡直不要太妙。
5.2、擴容過程
畫重點,擴容過程
1 synchronized (f) {//上鎖 2 if (tabAt(tab, i) == f) { 3 //ln新連結串列,不需要移動的節點重新組組織成的連結串列。 4 //hn新連結串列,需要移動的節點重新組織成的連結串列 5 Node<K,V> ln, hn; 6 if (fh >= 0) {//連結串列 7 int runBit = fh & n; 8 Node<K,V> lastRun = f; 9 //start 10 //從start,到end之間。不看也行。實在費腦子。其實這段程式碼寫的有點讓人費解 11 //主要是不認真看不知道作者的意圖。本意是這樣的。判斷是不是可以從某個節點n開始 12 //後面的節點是不是都是和節點n一樣,移動的目標桶一樣的。 13 //如果是一樣的,則後面的這些節點就不用移動了,只需要移動n節點即可。 14 //(注意連結串列的引用,next指標就把後面的都帶過去了) 15 //想一個極端情況,如果在這裡迭代後發現,所有節點,擴容後資料移動的目標桶都是一樣的。 16 //則只需要移動頭結點即可。不用重新拼接連結串列了。 17 for (Node<K,V> p = f.next; p != null; p = p.next) { 18 int b = p.hash & n; 19 if (b != runBit) { 20 runBit = b; 21 lastRun = p; 22 } 23 } 24 if (runBit == 0) {// runBit== 0 表示該節點不需要移動 25 ln = lastRun; 26 hn = null; 27 } 28 else { 29 hn = lastRun; 30 ln = null; 31 }//end 32 for (Node<K,V> p = f; p != lastRun; p = p.next) { 33 int ph = p.hash; K pk = p.key; V pv = p.val; 34 if ((ph & n) == 0) 35 ln = new Node<K,V>(ph, pk, pv, ln); 36 else 37 hn = new Node<K,V>(ph, pk, pv, hn); 38 } 39 setTabAt(nextTab, i, ln); 40 setTabAt(nextTab, i + n, hn); 41 setTabAt(tab, i, fwd); 42 advance = true; 43 } 44 else if (f instanceof TreeBin) {//紅黑樹 45 //紅黑樹跳過 46 } 47 } 48 }
5.2.1、併發控制
首先擴容過程是在synchronized同步程式碼塊中的。並且只鎖了一個表頭。可看到沒有鎖新陣列nextTab的桶。想想,oldTab(tab變數)和nextTab都是多個執行緒共享的變數,為什麼只有只鎖了oldTab正在操作的桶?如果有多個執行緒向nextTab同時遷移資料怎麼辦?會不會存線上程安全性問題?
TIPS: 統一術語 tab = oldTab = table(舊陣列) newTab = nextTab(擴容後新陣列) oldIndex即在oldTab中的索引位 newIndex即在newTab中的位置
在上一篇文章中介紹HashMap的時候詳細介紹了HashMap擴容中,oldTab舊桶遷移向newTab只有兩個目標桶。再簡單回顧一遍。
上面這張圖形象的展示了舊桶在擴容後的兩個去向:1,索引位原地不動,2,索引位為oldCap+oldIndex。(關於為什麼是這兩個去向,在HashMap擴容中已經詳細介紹了)
如果你還沒懂我的疑問,請參考下面這個圖。
前提,ConcurrentHashMap是併發擴容,可以有多個執行緒同時擴容,其次如果如上圖紅線那樣,oldTab中有多個桶中的資料遷移到newTab中的同一個桶中,如果出現這種情況就意味著存線上程安全性問題。
從上圖5-1中,兩個資料遷移的方向可知,擴容前,oldIndex不同就表示不在一個桶,擴容後的兩個去向如果oldIndex不一樣,也一定不在同一個桶。所以不會出現5-2圖中紅線的那種情況,也就說明在擴容過程中不需要鎖newTab。佩服+2
5.2.2、資料遷移
//ln新連結串列,不需要移動的節點重新組組織成的連結串列。 //hn新連結串列,需要移動的節點重新組織成的連結串列 Node<K,V> ln, hn;
int runBit = fh & n;
看兩個變數,上面說過擴容後,舊桶中的資料只有兩個遷移的方向。ln正是資料遷移後索引位依然是oldIndex的資料的連結串列,hn是遷移後需要遷移到oldCap + oldIndex索引位的連結串列。
關注一下runBit變數,如果 runBit == 0 成立則說明遷移後桶的索引位依然是oldIndex。詳見HashMap擴容分析。
重點關注一下start到end之間的程式碼
關於這段程式碼,首選我們假設一種極端情況,如果當前正在移動的桶中的資料在rehash之後,資料遷移的目標桶除了第一個節點的目標桶是oldIndex之外,後面的資料的目標桶都是oldIndex + oldCap。我們還需要處理後面的節點嗎?不需要,因為只需要將第二個節點移動到newTab的oldIndex + oldCap位置即可。第二個元素也就是lastRun變數。相對於HashMap完全的將資料組織成兩個連結串列,這也算得上是一個性能上的優化吧。
接著往下看
程式碼段1:
1 for (Node<K,V> p = f.next; p != null; p = p.next) { 2 int b = p.hash & n; 3 if (b != runBit) {//相同跳過 4 runBit = b; 5 lastRun = p; 6 } 7 }
以上程式碼通過對連結串列的一次掃描決定了lastRun。
程式碼段2:
1 if (runBit == 0) {// runBit== 0 表示該節點不需要移動 2 ln = lastRun; 3 hn = null; 4 } 5 else { 6 hn = lastRun; 7 ln = null; 8 }//end
根據lastRun指向的節點的runBit決定後續節點在擴容後是oldIndex + oldCap還是oldIndex。
程式碼段3:
1 for (Node<K,V> p = f; p != lastRun; p = p.next) { 2 int ph = p.hash; K pk = p.key; V pv = p.val; 3 if ((ph & n) == 0) 4 ln = new Node<K,V>(ph, pk, pv, ln); 5 else 6 hn = new Node<K,V>(ph, pk, pv, hn); 7 }
上述程式碼會重新組織兩個新連結串列。注意這個迭代到lastRun位置結束,因為以上過程已經確定了lastRun的歸屬。
看一下 ln = new Node<K,V>(ph, pk, pv, ln); 重新組織連結串列的程式碼,也就是ln會成為新new出來的node的下一個節點。
這樣有什麼問題?問題就是節點在舊桶中的相對順序在新桶中將相反。也就是next的指標翻轉一下。可以看一下node的建構函式就明瞭了。
演示擴容過程
假設當前擴容前oldCap即oldTab的長度為2,擴容後newCap即newTab的長度為4。如下圖看擴容過程,橘色的代表遷移後索引位依然是oldIndex,綠色代表擴容後索引位為oldIndex + oldCap。
上述程式碼段1迭代找到了lastRun即指向node(11),程式碼段2將lastRun賦值給hn。程式碼段3執行過程如下
1,將node(1)拼接到ln 2,將node(3)拼接到hn,此時注意,hn已經lastRun指向的節點node(11),此時hn=3—>11—>15—>19—>null 3,處理node(5)拼接到ln 4,處理...
對比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在擴容後對節點相對順序的保證方面,JDK7 HashMap是完全倒序。JDK8 HashMap不改變相對順序。JDK8 ConcurrentHashMap 保證部分節點的相對順序,其餘的倒序。
題外話,從程式碼風格和死路上,猜測一下ConcurrentHashMap應該是來自JDK7的HashMap。
1 setTabAt(nextTab, i, ln); 2 setTabAt(nextTab, i + n, hn); 3 setTabAt(tab, i, fwd); 4 advance = true;
ln和hn兩個連結串列各回各家各找各媽。
回過頭來再看put方法中的幫助擴容
1 else if ((fh = f.hash) == MOVED) 2 tab = helpTransfer(tab, f);
在put方法中有這樣一行判斷,當f.hash = MOVED即當前HashMap正在擴容中,則當前執行緒會去嘗試幫助擴容。
1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 2 Node<K,V>[] nextTab; int sc; 3 if (tab != null && (f instanceof ForwardingNode) &&//條件判斷 4 (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//從fwd節點中取出新table 5 int rs = resizeStamp(tab.length); 6 while (nextTab == nextTable && table == tab && 7 (sc = sizeCtl) < 0) { 8 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 9 sc == rs + MAX_RESIZERS || transferIndex <= 0) 10 break; 11 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//修改sizeCtl = sizeCtl + 1,表示多了一個執行緒參與擴容 12 transfer(tab, nextTab); 13 break; 14 } 15 } 16 return nextTab; 17 } 18 return table; 19 }
在helpTransfer方法中會首先做一系列判斷,通過fwd節點獲取到nextTab即新的陣列。通過CAS 實現sizeCtl++操作,表示多了一個執行緒進行擴容,因為在擴容方法中對擴容執行緒數量有控制。
最後的最後,擴容的時機
說一下觸發擴容的操作,總的來說就是put操作,但是有兩個時機很重要,其一就是addCount方法中,每次put一個元素,在addCount方法中都會判斷需不需要進行擴容。另外就是treeifyBin方法中,如果桶中資料超過了8個並且陣列長度<64則不會進行樹化,而是進行擴容。關於這個在HashMap原始碼介紹中也有介紹。你想想,假如容量為16,你就插入了9個元素,巧了,都在同一個桶裡面,如果這時進行樹化,樹化本身就是一個耗時的過程。時間複雜度會增加,效能下降,不如直接進行擴容,空間換時間。
終於擴容過程寫完了。很經典,想讀懂也很費勁。
六、get過程原始碼
1 public V get(Object key) { 2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 3 int h = spread(key.hashCode());//hash 4 if ((tab = table) != null && (n = tab.length) > 0 && 5 (e = tabAt(tab, (n - 1) & h)) != null) {//取桶 6 if ((eh = e.hash) == h) {//key相同直接返回 7 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 8 return e.val; 9 } 10 else if (eh < 0)//hash < 0 表示正在擴容 11 //在這裡需要非常注意的一點,擴容後的桶會放入fwd節點 12 //該節點hash = MOVED,fwd.nextTable為擴容後新的陣列。 13 return (p = e.find(h, key)) != null ? p.val : null; 14 while ((e = e.next) != null) {//迭代連結串列 15 if (e.hash == h && 16 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 17 return e.val; 18 } 19 } 20 return null; 21 }
get原始碼只關注下面這行
return (p = e.find(h, key)) != null ? p.val : null;
當該桶已經被移動,則通過e.find方法去nextTab新陣列查詢。首先在5章節resize擴容方法中,已經擴容的桶會被塞進去一個ForwardingNode節點 setTabAt(tab, i, fwd); 繼續看resize方法中ForwardingNode的初始化會發現是這樣初始化的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ,看它的構造方法
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }
不難發下其初始化方法接收一個nextTab也就是擴容後的新陣列,並將該陣列賦值給其內部變數nextTable。也就是說當get發現桶已經擴容後,我們可以從fwd節點中找到新的陣列。並從新的陣列中找到新的目標桶並進行元素查詢。
看了以上程式碼,回到 e.find(h, key)) ,需要明確的是e就是ForwardingNode節點。看看find方法
1 Node<K,V> find(int h, Object k) { 2 // loop to avoid arbitrarily deep recursion on forwarding nodes 3 outer: for (Node<K,V>[] tab = nextTable;;) {//迭代nextTable 4 Node<K,V> e; int n; 5 if (k == null || tab == null || (n = tab.length) == 0 || 6 (e = tabAt(tab, (n - 1) & h)) == null) 7 return null; 8 for (;;) { 9 int eh; K ek; 10 if ((eh = e.hash) == h && 11 ((ek = e.key) == k || (ek != null && k.equals(ek)))) 12 return e; 13 if (eh < 0) { 14 if (e instanceof ForwardingNode) { 15 tab = ((ForwardingNode<K,V>)e).nextTable; 16 continue outer; 17 } 18 else 19 return e.find(h, k); 20 } 21 if ((e = e.next) == null) 22 return null; 23 } 24 } 25 }
OK,很明確了,確實是從nextTable中查詢的。
得出一個結論,ConcurrentHashMap擴容不影響get操作。也就是在擴容過程中可以併發讀。
七、ConcurrentHashMap併發控制
詳細看了ConcurrentHashMap put resize get過程的原始碼,本章從整體上看一下ConcurrentHashMap的併發控制。
下面結合圖片我們看一下ConcurrentHashMap的併發過程。
如上圖,執行緒1進行put操作,這時發現size > sizeCtl。開始進行擴容
此時執行緒1已經完成oldTab中索引[2,16)中的擴容。正在進行索引為1的桶的擴容。接下來執行緒2執行get。
執行緒2根據get邏輯和key的hash,可能訪問的三種情況如上圖所示
情況一:訪問藍色號桶,即未擴容的桶。該桶還未進行擴容,所以在桶中找到對應元素,返回。
情況二:訪問綠色桶,即正在擴容的桶。該桶正在擴容,在擴容過程中,執行緒1持有Synchronized鎖,執行緒2只能自旋等待。
情況三:訪問橘色桶,該桶已擴容的桶。該桶已擴容,oldTab中是fwd節點,hash=-1,所以執行fwd節點的find邏輯,fwd節點持有newTab(nextTable),所以執行緒2去newTab中查詢對應元素,返回。
如上圖4,當執行緒1進行擴容時,執行緒3進來執行put,同樣存在三種可能的情況
情況一:訪問藍色桶,即未擴容的桶。正常執行put邏輯。
情況二:訪問綠色桶,即正擴容的桶。因為線層1持有Synchronized鎖,執行緒3將一直自旋,等待擴容結束。
情況三:訪問橘色桶,即已擴容的桶。因為已擴容的桶,在oldTab中是fwd節點,hash = -1 = MOVED,所以執行緒3執行幫助擴容的邏輯。等待擴容完成,執行緒3繼續完成put邏輯。
OK,以上就是ConcurrentHashMap關於get put resize的併發控制,從以上過程可見,存在鎖競爭的情況很有限,即使存在鎖競爭,也是進行自旋,而不會阻塞執行緒。可見ConcurrentHashMap能做到高效的併發讀。
在put過程中,因為如果存線上程正在已經擴容,則幫助進行擴容(協助擴容這塊,有一個步長的概念,同時進行擴容的執行緒和table的長度有關)。如果當前桶正在進行擴容,則被Synchronized鎖拒之門外,自旋等待擴容結束。如果訪問的是未擴容的桶,則執行正常的put邏輯。可見整個過程中,由於鎖的粒度很小,put做到了高效的併發寫,也做到了高效的擴容。
總之一句話ConcurrentHashMap的高併發是通過 CAS樂觀鎖 + 自旋鎖 + 細粒度 保證的。
如有錯誤的地方還請留言指正。
原創不易,轉載請註明原文地址:https://www.cnblogs.com/hello-shf/p/12183263.h