[技術分享]-ConcurrentHashMap在jdk1.8中的改進
一、簡單回顧ConcurrentHashMap在jdk1.7中的設計
- 與Hashtable不同的是,ConcurrentHashMap使用的是分段鎖技術,將ConcurrentHashMap容器的資料分段儲存,每一段資料分配一個Segment,當執行緒佔用一個Segment時,其他執行緒可以訪問其他段的資料.(每個segment都是一個鎖). 與hashtable相比,這麼設計的目的是對於put, remove等操作,可以減少併發衝突,對不屬於同一個片段的節點可以併發操作,大大提高了效能.
Segment : 可重入鎖(在JAVA環境下 ReentrantLock 和synchronized 都是 可重入鎖),繼承ReentrantLock, 也稱之為桶( 本質上Segment類就是一個小的hashmap,裡面table陣列儲存了各個節點的資料,繼承了ReentrantLock, 可以作為互斥鎖使用 )
HashEntry : 主要儲存鍵值對, 這裡也可以叫節點
HashEntry原始碼:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
其中,volatile關鍵字保證了多執行緒讀取的時候一定是最新值。
ConcurrentHashMap包含一個Segment陣列,每個Segment包含一個HashEntry陣列,當修改HashEntry陣列,採用開鏈法處理衝突,所以它的每個HashEntry元素又是連結串列結構的元素。
ConcurrentHashMap構造方法:
整個初始化是通過引數initialCapacity(初始容量),loadFactor(增長因子)和concurrencyLevel(併發等級)來初始化segmentShift(段偏移量)、segmentMask(段掩碼)和segment陣列。注意以下兩點:public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; //1 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; //2 } this.segmentShift = 32 - sshift; //3 this.segmentMask = ssize - 1; //4 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);//5 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //6 UNSAFE.putOrderedObject(ss, SBASE, s0); this.segments = ss; }
1: 最大的併發等級不能超過MAX_SEGMENTS 1<<16(65535),(如果你傳入的是15 就是向上取2的4次方倍 也就是16.
2:segmentShift和segmentMask在定位segment使用,segmentShift = 32 - ssize向左移位的次數,segmentMask = ssize - 1。ssize的最大長度是65536,對應的 segmentShift最大值為16,segmentMask最大值是65535,對應的二進位制16位全為1;
初始化segment: 1:初始化每個segment的HashEntry長度;2:建立segment陣列和segment[0]。注:HashEntry長度cap同樣也是2的N次方,預設情況,ssize = 16,initialCapacity = 16,loadFactor = 0.75f,那麼cap = 1,threshold = (int) cap * loadFactor = 0。
ConcurrentHashMap Get操作
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key); //1
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && //2
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
根據key計算hash值;根據計算出的hash值定位segment 如果segment不為null segment.table也不為null 跳轉進裡面的迴圈( 通過hash值定位segment中對應的HashEntry 遍歷HashEntry,如果key存在,返回key對應的value 如果不存在則返回null )
ConcurrentHashMap Put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
1.判斷值是否為null2.計算hash值
3.定位segment 如果不存在,則建立
4.呼叫segment的put方法
Segment的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); //1
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index); //2
for (HashEntry<K,V> e = first;;) { //3
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
1. 獲取鎖 ,保證執行緒安全2. 定位到具體的HashEntry
3. 遍歷HashEntry連結串列,如果key已存在 再判斷傳入的onlyIfAbsent的值 ,再決定是否覆蓋舊值.
4. 最後釋放鎖,返回舊值.
二、ConcurrentHashMap在jdk1.8中做了兩方面的改進
改進一:取消segments欄位,直接採用transient volatile HashEntry<K,V>[] table儲存資料,採用table陣列元素作為鎖,從而實現了對每一行資料進行加鎖,進一步減少併發衝突的概率。
改進二:將原先table陣列+單向連結串列的資料結構,變更為table陣列+單向連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0.75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),以此改進效能。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table為空,初始化;否則,根據hash值計算得到陣列索引i,如果tab[i]為空,直接新建節點Node即可。注:tab[i]實質為連結串列或者紅黑樹的首節點。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 如果tab[i]不為空並且hash值為MOVED,說明該連結串列正在進行transfer操作,返回擴容完成後的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 針對首個節點進行加鎖操作,而不是segment,進一步減少執行緒衝突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在連結串列中找到值為key的節點e,直接設定e.val = value即可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果沒有找到值為key的節點,直接新建Node並加入連結串列即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首節點為TreeBin型別,說明為紅黑樹結構,執行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果節點數>=8,那麼轉換連結串列結構為紅黑樹結構。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 計數增加1,有可能觸發transfer操作(擴容)。
addCount(1L, binCount);
return null;
}