別再問我ConcurrentHashMap了
以下ConcurrentHashMap以jdk8中為例進行分析,ConcurrentHashMap是一個執行緒安全、基於陣列+連結串列(或者紅黑樹)的kv容器,主要特性如下:
- 執行緒安全,陣列中單個slot元素個數超過8個時會將連結串列結構轉換成紅黑樹,注意樹節點之間還是有next指標的;
- 當元素個數超過N(
N = tab.length - tab.length>>>2,達到0.75閾值時
)個時觸發rehash,成倍擴容; - 當執行緒擴容時,其他執行緒put資料時會加入幫助擴容,加快擴容速度;
- put時對單個slot頭節點元素進行synchronized加鎖,ConcurrentHashMap中的加鎖粒度是針對slot節點的,rehash過程中加鎖粒度也是如此;
- get時一般是不加鎖。如果slot元素為連結串列,直接讀取返回即可;如果slot元素為紅黑樹,並且此時該樹在進行再平衡或者節點刪除操作,讀取操作會按照樹節點的next指標進行讀取,也是不加鎖的(因為紅黑樹中節點也是有連結串列串起來的);如果該樹並沒有進行平衡或者節點刪除操作,那麼會用CAS加讀鎖,防止讀取過程中其他執行緒該樹進行更新操作(主要是防止破壞紅黑樹節點之間的連結串列特性),破壞“讀檢視”。
ConcurrentHashMap預設陣列長度16,map最大容量為MAXIMUM_CAPACITY = 1 << 30
。建立ConcurrentHashMap並不是涉及陣列的初始化,陣列初始化是在第一次put資料才進行的。(注意:JDK1.8中捨棄了之前的分段鎖技術,改用CAS+Synchronized機制)
Node結構
ConcurrentHashMap中一個重要的類就是Node,該類儲存鍵值對,所有插入ConcurrentHashMap的資料都包裝在這裡面。它與HashMap中的定義很相似,但是有一些差別是ConcurrentHashMap的value和next屬性都是volatile的(保證了get資料時直接返回即可,volatile保證了更新的可見性
),且不允許呼叫setValue方法直接改變Node的value域,增加了find方法輔助map.get()方法,可在get方法返回的結果中更改對應的value值。
1 static class Node<K,V> implements Map.Entry<K,V> { 2 final int hash; 3 final K key; 4 volatile V val; 5 volatile Node<K,V> next; 6 }
ConcurrentHashMap定義了三個原子操作,用於對陣列指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的執行緒安全。
1 //獲得在i位置上的Node節點 2 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { 3 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); 4 } 5 //利用CAS演算法設定i位置上的Node節點。之所以能實現併發是因為他指定了原來這個節點的值是多少 6 //在CAS演算法中,會比較記憶體中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改 7 //因此當前執行緒中的值並不是最新的值,這種修改可能會覆蓋掉其他執行緒的修改結果 8 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, 9 Node<K,V> c, Node<K,V> v) { 10 return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); 11 } 12 //利用volatile方法設定節點位置的值 13 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { 14 U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
下面就按照ConcurrentHashMap的 put / get / remove 來分析下其實現原理,中間涉及rehash、紅黑樹轉換等。
put流程
put操作流程如下:
- 首先根據key的hashCode計算hash,然後根據hash計算應該在陣列中儲存位置,如果資料為null,新建陣列;
- 然後通過tabAt(&操作)直接獲取對應slot。如果slot為null,則新建kv節點(Node型別)放到slot;
- 如果當前slot節點的hash值等於MOVED(等於-1),表示其型別為ForwardingNode,證明其他執行緒在進行rehash擴容操作,當前執行緒也會幫助一起進行擴容操作;
- 然後對slot節點進行synchronized加鎖,如果slot節點hash值大於等於0,表示當前slot對應元素為連結串列結構,遍歷當前連結串列,如果key存在則更新,否則新增到連結串列尾部;如果slot節點型別為TreeBin(其hash值為-2),表示slot對應元素為紅黑樹,則在紅黑樹中進行更新節點或者新增節點操作,注意,最後如果樹不平衡會進行樹的再平衡操作,此時對樹root節點加CAS寫鎖。
- 最後,如果新添加了節點,會統計map size值;如果當前map數量超過了閾值(
N = tab.length - tab.length>>>2
)會觸發rehash擴容,按照成倍擴容。
注意:因為往map中新增元素和增加元素統計值是兩個步驟,不是原子的,所以獲取map.size()時可能不是準確值。
對key的hashCode計算hash
存到map中的key並不是直接按照hashCode計算的,因為hashCode有可能為負的,並且不合理的hashCode實現可能導致較多衝突,因此ConcurrentHashMap中會對key對hashCode進行hash操作:
1 // int hash = spread(key.hashCode()); 2 // HASH_BITS = 0x7fffffff 符號位設定為0 3 static final int spread(int h) { 4 return (h ^ (h >>> 16)) & HASH_BITS; 5 }
紅黑樹節點比較
既然使用到了紅黑樹,這就涉及到節點的大小比較問題(節點資料包含key、value資訊)。進行節點的大小比較時,首先是比較節點的hash值,注意hash值不是hashCode,因為hash值是物件hashCode與自己無符號右移16位進行異或後的值。如果節點的hash值相等,判斷節點的key物件是否實現了Comparable介面,實現的話就是用Comparable邏輯比較節點之間的大小。如果key物件未實現Comparable介面,則呼叫tieBreakOrder方法進行判斷:
1 // dir = tieBreakOrder(k, pk); k/pk,帶比較兩個節點,命名還是挺有意思的 2 static int tieBreakOrder(Object a, Object b) { 3 int d; 4 if (a == null || b == null || 5 (d = a.getClass().getName(). 6 compareTo(b.getClass().getName())) == 0) 7 d = (System.identityHashCode(a) <= System.identityHashCode(b) ? 8 -1 : 1); 9 return d; 10 }
這裡呼叫了System.identityHashCode,將由預設方法hashCode()返回,如果物件的hashCode()被重寫,則System.identityHashCode和hashCode()的返回值就不一樣了。
put原始碼
1 final V putVal(K key, V value, boolean onlyIfAbsent) { 2 // key value非空 3 if (key == null || value == null) throw new NullPointerException(); 4 int hash = spread(key.hashCode()); 5 // slot對應元素個數,連結串列轉換成紅黑樹時用 6 int binCount = 0; 7 for (Node<K,V>[] tab = table;;) { 8 Node<K,V> f; int n, i, fh; 9 if (tab == null || (n = tab.length) == 0) 10 tab = initTable(); 11 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 12 if (casTabAt(tab, i, null, 13 new Node<K,V>(hash, key, value, null))) 14 break; // no lock when adding to empty bin 15 } 16 else if ((fh = f.hash) == MOVED) 17 // 在rehash擴容,幫助擴容,擴容完成之後才能繼續進行put操作 18 tab = helpTransfer(tab, f); 19 else { 20 V oldVal = null; 21 synchronized (f) { // 加鎖 22 if (tabAt(tab, i) == f) { // 可能已經被更新需要再次進行判斷 23 if (fh >= 0) { // 節點更新或插入 24 binCount = 1; 25 for (Node<K,V> e = f;; ++binCount) { 26 K ek; 27 if (e.hash == hash && 28 ((ek = e.key) == key || 29 (ek != null && key.equals(ek)))) { 30 oldVal = e.val; 31 if (!onlyIfAbsent) 32 e.val = value; 33 break; 34 } 35 Node<K,V> pred = e; 36 if ((e = e.next) == null) { 37 pred.next = new Node<K,V>(hash, key, 38 value, null); 39 break; 40 } 41 } 42 } 43 else if (f instanceof TreeBin) { // 紅黑樹更新或插入 44 Node<K,V> p; 45 binCount = 2; 46 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 47 value)) != null) { 48 oldVal = p.val; 49 if (!onlyIfAbsent) 50 p.val = value; 51 } 52 } 53 } 54 } 55 if (binCount != 0) { 56 if (binCount >= TREEIFY_THRESHOLD) 57 treeifyBin(tab, i); 58 if (oldVal != null) 59 return oldVal; 60 break; 61 } 62 } 63 } 64 // 增加統計值,可能觸發rehash擴容 65 addCount(1L, binCount); 66 return null; 67 } 68 69 private final void addCount(long x, int check) { 70 CounterCell[] as; long b, s; 71 /** 72 * counterCells非空表示當前put併發較大,按照counterCells進行分執行緒統計 73 * 參考LongAddr思想 74 */ 75 if ((as = counterCells) != null || 76 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 77 CounterCell a; long v; int m; 78 boolean uncontended = true; 79 if (as == null || (m = as.length - 1) < 0 || 80 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 81 !(uncontended = 82 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 83 fullAddCount(x, uncontended); 84 return; 85 } 86 if (check <= 1) 87 return; 88 s = sumCount(); 89 } 90 if (check >= 0) { 91 Node<K,V>[] tab, nt; int n, sc; 92 // 大於等於閾值數時進行擴容操作 93 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 94 (n = tab.length) < MAXIMUM_CAPACITY) { 95 int rs = resizeStamp(n); 96 if (sc < 0) { 97 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 98 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 99 transferIndex <= 0) 100 break; 101 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 102 transfer(tab, nt); 103 } 104 else if (U.compareAndSwapInt(this, SIZECTL, sc, 105 (rs << RESIZE_STAMP_SHIFT) + 2)) 106 transfer(tab, null); 107 s = sumCount(); 108 } 109 } 110 }
get流程
get方法比較簡單,給定一個key來確定value的時候,必須滿足兩個條件hash值相同同時 key相同(equals) ,對於節點可能在連結串列或樹上的情況,需要分別去查詢。
get時一般是不加鎖(Node節點中value資料型別是volatile的,保證了記憶體可見性)。如果slot元素為連結串列,直接讀取返回即可;如果slot元素為紅黑樹,並且此時該樹在進行再平衡或者節點刪除操作,讀取操作會按照樹節點的next指標進行讀取,也是不加鎖的;如果該樹並沒有進行平衡或者節點刪除操作,那麼會用CAS加讀鎖,防止讀取過程中其他執行緒該樹進行更新操作,破壞“讀檢視”。
remove流程
remove流程就是根據key找到對應節點,將該節點從連結串列(更改節點前後關係)或者紅黑樹移除的過程,注意,從紅黑樹中刪除元素後,不會將紅黑樹轉換為列表的,只能在put元素時列表可能有轉換紅黑樹操作,不會有反向操作。
注意:hashMap有自動rehash擴容機制,但是當元素remove之後並沒有自動縮容機制,如果陣列經過多次擴容變得很大,並且當前元素較少,請將這些元素轉移到一個新的HashMap中。
rehash流程
rehash時是成倍擴容(老table和新tableNew),對於table中i位置的所有元素,擴容後會被分配到i和i+table.length這兩個位置中。rehash主要的流程transfer方法中,具體不再展開。
推薦閱讀:
-
ConcurrentHashMap竟然也有死迴圈問題?
-
你的ThreadLocal執行緒安全麼
更多文件可掃描以下二維碼:
&n