1. 程式人生 > >別再問我ConcurrentHashMap了

別再問我ConcurrentHashMap了

以下ConcurrentHashMap以jdk8中為例進行分析,ConcurrentHashMap是一個執行緒安全、基於陣列+連結串列(或者紅黑樹)的kv容器,主要特性如下:

  • 執行緒安全,陣列中單個slot元素個數超過8個時會將連結串列結構轉換成紅黑樹,注意樹節點之間還是有next指標的;
  • 當元素個數超過N(N = tab.length - tab.length>>>2,達到0.75閾值時)個時觸發rehash,成倍擴容;
  • 當執行緒擴容時,其他執行緒put資料時會加入幫助擴容,加快擴容速度;
  • put時對單個slot頭節點元素進行synchronized加鎖,ConcurrentHashMap中的加鎖粒度是針對slot節點的,rehash過程中加鎖粒度也是如此;
  • get時一般是不加鎖。如果slot元素為連結串列,直接讀取返回即可;如果slot元素為紅黑樹,並且此時該樹在進行再平衡或者節點刪除操作,讀取操作會按照樹節點的next指標進行讀取,也是不加鎖的(因為紅黑樹中節點也是有連結串列串起來的);如果該樹並沒有進行平衡或者節點刪除操作,那麼會用CAS加讀鎖,防止讀取過程中其他執行緒該樹進行更新操作(主要是防止破壞紅黑樹節點之間的連結串列特性),破壞“讀檢視”。

ConcurrentHashMap預設陣列長度16,map最大容量為MAXIMUM_CAPACITY = 1 << 30。建立ConcurrentHashMap並不是涉及陣列的初始化,陣列初始化是在第一次put資料才進行的。(注意:JDK1.8中捨棄了之前的分段鎖技術,改用CAS+Synchronized機制)

Node結構

ConcurrentHashMap中一個重要的類就是Node,該類儲存鍵值對,所有插入ConcurrentHashMap的資料都包裝在這裡面。它與HashMap中的定義很相似,但是有一些差別是ConcurrentHashMap的value和next屬性都是volatile的(保證了get資料時直接返回即可,volatile保證了更新的可見性),且不允許呼叫setValue方法直接改變Node的value域,增加了find方法輔助map.get()方法,可在get方法返回的結果中更改對應的value值。

1 static class Node<K,V> implements Map.Entry<K,V> {
2     final int hash;
3     final K key;
4     volatile V val;
5     volatile Node<K,V> next;
6 }

ConcurrentHashMap定義了三個原子操作,用於對陣列指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的執行緒安全。

 1 //獲得在i位置上的Node節點  
 2 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {  
 3    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
 4 }  
 5 //利用CAS演算法設定i位置上的Node節點。之所以能實現併發是因為他指定了原來這個節點的值是多少  
 6 //在CAS演算法中,會比較記憶體中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改  
 7 //因此當前執行緒中的值並不是最新的值,這種修改可能會覆蓋掉其他執行緒的修改結果
 8 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,  
 9                                    Node<K,V> c, Node<K,V> v) {  
10    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
11 }  
12 //利用volatile方法設定節點位置的值  
13 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
14    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  

下面就按照ConcurrentHashMap的 put / get / remove 來分析下其實現原理,中間涉及rehash、紅黑樹轉換等。

put流程

put操作流程如下:

  • 首先根據key的hashCode計算hash,然後根據hash計算應該在陣列中儲存位置,如果資料為null,新建陣列;
  • 然後通過tabAt(&操作)直接獲取對應slot。如果slot為null,則新建kv節點(Node型別)放到slot;
  • 如果當前slot節點的hash值等於MOVED(等於-1),表示其型別為ForwardingNode,證明其他執行緒在進行rehash擴容操作,當前執行緒也會幫助一起進行擴容操作;
  • 然後對slot節點進行synchronized加鎖,如果slot節點hash值大於等於0,表示當前slot對應元素為連結串列結構,遍歷當前連結串列,如果key存在則更新,否則新增到連結串列尾部;如果slot節點型別為TreeBin(其hash值為-2),表示slot對應元素為紅黑樹,則在紅黑樹中進行更新節點或者新增節點操作,注意,最後如果樹不平衡會進行樹的再平衡操作,此時對樹root節點加CAS寫鎖。
  • 最後,如果新添加了節點,會統計map size值;如果當前map數量超過了閾值(N = tab.length - tab.length>>>2)會觸發rehash擴容,按照成倍擴容。

注意:因為往map中新增元素和增加元素統計值是兩個步驟,不是原子的,所以獲取map.size()時可能不是準確值。

對key的hashCode計算hash

存到map中的key並不是直接按照hashCode計算的,因為hashCode有可能為負的,並且不合理的hashCode實現可能導致較多衝突,因此ConcurrentHashMap中會對key對hashCode進行hash操作:

1 // int hash = spread(key.hashCode());
2 // HASH_BITS = 0x7fffffff 符號位設定為0
3 static final int spread(int h) {
4     return (h ^ (h >>> 16)) & HASH_BITS;
5 }

紅黑樹節點比較

既然使用到了紅黑樹,這就涉及到節點的大小比較問題(節點資料包含key、value資訊)。進行節點的大小比較時,首先是比較節點的hash值,注意hash值不是hashCode,因為hash值是物件hashCode與自己無符號右移16位進行異或後的值。如果節點的hash值相等,判斷節點的key物件是否實現了Comparable介面,實現的話就是用Comparable邏輯比較節點之間的大小。如果key物件未實現Comparable介面,則呼叫tieBreakOrder方法進行判斷:

 1 // dir = tieBreakOrder(k, pk); k/pk,帶比較兩個節點,命名還是挺有意思的
 2 static int tieBreakOrder(Object a, Object b) {
 3     int d;
 4     if (a == null || b == null ||
 5         (d = a.getClass().getName().
 6          compareTo(b.getClass().getName())) == 0)
 7         d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
 8              -1 : 1);
 9     return d;
10 }

這裡呼叫了System.identityHashCode,將由預設方法hashCode()返回,如果物件的hashCode()被重寫,則System.identityHashCode和hashCode()的返回值就不一樣了。

put原始碼

  1 final V putVal(K key, V value, boolean onlyIfAbsent) {
  2     // key value非空
  3     if (key == null || value == null) throw new NullPointerException();
  4     int hash = spread(key.hashCode());
  5     // slot對應元素個數,連結串列轉換成紅黑樹時用
  6     int binCount = 0;
  7     for (Node<K,V>[] tab = table;;) {
  8         Node<K,V> f; int n, i, fh;
  9         if (tab == null || (n = tab.length) == 0)
 10             tab = initTable();
 11         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
 12             if (casTabAt(tab, i, null,
 13                          new Node<K,V>(hash, key, value, null)))
 14                 break;                   // no lock when adding to empty bin
 15         }
 16         else if ((fh = f.hash) == MOVED)
 17             // 在rehash擴容,幫助擴容,擴容完成之後才能繼續進行put操作
 18             tab = helpTransfer(tab, f);
 19         else {
 20             V oldVal = null;
 21             synchronized (f) { // 加鎖
 22                 if (tabAt(tab, i) == f) { // 可能已經被更新需要再次進行判斷
 23                     if (fh >= 0) { // 節點更新或插入
 24                         binCount = 1;
 25                         for (Node<K,V> e = f;; ++binCount) {
 26                             K ek;
 27                             if (e.hash == hash &&
 28                                 ((ek = e.key) == key ||
 29                                  (ek != null && key.equals(ek)))) {
 30                                 oldVal = e.val;
 31                                 if (!onlyIfAbsent)
 32                                     e.val = value;
 33                                 break;
 34                             }
 35                             Node<K,V> pred = e;
 36                             if ((e = e.next) == null) {
 37                                 pred.next = new Node<K,V>(hash, key,
 38                                                           value, null);
 39                                 break;
 40                             }
 41                         }
 42                     }
 43                     else if (f instanceof TreeBin) { // 紅黑樹更新或插入
 44                         Node<K,V> p;
 45                         binCount = 2;
 46                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
 47                                                        value)) != null) {
 48                             oldVal = p.val;
 49                             if (!onlyIfAbsent)
 50                                 p.val = value;
 51                         }
 52                     }
 53                 }
 54             }
 55             if (binCount != 0) {
 56                 if (binCount >= TREEIFY_THRESHOLD)
 57                     treeifyBin(tab, i);
 58                 if (oldVal != null)
 59                     return oldVal;
 60                 break;
 61             }
 62         }
 63     }
 64     // 增加統計值,可能觸發rehash擴容
 65     addCount(1L, binCount);
 66     return null;
 67 }
 68 
 69 private final void addCount(long x, int check) {
 70     CounterCell[] as; long b, s;
 71     /**
 72      * counterCells非空表示當前put併發較大,按照counterCells進行分執行緒統計
 73      * 參考LongAddr思想
 74      */
 75     if ((as = counterCells) != null ||
 76         !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 77         CounterCell a; long v; int m;
 78         boolean uncontended = true;
 79         if (as == null || (m = as.length - 1) < 0 ||
 80             (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
 81             !(uncontended =
 82               U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
 83             fullAddCount(x, uncontended);
 84             return;
 85         }
 86         if (check <= 1)
 87             return;
 88         s = sumCount();
 89     }
 90     if (check >= 0) {
 91         Node<K,V>[] tab, nt; int n, sc;
 92         // 大於等於閾值數時進行擴容操作
 93         while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
 94                (n = tab.length) < MAXIMUM_CAPACITY) {
 95             int rs = resizeStamp(n);
 96             if (sc < 0) {
 97                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 98                     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
 99                     transferIndex <= 0)
100                     break;
101                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
102                     transfer(tab, nt);
103             }
104             else if (U.compareAndSwapInt(this, SIZECTL, sc,
105                                          (rs << RESIZE_STAMP_SHIFT) + 2))
106                 transfer(tab, null);
107             s = sumCount();
108         }
109     }
110 }

 

get流程

get方法比較簡單,給定一個key來確定value的時候,必須滿足兩個條件hash值相同同時 key相同(equals) ,對於節點可能在連結串列或樹上的情況,需要分別去查詢。

get時一般是不加鎖(Node節點中value資料型別是volatile的,保證了記憶體可見性)。如果slot元素為連結串列,直接讀取返回即可;如果slot元素為紅黑樹,並且此時該樹在進行再平衡或者節點刪除操作,讀取操作會按照樹節點的next指標進行讀取,也是不加鎖的;如果該樹並沒有進行平衡或者節點刪除操作,那麼會用CAS加讀鎖,防止讀取過程中其他執行緒該樹進行更新操作,破壞“讀檢視”。

remove流程

remove流程就是根據key找到對應節點,將該節點從連結串列(更改節點前後關係)或者紅黑樹移除的過程,注意,從紅黑樹中刪除元素後,不會將紅黑樹轉換為列表的,只能在put元素時列表可能有轉換紅黑樹操作,不會有反向操作。

注意:hashMap有自動rehash擴容機制,但是當元素remove之後並沒有自動縮容機制,如果陣列經過多次擴容變得很大,並且當前元素較少,請將這些元素轉移到一個新的HashMap中。

rehash流程

rehash時是成倍擴容(老table和新tableNew),對於table中i位置的所有元素,擴容後會被分配到i和i+table.length這兩個位置中。rehash主要的流程transfer方法中,具體不再展開。

 

推薦閱讀:

  • ConcurrentHashMap竟然也有死迴圈問題?

  • 你的ThreadLocal執行緒安全麼

更多文件可掃描以下二維碼:

&n