【Java併發程式設計】深入分析ConcurrentHashMap(九)
本章是提高教程可能對於剛入門同學來說會有些難度,讀懂本章你需要了解以下知識點:
一、Concurrent原始碼分析
ConcurrentHashMap是由Segment(桶)、HashEntry(節點)2大資料結構組成。如下圖所示:
1.1 Segment類和屬性
//Segment內部維護了一個連結串列陣列 static final class Segment<K,V> extends ReentrantLock implements Serializable { //連結串列陣列,陣列中的每一個元素代表了一個連結串列的頭部 transient volatile HashEntry<K,V>[] table; //Segment中元素的數量 transient int count; //對table的大小造成影響的操作的數量(比如put或者remove操作) transient int modCount; //閾值,Segment裡面元素的數量超過這個值會對Segment進行擴容,擴容後大小=old*2*負載因子 transient int threshold; //負載因子,用於確定threshold final float loadFactor; }
Segment繼承了ReentrantLock,這意味著每個segment都可以當做一個鎖,每一把鎖只鎖住整個容器中的部分資料,這樣不影響執行緒訪問其它資料,當然如果是對全域性改變時會鎖定所有的segment段。比如:size()和containsValue(),注意的是要按順序鎖定所有段,操作完畢後,再按順序釋放所有段的鎖。如果不按順序的話,有可能會出現死鎖。
1.2 HashEntry類和屬性
//HashEntry是一個單向連結串列 static final class HashEntry<K,V> { //雜湊值 final int hash; //儲存的key和值value final K key; volatile V value; //指向的下一個HashEntry,即連結串列的下一個節點 volatile HashEntry<K,V> next; }
類似與HashMap節點Entry,HashEntry也是一個單向連結串列,它包含了key、hash、value和下一個節點資訊。HashEntry和Entry的不同點:
不同點一:使用了多個final關鍵字(final class 、final hash) ,這意味著不能從連結串列的中間或尾部新增或刪除節點,後面刪除操作時會講到。
不同點二:使用volatile,是為了更新值後能立即對其它執行緒可見。這裡沒有使用鎖,效率更高。
1.3 類的初始化
/** * * @param initialCapacity 初始容量 * @param loadFactor 負載因子 * @param concurrencyLevel 代表ConcurrentHashMap內部的Segment的數量, * ConcurrentLevel 併發級別 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1;//ssize左移一位也就是每次ssize=2*ssize。 } //主要使用於put()和segmentForHash()方法,結合hash計算出元素在哪一個Segment中。 //假如concurrencyLevel是16,那麼sshift=4、segmentShift=28、segmentMask=15; this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
ConcurrencyLevel預設情況下內部按併發級別為16來建立。對於每個segment的容量,預設情況也是16。其中concurrentLevel和segment的初始容量都是可以通過建構函式設定的。要注意的是ConcurrencyLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中連結串列陣列的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment裡面的元素做一次rehash就可以了。
1.4 ensureSegment()方法
該方法返回給定索引位置的Segment,如果Segment不存在,則參考Segment表中的第一個Segment的引數建立一個Segment並通過CAS操作將它記錄到Segment表中去。 private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
1.5 entryAt()方法
entryAt()方法是從連結串列中查詢節點。在方法引數裡注意到有傳入tab連結串列陣列和index索引,那為什麼還要呼叫entryAt()方法獲取陣列項的值而不是通過tab[index]方式直接獲取?那我們從源頭(put)開始分析,見1.6put()操作。static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
1.6 put()操作
1.6.1 鎖分離技術
大家知道HashTable是使用了synchronized來保證執行緒安全,但是其效率非常差。它效率非常差的原因是多個執行緒訪問HashTable時需要競爭同一把鎖,如果我們有多把鎖,每一把鎖只鎖住一部分資料,那麼多執行緒在訪問不同的資料時也就不會存在競爭,能提高訪問效率。這種做法我們稱為鎖分離技術。在《Java併發程式設計實戰》一書中作者提到過分拆鎖和分離鎖技術:分拆鎖(lock spliting)就是若原先的程式中多處邏輯都採用同一個鎖,但各個邏輯之間又相互獨立,就可以拆(Spliting)為使用多個鎖,每個鎖守護不同的邏輯。
分拆鎖有時候可以被擴充套件,分成可大可小加鎖塊的集合,並且它們歸屬於相互獨立的物件,這樣的情況就是分離鎖(lock striping)。
而ConcurrentHashMap就是使用了分離鎖技術,對每個Segment配置一把鎖,如下圖所示:
1.6.2 原始碼分析
Segment的put操作原理如下圖所示,圖中展示的不是很詳細,其中關於加鎖的步驟沒有加上去,原因是加了幾次覺得加鎖後看著很複雜。用圖片展示是為了更加簡單和明瞭,如果看著複雜也就沒有意義了,我儘量用文字說清楚它的步驟。
步驟一:進入Segment的put操作時先進行加鎖保護。如果加鎖沒有成功,呼叫scanAndLockForPut方法(詳細步驟見下面scanAndLockForPut()原始碼分析)進入自旋狀態,該方法持續查詢key對應的節點鏈中是已存在該機節點,如果沒有找到,則預建立一個新節點,並且嘗試n次,直到嘗試次數操作限制,才真正進入加鎖等待狀態,自旋結束並返回節點(如果返回了一個非空節點,則表示在連結串列中沒有找到相應的節點)。對最大嘗試次數,目前的實現單核次數為1,多核為64。
步驟二:使用(tab.length - 1) & hash計算第一個節點位置,再通過entryAt()方法去查詢第一個節點。如果節點存在,遍歷連結串列找到key值所在的節點,如果找到了這個節點則直接更新舊value,結束迴圈。其中value使用了volatile,它更新後的值立馬對其它執行緒可見。如果節點不存在,將步驟一預建立的新節點(如果沒有則重新建立)新增到連結串列中,新增前先檢查新增後節點數量是否超過容器大小,如果超過了,則rehash操作。沒有的話呼叫setNext或setEntryAt方法新增新節點;
要注意的是在更新連結串列時使用了Unsafe.putOrderedObject()方法,這個方法能夠實現非堵塞的寫入,這些寫入不會被Java的JIT重新排序指令(instruction reordering),使得它能更加快速的儲存。
解決1.5問題:為什麼還要呼叫entryAt()方法獲取陣列項的值而不是通過tab[index]方式直接獲取?
雖然在開始時volatile table將引用賦值給了變數tab,但是多執行緒下table裡的值可能發生改變,使用tab[index]並不能獲得最新的值。。為了保證接下來的put操作能夠讀取到上一次的更新結果,需要使用volatile的語法去讀取節點鏈的鏈頭.
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
//計算Segment的位置,在初始化的時候對segmentShift和segmentMask做了解釋
int j = (hash >>> segmentShift) & segmentMask;
//從Segment陣列中獲取segment元素的位置
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//
return s.put(key, hash, value, false);
}
//往Segment的HashEntry中新增元素,使用了分鎖機制
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock 僅在呼叫時鎖為空閒狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值 true。否則是false
//scanAndLockForPut 下面單獨說scanAndLockForPut
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
1.5 segmentForHash()方法
/**
* 查詢Segment物件,這裡Unsafe的主要作用是提供原子操作。
*/
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}
1.6 scanAndLockForPut()方法
在下面程式碼中,它先獲取key對應的頭節點,進入連結串列迴圈。如果連結串列中不存在要插入的節點,則預建立一個新節點,否則retries值遞增,直到操作最大嘗試次數而進入等待狀態。這個方法要注意的是:當在自旋過程中發現連結串列鏈頭髮生了變化,則更新節點鏈的鏈頭,並重置retries值為-1,重新為嘗試獲取鎖而自旋遍歷。private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//第一步:先找到HashEntry連結串列中的頭節點
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//第一次一定執行該條件內程式碼
if (retries < 0) {
//第二步:分三種情景
//情景一:沒有找到,建立一個新的節點。
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
//情景二:找到相同key的節點
else if (key.equals(e.key))
retries = 0;
else
//情景三:沒找到key值對應的節點,指向下一個節點繼續
e = e.next;
}
//嘗試次數達到限制進入加鎖等待狀態。 對最大嘗試次數,目前的實現單核次數為1,多核為64:
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//retries是偶數並且不是頭節點。在自旋中鏈頭可能會發生變化
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
1.7 get()操作
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
從程式碼可以看出get方法並沒有呼叫鎖,它使用了volatile的可見性來實現執行緒安全的。參考資料