併發程式設計---ConcurrentHashMap原始碼解析
阿新 • • 發佈:2018-11-28
ConcurrentHashMap是Java中為了解決HashMap執行緒不安全、無法在高併發場景下使用而設計的Map實現。
ConcurrentHashMap的類結構
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
......
}
ConcurrentHashMap的主要成員變數
//容量最大值 private static final int MAXIMUM_CAPACITY = 1 << 30; //預設容量大小 private static final int DEFAULT_CAPACITY = 16; //陣列容量的最大值 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //預設的併發數 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //負載因子 private static final float LOAD_FACTOR = 0.75f; //由連結串列轉為紅黑樹的閾值 static final int TREEIFY_THRESHOLD = 8; //由紅黑樹轉為連結串列的閾值 static final int UNTREEIFY_THRESHOLD = 6; //轉換為紅黑樹的最小容量 static final int MIN_TREEIFY_CAPACITY = 64; //每次進行轉移的最小值 private static final int MIN_TRANSFER_STRIDE = 16; //生成sizeCtl所使用的最小bit位數 private static int RESIZE_STAMP_BITS = 16; //進行擴容鎖需要的最大執行緒數 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //記錄sizeCtl的大小所需要進行的偏移位數 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //標識 static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash /** Number of CPUS, to place bounds on some sizings */ //cpu的個數 static final int NCPU = Runtime.getRuntime().availableProcessors(); //儲存元素的陣列 transient volatile Node<K,V>[] table; //擴容時新生成的陣列,用於下一個存放元素的陣列,其大小為原陣列的兩倍 private transient volatile Node<K,V>[] nextTable; //基本計數 private transient volatile long baseCount; /** * hash表初始化或擴容時的一個控制位標識量。 * 負數代表正在進行初始化或擴容操作 * -1代表正在初始化 * -N 表示有N-1個執行緒正在進行擴容操作 * 正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小 */ private transient volatile int sizeCtl; //擴容下另一個表的索引 private transient volatile int transferIndex; // private transient volatile int cellsBusy; // private transient volatile CounterCell[] counterCells; //以下是通過sun.misc.Unsafe的objectFieldOffset方法獲取成員變數在class域中的偏移值 private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; 
/** Byte offset of element 0 of a Node[] array. */
private static final long ABASE;
/** log2 of the Node[] element size; used to compute element offsets. */
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        // Resolve the offsets of the CAS-updated fields once, up front.
        Class<?> mapClass = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset(mapClass.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset(mapClass.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset(mapClass.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset(mapClass.getDeclaredField("cellsBusy"));
        Class<?> cellClass = CounterCell.class;
        CELLVALUE = U.objectFieldOffset(cellClass.getDeclaredField("value"));
        Class<?> arrayClass = Node[].class;
        ABASE = U.arrayBaseOffset(arrayClass);
        int scale = U.arrayIndexScale(arrayClass);
        // Element addresses are computed as ((long)i << ASHIFT) + ABASE,
        // which only works when the element size is a power of two.
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
ConcurrentHashMap中的主要內部類
Node:
/**
 * Basic hash-bin node. The key and hash are immutable; the value and
 * next pointer are volatile so lock-free readers always see a current
 * value. setValue is unsupported: entries exposed through views are
 * read-only, and values are replaced only by the map internally.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()   { return key; }
    public final V getValue() { return val; }

    /** Entry hash is key-hash XOR value-hash, per the Map.Entry contract. */
    public final int hashCode() { return key.hashCode() ^ val.hashCode(); }

    public final String toString() { return key + "=" + val; }

    /** Mutation through the entry view is not allowed. */
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        if (!(o instanceof Map.Entry))
            return false;
        Map.Entry<?,?> other = (Map.Entry<?,?>) o;
        Object otherKey = other.getKey();
        Object otherVal = other.getValue();
        if (otherKey == null || otherVal == null)
            return false;
        if (otherKey != key && !otherKey.equals(key))
            return false;
        V snapshot = val; // single volatile read of the current value
        return otherVal == snapshot || otherVal.equals(snapshot);
    }

    /**
     * Linear scan of this chain for a node with hash h and key k.
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        if (k != null) {
            for (Node<K,V> node = this; node != null; node = node.next) {
                K nodeKey;
                if (node.hash == h &&
                    ((nodeKey = node.key) == k ||
                     (nodeKey != null && k.equals(nodeKey))))
                    return node;
            }
        }
        return null;
    }
}
由於這裡Node和HashMap中的Node基本一致,所以不再贅述。
ForwardingNode:繼承Node節點,hash值為-1,其中儲存nextTable的引用。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
ReservationNode:繼承於Node,雜湊值為-3。
/**
 * Placeholder node carrying the special hash RESERVED, used while a bin
 * is reserved (presumably for compute-style insertions -- NOTE(review):
 * confirm against the compute/computeIfAbsent implementations).
 */
static final class ReservationNode<K,V> extends Node<K,V> {
    ReservationNode() {
        super(RESERVED, null, null, null);
    }

    /** A reservation holds no mapping, so lookups always miss. */
    Node<K,V> find(int h, Object k) {
        return null;
    }
}
ConcurrentHashMap的主要建構函式
/**
 * Constructor taking an initial capacity.
 * The table itself is not allocated here; sizeCtl records the target
 * capacity until the first insertion triggers initialization.
 */
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap;
    if (initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) {
        // Requests of at least half the maximum would overflow the
        // 1.5x sizing below, so clamp straight to MAXIMUM_CAPACITY.
        cap = MAXIMUM_CAPACITY;
    } else {
        // Size at roughly 1.5 * initialCapacity, rounded up to the next
        // power of two by tableSizeFor (e.g. 15 -> 16).
        cap = tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }
    this.sizeCtl = cap;
}
// Constructor that creates a map containing the same mappings as the given map.
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// sizeCtl starts at the default capacity; the table is created lazily.
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// Constructor taking an initial capacity and a load factor;
// delegates to the three-argument constructor with concurrency level 1.
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
 * Constructor taking an initial capacity, a load factor, and an
 * estimated number of concurrently updating threads.
 */
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // Use at least as many bins as estimated updating threads.
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    // Treat initialCapacity as the expected element count and back out
    // the table size from the load factor: size ~= elements / loadFactor.
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap;
    if (size >= (long) MAXIMUM_CAPACITY)
        cap = MAXIMUM_CAPACITY;
    else
        cap = tableSizeFor((int) size);
    // Only sizeCtl is set; the table is allocated on first insertion.
    this.sizeCtl = cap;
}
我們發現建構函式中都只是對sizeCtl進行了初始化,其餘成員變數,比如table陣列,均沒有初始化,而是等到第一次put操作時進行初始化。
ConcurrentHashMap的主要方法
我們先來看看ConcurrentHashMap的一些基礎方法。
雜湊計算:int spread(int h):對key的hashCode值進行雜湊計算。
/**
 * Spreads (XORs) the higher bits of a hash code into the lower bits and
 * clears the sign bit with HASH_BITS, so every normal-node hash is
 * non-negative (negative hashes mark MOVED/TREEBIN/RESERVED nodes).
 */
static final int spread(int h) {
    int mixed = h ^ (h >>> 16);
    return mixed & HASH_BITS;
}
原子操作方法:tabAt、casTabAt、setTabAt。這是三個原子操作,用於對指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的執行緒安全。
/**
 * Volatile read of the Node at index i of tab. Goes through Unsafe
 * because array elements themselves cannot be declared volatile.
 */
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    long offset = ((long) i << ASHIFT) + ABASE;
    return (Node<K,V>) U.getObjectVolatile(tab, offset);
}
/**
 * Atomically stores v at index i of tab, but only if the slot currently
 * holds the expected node c (compare-and-swap). Returns true on success;
 * a concurrent modification of the slot makes the CAS fail instead of
 * being silently overwritten.
 */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    long offset = ((long) i << ASHIFT) + ABASE;
    return U.compareAndSwapObject(tab, offset, c, v);
}
/** Volatile write of node v into index i of tab. */
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    long offset = ((long) i << ASHIFT) + ABASE;
    U.putObjectVolatile(tab, offset, v);
}
初始化陣列:Node<K,V>[] initTable()
// Lazily creates the bin array. Exactly one thread wins the CAS on
// sizeCtl and performs the allocation; losing threads yield and re-check
// until the winner publishes the table.
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl < 0: another thread is initializing (or resizing) -- back off.
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// Claim initialization by CASing sizeCtl from sc to -1.
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// Re-check: the table may have been created between our read and the CAS.
if ((tab = table) == null || tab.length == 0) {
// A positive prior sizeCtl is the requested capacity; else use default.
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// Next-resize threshold: n - n/4 == 0.75 * n.
sc = n - (n >>> 2);
}
} finally {
// Publish the threshold (or restore sc) even if allocation failed.
sizeCtl = sc;
}
break;
}
}
return tab;
}
獲取:V get(Object key)
/**
 * Returns the value mapped to the given key, or null if absent.
 * Lock-free: relies on volatile reads via tabAt and the volatile node
 * fields. Calls key.hashCode() unconditionally, so a null key throws
 * NullPointerException.
 */
public V get(Object key) {
    // Spread the high bits of the key's hash downward.
    int h = spread(key.hashCode());
    Node<K,V>[] tab = table;
    if (tab != null && tab.length > 0) {
        // Index the bin with (length - 1) & h (length is a power of two).
        Node<K,V> first = tabAt(tab, (tab.length - 1) & h);
        if (first != null) {
            int fh = first.hash;
            if (fh == h) {
                // Fast path: the head of the bin is the match.
                K fk = first.key;
                if (fk == key || (fk != null && key.equals(fk)))
                    return first.val;
            } else if (fh < 0) {
                // Special node (forwarding/tree/reservation): delegate
                // to that node's own find implementation.
                Node<K,V> p = first.find(h, key);
                return (p != null) ? p.val : null;
            }
            // Plain chain: walk the remaining nodes.
            for (Node<K,V> e = first.next; e != null; e = e.next) {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
    }
    return null;
}