1. 程式人生 > >ConcurrentHashMap JDK1.7和JDK1.8區別

ConcurrentHashMap JDK1.7和JDK1.8區別



前言

以前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操作,就是rehash,這個會重新將原陣列的內容重新hash到新的擴容陣列中,在多執行緒的環境下,存在同時其他的元素也在進行put操作,如果hash值相同,可能出現同時在同一陣列下用連結串列表示,造成閉環,導致在get時會出現死迴圈,所以HashMap是執行緒不安全的。

我們來了解另一個鍵值儲存集合HashTable,它是執行緒安全的,它在所有涉及到多執行緒操作的都加上了synchronized關鍵字來鎖住整個table,這就意味著所有的執行緒都在競爭一把鎖,在多執行緒的環境下,它是安全的,但是無疑是效率低下的。

其實HashTable有很多的優化空間,鎖住整個table這麼粗暴的方法可以變相的柔和點,比如在多執行緒的環境下,對不同的資料集進行操作時其實根本就不需要去競爭一個鎖,因為他們不同hash值,不會因為rehash造成執行緒不安全,所以互不影響,這就是鎖分離技術,將鎖的粒度降低,利用多個鎖來控制多個小的table,這就是這篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想

ConcurrentHashMap

JDK1.7的實現

在JDK1.7版本中,ConcurrentHashMap的資料結構是由一個Segment陣列和多個HashEntry組成,如下圖所示:


Segment陣列的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每一個Segment元素儲存的是HashEntry陣列+連結串列,這個和HashMap的資料儲存結構一樣

初始化

ConcurrentHashMap的初始化是會通過位與運算來初始化Segment的大小,用ssize來表示,如下所示


  
  1. int sshift = 0;
  2. int
    ssize = 1;
  3. while (ssize < concurrencyLevel) {
  4. ++sshift;
  5. ssize <<= 1;
  6. }

如上所示,因為ssize用位於運算來計算(ssize <<=1),所以Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,當然concurrencyLevel最大隻能用16位的二進位制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize預設為16

每一個Segment元素下的HashEntry的初始化也是按照位於運算來計算,用cap來表示,如下所示


  
  1. int cap = 1;
  2. while (cap < c)
  3. cap <<= 1;

如上所示,HashEntry大小的計算也是2的N次方(cap <<=1), cap的初始值為1,所以HashEntry最小的容量為2

put操作

對於ConcurrentHashMap的資料插入,這裡要進行兩次Hash去定位資料的儲存位置

static class Segment<K,V> extends ReentrantLock implements Serializable {

  

從上Segment的繼承體系可以看出,Segment實現了ReentrantLock,也就帶有鎖的功能,當執行put操作時,會進行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化,即通過CAS操作進行賦值,然後進行第二次hash操作,找到相應的HashEntry的位置,這裡會利用繼承過來的鎖的特性,在將資料插入指定的HashEntry位置時(連結串列的尾端),會通過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,如果獲取成功就直接插入相應的位置,如果已經有執行緒獲取該Segment的鎖,那當前執行緒會以自旋的方式去繼續的呼叫tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒

get操作

ConcurrentHashMap的get操作跟HashMap類似,只是ConcurrentHashMap第一次需要經過一次hash定位到Segment的位置,然後再hash定位到指定的HashEntry,遍歷該HashEntry下的連結串列進行對比,成功就返回,不成功就返回null

size操作

計算ConcurrentHashMap的元素大小是一個有趣的問題,因為他是併發操作的,就是在你計算size的時候,他還在併發的插入資料,可能會導致你計算出來的size和你實際的size有相差(在你return size的時候,插入了多個數據),要解決這個問題,JDK1.7版本用兩種方案


  
  1. try {
  2. for (;;) {
  3. if (retries++ == RETRIES_BEFORE_LOCK) {
  4. for ( int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation
  5. }
  6. sum = 0L;
  7. size = 0;
  8. overflow = false;
  9. for ( int j = 0; j < segments.length; ++j) {
  10. Segment<K,V> seg = segmentAt(segments, j);
  11. if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0)
  12. overflow = true;
  13. } }
  14. if (sum == last) break;
  15. last = sum; } }
  16. finally {
  17. if (retries > RETRIES_BEFORE_LOCK) {
  18. for ( int j = 0; j < segments.length; ++j)
  19. segmentAt(segments, j).unlock();
  20. }
  21. }

1.第一種方案他會使用不加鎖的模式去嘗試多次計算ConcurrentHashMap的size,最多三次,比較前後兩次計算的結果,結果一致就認為當前沒有元素加入,計算的結果是準確的

2.第二種方案是如果第一種方案不符合,他就會給每個Segment加上鎖,然後計算ConcurrentHashMap的size返回

JDK1.8的實現

JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node陣列+連結串列+紅黑樹的資料結構來實現,併發控制使用Synchronized和CAS來操作,整個看起來就像是優化過且執行緒安全的HashMap,雖然在JDK1.8中還能看到Segment的資料結構,但是已經簡化了屬性,只是為了相容舊版本

在深入JDK1.8的put和get實現之前要知道一些常量設計和資料結構,這些是構成ConcurrentHashMap實現結構的基礎,下面看一下基本屬性:


  
  1. // node陣列最大容量:2^30=1073741824
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;
  3. // 預設初始值,必須是2的幕數
  4. private static final int DEFAULT_CAPACITY = 16;
  5. //陣列可能最大值,需要與toArray()相關方法關聯
  6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  7. //併發級別,遺留下來的,為相容以前的版本
  8. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  9. // 負載因子
  10. private static final float LOAD_FACTOR = 0.75f;
  11. // 連結串列轉紅黑樹閥值,> 8 連結串列轉換為紅黑樹
  12. static final int TREEIFY_THRESHOLD = 8;
  13. //樹轉連結串列閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
  14. static final int UNTREEIFY_THRESHOLD = 6;
  15. static final int MIN_TREEIFY_CAPACITY = 64;
  16. private static final int MIN_TRANSFER_STRIDE = 16;
  17. private static int RESIZE_STAMP_BITS = 16;
  18. // 2^15-1,help resize的最大執行緒數
  19. private static final int MAX_RESIZERS = ( 1 << ( 32 - RESIZE_STAMP_BITS)) - 1;
  20. // 32-16=16,sizeCtl中記錄size大小的偏移量
  21. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  22. // forwarding nodes的hash值
  23. static final int MOVED = - 1;
  24. // 樹根節點的hash值
  25. static final int TREEBIN = - 2;
  26. // ReservationNode的hash值
  27. static final int RESERVED = - 3;
  28. // 可用處理器數量
  29. static final int NCPU = Runtime.getRuntime().availableProcessors();
  30. //存放node的陣列
  31. transient volatile Node<K,V>[] table;
  32. /*控制識別符號,用來控制table的初始化和擴容的操作,不同的值有不同的含義
  33. *當為負數時:-1代表正在初始化,-N代表有N-1個執行緒正在 進行擴容
  34. *當為0時:代表當時的table還沒有被初始化
  35. *當為正數時:表示初始化或者下一次進行擴容的大小*/
  36. private transient volatile int sizeCtl;

基本屬性定義了ConcurrentHashMap的一些邊界以及操作時的一些控制,下面看一些內部的一些結構組成,這些是整個ConcurrentHashMap整個資料結構的核心

Node

Node是ConcurrentHashMap儲存結構的基本單元,繼承於HashMap中的Entry,用於儲存資料,原始碼如下


  
  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. //連結串列的資料結構
  3. final int hash;
  4. final K key;
  5. //val和next都會在擴容時發生變化,所以加上volatile來保持可見性和禁止重排序
  6. volatile V val;
  7. volatile Node<K,V> next;
  8. Node( int hash, K key, V val, Node<K,V> next) {
  9. this.hash = hash;
  10. this.key = key;
  11. this.val = val;
  12. this.next = next;
  13. }
  14. public final K getKey() { return key; }
  15. public final V getValue() { return val; }
  16. public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
  17. public final String toString(){ return key + "=" + val; }
  18. //不允許更新value
  19. public final V setValue(V value) {
  20. throw new UnsupportedOperationException();
  21. }
  22. public final boolean equals(Object o) {
  23. Object k, v, u; Map.Entry<?,?> e;
  24. return ((o instanceof Map.Entry) &&
  25. (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
  26. (v = e.getValue()) != null &&
  27. (k == key || k.equals(key)) &&
  28. (v == (u = val) || v.equals(u)));
  29. }
  30. //用於map中的get()方法,子類重寫
  31. Node<K,V> find(int h, Object k) {
  32. Node<K,V> e = this;
  33. if (k != null) {
  34. do {
  35. K ek;
  36. if (e.hash == h &&
  37. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  38. return e;
  39. } while ((e = e.next) != null);
  40. }
  41. return null;
  42. }
  43. }

Node資料結構很簡單,從上可知,就是一個連結串列,但是隻允許對資料進行查詢,不允許進行修改

TreeNode

TreeNode繼承與Node,但是資料結構換成了二叉樹結構,它是紅黑樹的資料的儲存結構,用於紅黑樹中儲存資料,當連結串列的節點數大於8時會轉換成紅黑樹的結構,他就是通過TreeNode作為儲存結構代替Node來轉換成黑紅樹原始碼如下


  
  1. static final class TreeNode<K,V> extends Node<K,V> {
  2. //樹形結構的屬性定義
  3. TreeNode<K,V> parent; // red-black tree links
  4. TreeNode<K,V> left;
  5. TreeNode<K,V> right;
  6. TreeNode<K,V> prev; // needed to unlink next upon deletion
  7. boolean red; //標誌紅黑樹的紅節點
  8. TreeNode( int hash, K key, V val, Node<K,V> next,
  9. TreeNode<K,V> parent) {
  10. super(hash, key, val, next);
  11. this.parent = parent;
  12. }
  13. Node<K,V> find(int h, Object k) {
  14. return findTreeNode(h, k, null);
  15. }
  16. //根據key查詢 從根節點開始找出相應的TreeNode,
  17. final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
  18. if (k != null) {
  19. TreeNode<K,V> p = this;
  20. do {
  21. int ph, dir; K pk; TreeNode<K,V> q;
  22. TreeNode<K,V> pl = p.left, pr = p.right;
  23. if ((ph = p.hash) > h)
  24. p = pl;
  25. else if (ph < h)
  26. p = pr;
  27. else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
  28. return p;
  29. else if (pl == null)
  30. p = pr;
  31. else if (pr == null)
  32. p = pl;
  33. else if ((kc != null ||
  34. (kc = comparableClassFor(k)) != null) &&
  35. (dir = compareComparables(kc, k, pk)) != 0)
  36. p = (dir < 0) ? pl : pr;
  37. else if ((q = pr.findTreeNode(h, k, kc)) != null)
  38. return q;
  39. else
  40. p = pl;
  41. } while (p != null);
  42. }
  43. return null;
  44. }
  45. }

TreeBin

TreeBin從字面含義中可以理解為儲存樹形結構的容器,而樹形結構就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分原始碼結構如下


  
  1. static final class TreeBin<K,V> extends Node<K,V> {
  2. //指向TreeNode列表和根節點
  3. TreeNode<K,V> root;
  4. volatile TreeNode<K,V> first;
  5. volatile Thread waiter;
  6. volatile int lockState;
  7. // 讀寫鎖狀態
  8. static final int WRITER = 1; // 獲取寫鎖的狀態
  9. static final int WAITER = 2; // 等待寫鎖的狀態
  10. static final int READER = 4; // 增加資料時讀鎖的狀態
  11. /**
  12. * 初始化紅黑樹
  13. */
  14. TreeBin(TreeNode<K,V> b) {
  15. super(TREEBIN, null, null, null);
  16. this.first = b;
  17. TreeNode<K,V> r = null;
  18. for (TreeNode<K,V> x = b, next; x != null; x = next) {
  19. next = (TreeNode<K,V>)x.next;
  20. x.left = x.right = null;
  21. if (r == null) {
  22. x.parent = null;
  23. x.red = false;
  24. r = x;
  25. }
  26. else {
  27. K k = x.key;