【搞定Java併發程式設計】第21篇:Java併發容器之ConcurrentHashMap詳解
上一篇:讀寫鎖 --- ReentrantReadWriteLock詳解
本文目錄:
2.1、ConcurrentHashMap中主要的成員變數、成員方法和內部類
首先推薦下幾篇不錯的文章:
1、Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
這篇文章要好好讀一讀,全文主要包括四個部分:Java7中的HashMap和ConcurrentHashMap以及Java8中的HashMap和ConcurrentHashMap,能夠很清晰的認識到HashMap和ConcurrentHashMap之間的區別、Java8在哪些地方做出了改進。
這篇文章也是本文主要參考的文章,全文分析了Java7中主要ConcurrentHashMap部分。
本文主要偏重於講ConcurrentHashMap的原始碼分析,至於它和HashMap的對比,後面複習Java基礎集合容器時會專門的再進行講解。這裡不再做過多的贅述,但是這兩者之間的對比是很重要的。另外會再寫文章對Java1.7中的ConcurrentHashMap和Java1.8中的ConcurrentHashMap的區別進行講解:
1、為什麼要使用ConcurrentHashMap?
在併發程式設計中使用HashMap可能導致程式死迴圈。而使用執行緒安全的HashTable效率又非常低下,基於以上兩個原因,便有了ConcurrentHashMap的登場機會。
- 1、執行緒不安全的HashMap
HashMap在併發執行put操作時會發生死迴圈,是因為多執行緒會導致HashMap的Entry連結串列形成環型資料結構,一旦形成環型資料結構,Entry的next節點永遠不為空,就會產生死迴圈獲取Entry。
- 2、效率低下的HashTable
HashTable容器使用synchronized來保證執行緒安全,但線上程競爭激烈的情況下HashTable的效率會非常低下。因為當一個執行緒訪問HashTable的同步方法,其他執行緒也訪問HashTable的同步方法時,會進入阻塞或者輪詢狀態。如執行緒1使用put進行元素的新增,執行緒2不但不能使用put方法新增元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
- 3、ConcurrentHashMap的鎖分段機制可有效提高併發訪問率
HashTable容器在競爭激烈的情況下表現出效率低下的原因是所有訪問HashTable的執行緒都必須競爭同一把鎖,假如容器裡有多把鎖,每一把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效提高併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將資料分成一段一段地儲存,然後給每一段資料配一把鎖,當一個執行緒佔用鎖訪問其中一個段資料的時候,其他段的資料也能被其他執行緒訪問。
2、ConcurrentHashMap的實現
ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成的。
Segment是一種可重入鎖(ReentranLock),在ConcurrentHashMap中扮演鎖的角色。
HashEntry則用於儲存鍵值對資料。
一個ConcurrentHashMap包含一個Segment陣列,其結構是:陣列+連結串列的形式。一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素。每個Segment守護著一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得與它對應的Segment鎖。
看下ConcurrentHashMap原始碼中(JDK1.7)主要的成員變數、方法和內部類:
2.1、ConcurrentHashMap中主要的成員變數、成員方法和內部類
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
// 預設初始化容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 預設載入因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 預設併發級別
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 集合最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 分段鎖的最小數量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 分段鎖的最大數量
static final int MAX_SEGMENTS = 1 << 16;
// 加鎖前的重試次數
static final int RETRIES_BEFORE_LOCK = 2;
// 分段鎖的掩碼值
final int segmentMask;
// 分段鎖的移位值
final int segmentShift;
// 分段鎖陣列
final Segment<K,V>[] segments;
private static class Holder {
// ...
}
static final class HashEntry<K,V> {
// ...
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
// ...
}
// 獲取ConcurrentHashMap中元素的個數
public int size() {}
// 獲取元素
public V get(Object key) {}
// 設定元素
public V put(K key, V value) {}
// ...
}
其中Segment陣列代表分段鎖集合;併發級別則代表分段鎖的數量(也意味有多少執行緒可以同時操作);初始化容量代表整個容器的容量;載入因子代表容器元素可以達到多滿的一種程度。
2.2、分段鎖的內部結構
Segment是ConcurrentHashMap的內部類,它繼承了ReentrantLock。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
// 分段鎖
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// 自旋最大次數
static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// 雜湊表
transient volatile HashEntry<K,V>[] table;
// 元素總數
transient int count;
// 修改次數
transient int modCount;
// 元素閥值
transient int threshold;
// 載入因子
final float loadFactor;
// 建構函式
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
// 省略以下內容
...
}
}
Segment是ConcurrentHashMap的靜態內部類,可以看到它繼承自ReentrantLock,因此它在本質上是一個鎖。它在內部持有一個HashEntry陣列(雜湊表),並且保證所有對該陣列的增刪改查方法都是執行緒安全的,具體是怎樣實現的後面會講到。
所有對ConcurrentHashMap的增刪改查操作都可以委託Segment來進行,因此ConcurrentHashMap能夠保證在多執行緒環境下是安全的。又因為不同的Segment是不同的鎖,所以多執行緒可以同時操作不同的Segment,也就意味著多執行緒可以同時操作ConcurrentHashMap,這樣就能避免HashTable的缺陷,從而極大的提高效能。
2.3、ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通過 initialCapacity、loadFactor 和 concurrencyLevel等幾個引數來初始化 segment陣列、段偏移量segmentShift、段掩碼 segmentMask 和每個 segment 裡的 HashEntry 陣列。
// 核心構造器
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// 確保併發級別不大於限定值
if (concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}
int sshift = 0;
int ssize = 1;
// 保證ssize為2的冪, 且是最接近的大於等於併發級別的數
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 計算分段鎖的移位值
this.segmentShift = 32 - sshift;
// 計算分段鎖的掩碼值
this.segmentMask = ssize - 1;
// 總的初始容量不能大於限定值
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
// 獲取每個分段鎖的初始容量
int c = initialCapacity / ssize;
// 分段鎖容量總和不小於初始總容量
if (c * ssize < initialCapacity) {
++c;
}
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 保證cap為2的冪, 且是最接近的大於等於c的數
while (cap < c) {
cap <<= 1;
}
// 新建一個Segment物件模版
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
// 新建指定大小的分段鎖陣列
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 使用UnSafe給陣列第0個元素賦值
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
ConcurrentHashMap有多個構造器,但是上面貼出的是它的核心構造器,其他構造器都通過呼叫它來完成初始化。核心構造器需要傳入三個引數,分別是初始容量,載入因子和併發級別。
在前面介紹成員變數時我們可以知道預設的初始容量為16,載入因子為0.75f,併發級別為16。現在我們看到核心構造器的程式碼,首先是通過傳入的concurrencyLevel來計算出ssize,ssize是Segment陣列的長度,它必須保證是2的冪,這樣就可以通過hash&ssize-1來計算分段鎖在陣列中的下標。
由於傳入的concurrencyLevel不能保證是2的冪,所以不能直接用它來當作Segment陣列的長度,因此我們要找到一個最接近concurrencyLevel的2的冪,用它來作為陣列的長度。假如現在傳入的concurrencyLevel=15,通過上面程式碼可以計算出ssize=16,sshift=4。
接下來立馬可以算出segmentShift=16,segmentMask=15。注意這裡的segmentShift是分段鎖的移位值,segmentMask是分段鎖的掩碼值,這兩個值是用來計算分段鎖在陣列中的下標,在下面我們會講到。在算出分段鎖的個數ssize之後,就可以根據傳入的總容量來計算每個分段鎖的容量,它的值c = initialCapacity / ssize。
分段鎖的容量也就是HashEntry陣列的長度,同樣也必須保證是2的冪,而上面算出的c的值不能保證這一點,所以不能直接用c作為HashEntry陣列的長度,需要另外找到一個最接近c的2的冪,將這個值賦給cap,然後用cap來作為HashEntry陣列的長度。現在我們有了ssize和cap,就可以新建分段鎖陣列Segment[]和元素陣列HashEntry[]了。
注意,與JDK1.6不同是的,在JDK1.7中只新建了Segment陣列,並沒有對它初始化,初始化Segment的操作留到了插入操作時進行。
2.4、如何定位Segment(鎖)和元素
主要通過segmentForHash方法獲取分段鎖的位置,再根據entryForHash方法獲取元素的位置。
// 根據雜湊碼獲取分段鎖
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}
// 根據雜湊碼獲取元素
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
HashEntry<K,V>[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}
在JDK1.7中是通過UnSafe來獲取陣列元素的,因此這裡比JDK1.6多了些計算陣列元素偏移量的程式碼,這些程式碼我們暫時不關注,現在我們只需知道下面這兩點:
a. 通過雜湊碼計算分段鎖在陣列中的下標:(h >>> segmentShift) & segmentMask。
b. 通過雜湊碼計算元素在陣列中的下標:(tab.length - 1) & h。
現在我們假設傳給構造器的兩個引數為initialCapacity=128, concurrencyLevel=16。根據計算可以得到ssize=16, sshift=4,segmentShift=28,segmentMask=15。
同樣,算得每個分段鎖內的HashEntry陣列的長度為8,所以tab.length-1=7。根據這些值,我們通過下圖來解釋如何根據同一個雜湊碼來定位分段鎖和元素。
可以看到分段鎖和元素的定位都是通過元素的雜湊碼來決定的。定位分段鎖是取雜湊碼的高位值(從32位處取起),定位元素是取的雜湊碼的低位值。現在有個問題,它們一個從32位的左端取起,一個從32位的右端取起,那麼會在某個時刻產生衝突嗎?
我們在成員變數裡可以找到MAXIMUM_CAPACITY = 1 << 30,MAX_SEGMENTS = 1 << 16,這說明定位分段鎖和定位元素使用的總的位數不超過30,並且定位分段鎖使用的位數不超過16,所以至少還隔著2位的空餘,因此是不會產生衝突的。
2.5、查詢元素get操作
Segment的get操作實現非常簡單高效。先經過一次雜湊,然後使用這個雜湊值通過雜湊運算定位到Segment,再通過雜湊演算法定位到元素。共需要兩次雜湊運算。
get操作的高效在於整個get過程都不用加鎖,除非讀到的值是空才會加鎖重讀。get方法中使用的共享變數都定義成了volatile型別。
// 根據key獲取value
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
// 使用雜湊函式計算雜湊碼
int h = hash(key);
// 根據雜湊碼計算分段鎖的索引
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 獲取分段鎖和對應的雜湊表
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 根據雜湊碼獲取連結串列頭節點, 再對連結串列進行遍歷
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// 根據key和hash找到對應元素後返回value值
if ((k = e.key) == key || (e.hash == h && key.equals(k))) {
return e.value;
}
}
}
return null;
}
在JDK1.6中分段鎖的get方法是通過下標來訪問陣列元素的,而在JDK1.7中是通過UnSafe的getObjectVolatile方法來讀取陣列中的元素。
為什麼要這樣做?
我們知道雖然Segment物件持有的HashEntry陣列引用是volatile型別的,但是陣列內的元素引用不是volatile型別的,因此多執行緒對陣列元素的修改是不安全的,可能會在陣列中讀取到尚未構造完成的物件。
在JDK1.6中是通過第二次加鎖讀取來保證安全的,而JDK1.7中通過UnSafe的getObjectVolatile方法來讀取同樣也是為了保證這一點。使用getObjectVolatile方法讀取陣列元素需要先獲得元素在陣列中的偏移量,在這裡根據雜湊碼計算得到分段鎖在陣列中的偏移量為u,然後通過偏移量u來嘗試讀取分段鎖。由於分段鎖陣列在構造時沒進行初始化,因此可能讀出來一個空值,所以需要先進行判斷。
在確定分段鎖和它內部的雜湊表都不為空之後,再通過雜湊碼讀取HashEntry陣列的元素,根據上面的結構圖可以看到,這時獲得的是連結串列的頭結點。之後再從頭到尾的對連結串列進行遍歷查詢,如果找到對應的值就將其返回,否則就返回null。以上就是整個查詢元素的過程。
2.6、插入元素put操作
由於put方法裡需要對共享變數進行寫操作,所以為了執行緒安全,在操作共享變數時必須加鎖。put方法首先定位到Segment,然後在Segment裡進行插入操作。插入操作需要經歷兩個步驟:
1、判斷是否需要對Segment裡的HashEntry陣列進行擴容;
2、定位新增元素的位置,然後將其放在HashEntry陣列中。
- 是否需要擴容?
在插入元素前會先判斷Segment裡的HashEntry陣列是否超過容量(threshold),如果超過閾值,則對陣列進行擴容。值得一提的是,Segment的擴容判斷比HashMap更恰當,因為HashMap是在插入元素後判斷是否已經到達容量的,如果達到了就進行擴容,但是很有可能擴容之後沒有新元素再插入,這時HashMap就進行了一次無效的擴容。
- 如何擴容?
在擴容的時候,首先會建立一個容量是原來容量兩倍的陣列,然後將原來數組裡的元素進行再雜湊後插入到新的數組裡。為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment擴容。
// 向集合中新增鍵值對(若存在則替換)
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
// 傳入的value不能為空
if (value == null) throw new NullPointerException();
// 使用雜湊函式計算雜湊碼
int hash = hash(key);
// 根據雜湊碼計算分段鎖的下標
int j = (hash >>> segmentShift) & segmentMask;
// 根據下標去嘗試獲取分段鎖
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
// 獲得的分段鎖為空就去構造一個
s = ensureSegment(j);
}
// 呼叫分段鎖的put方法
return s.put(key, hash, value, false);
}
// 向集合新增鍵值對(不存在才新增)
@SuppressWarnings("unchecked")
public V putIfAbsent(K key, V value) {
Segment<K,V> s;
// 傳入的value不能為空
if (value == null) throw new NullPointerException();
// 使用雜湊函式計算雜湊碼
int hash = hash(key);
// 根據雜湊碼計算分段鎖的下標
int j = (hash >>> segmentShift) & segmentMask;
// 根據下標去嘗試獲取分段鎖
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
// 獲得的分段鎖為空就去構造一個
s = ensureSegment(j);
}
// 呼叫分段鎖的put方法
return s.put(key, hash, value, true);
}
ConcurrentHashMap中有兩個新增鍵值對的方法,通過put方法新增時如果存在則會進行覆蓋,通過putIfAbsent方法新增時如果存在則不進行覆蓋,這兩個方法都是呼叫分段鎖的put方法來完成操作,只是傳入的最後一個引數不同而已。在上面程式碼中我們可以看到首先是根據key的雜湊碼來計算出分段鎖在陣列中的下標,然後根據下標使用UnSafe類getObject方法來讀取分段鎖。
由於在構造ConcurrentHashMap時沒有對Segment陣列中的元素初始化,所以可能讀到一個空值,這時會先通過ensureSegment方法新建一個分段鎖。獲取到分段鎖之後再呼叫它的put方法完成新增操作,下面我們來看看具體是怎樣操作的。
// 新增鍵值對
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 嘗試獲取鎖, 若失敗則進行自旋
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 計算元素在陣列中的下標
int index = (tab.length - 1) & hash;
// 根據下標獲取連結串列頭結點
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// 遍歷連結串列尋找該元素, 找到則進行替換
if (e != null) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
// 根據引數決定是否替換舊值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
// 沒找到則在連結串列新增一個結點
} else {
// 將node結點插入連結串列頭部
if (node != null) {
node.setNext(first);
} else {
node = new HashEntry<K,V>(hash, key, value, first);
}
// 插入結點後將元素總是加1
int c = count + 1;
// 元素超過閥值則進行擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
rehash(node);
// 否則就將雜湊表指定下標替換為node結點
} else {
setEntryAt(tab, index, node);
}
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
為保證執行緒安全,分段鎖中的put操作是需要進行加鎖的,所以執行緒一開始就會去獲取鎖,如果獲取成功就繼續執行,若獲取失敗則呼叫scanAndLockForPut方法進行自旋,在自旋過程中會先去掃描雜湊表去查詢指定的key,如果key不存在就會新建一個HashEntry返回,這樣在獲取到鎖之後就不必再去新建了,為的是在等待鎖的過程中順便做些事情,不至於白白浪費時間,可見作者的良苦用心。
具體自旋方法我們後面再細講,現在先把關注點拉回來,執行緒在成功獲取到鎖之後會根據計算到的下標,獲取指定下標的元素。此時獲取到的是連結串列的頭結點,如果頭結點不為空就對連結串列進行遍歷查詢,找到之後再根據onlyIfAbsent引數的值決定是否進行替換。
如果遍歷沒找到就會新建一個HashEntry指向頭結點,此時如果自旋時建立了HashEntry,則直接將它的next指向當前頭結點,如果自旋時沒有建立就在這裡新建一個HashEntry並指向頭結點。
在向連結串列新增元素之後檢查元素總數是否超過閥值,如果超過就呼叫rehash進行擴容,沒超過的話就直接將陣列對應下標的元素引用指向新新增的node。setEntryAt方法內部是通過呼叫UnSafe的putOrderedObject方法來更改陣列元素引用的,這樣就保證了其他執行緒在讀取時可以讀到最新的值。
2.7、刪除元素的操作
// 刪除指定元素(找到對應元素後直接刪除)
public V remove(Object key) {
// 使用雜湊函式計算雜湊碼
int hash = hash(key);
// 根據雜湊碼獲取分段鎖的索引
Segment<K,V> s = segmentForHash(hash);
// 呼叫分段鎖的remove方法
return s == null ? null : s.remove(key, hash, null);
}
// 刪除指定元素(查詢值等於給定值才刪除)
public boolean remove(Object key, Object value) {
// 使用雜湊函式計算雜湊碼
int hash = hash(key);
Segment<K,V> s;
// 確保分段鎖不為空才呼叫remove方法
return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null;
}
ConcurrentHashMap提供了兩種刪除操作,一種是找到後直接刪除,一種是找到後先比較再刪除。這兩種刪除方法都是先根據key的雜湊碼找到對應的分段鎖後,再通過呼叫分段鎖的remove方法完成刪除操作。下面我們來看看分段鎖的remove方法。
// 刪除指定元素
final V remove(Object key, int hash, Object value) {
// 嘗試獲取鎖, 若失敗則進行自旋
if (!tryLock()) {
scanAndLock(key, hash);
}
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
// 計算元素在陣列中的下標
int index = (tab.length - 1) & hash;
// 根據下標取得陣列元素(連結串列頭結點)
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
// 遍歷連結串列尋找要刪除的元素
while (e != null) {
K k;
// next指向當前結點的後繼結點
HashEntry<K,V> next = e.next;
// 根據key和hash尋找對應結點
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
// 傳入的value不等於v就跳過, 其他情況就進行刪除操作
if (value == null || value == v || value.equals(v)) {
// 如果pred為空則代表要刪除的結點為頭結點
if (pred == null) {
// 重新設定連結串列頭結點
setEntryAt(tab, index, next);
} else {
// 設定pred結點的後繼為next結點
pred.setNext(next);
}
++modCount;
--count;
// 記錄元素刪除之前的值
oldValue = v;
}
break;
}
// 若e不是要找的結點就將pred引用指向它
pred = e;
// 檢查下一個結點
e = next;
}
} finally {
unlock();
}
return oldValue;
}
在刪除分段鎖中的元素時需要先獲取鎖,如果獲取失敗就呼叫scanAndLock方法進行自旋,如果獲取成功就執行下一步。首先計算陣列下標,然後通過下標獲取HashEntry陣列的元素,這裡獲得了連結串列的頭結點。接下來就是對連結串列進行遍歷查詢,在此之前先用next指標記錄當前結點的後繼結點,然後對比key和hash看看是否是要找的結點,如果是的話就執行下一個if判斷。
滿足value為空或者value的值等於結點當前值這兩個條件就會進入到if語句中進行刪除操作,否則直接跳過。在if語句中執行刪除操作時會有兩種情況,如果當前結點為頭結點則直接將next結點設定為頭結點,如果當前結點不是頭結點則將pred結點的後繼設定為next結點。
這裡的pred結點表示當前結點的前繼結點,每次在要檢查下一個結點之前就將pred指向當前結點,這就保證了pred結點總是當前結點的前繼結點。注意,與JDK1.6不同,在JDK1.7中HashEntry物件的next變數不是final的,因此這裡可以通過直接修改next引用的值來刪除元素,由於next變數是volatile型別的,所以讀執行緒可以馬上讀到最新的值。
2.8、替換元素的操作
// 替換指定元素(CAS操作)
public boolean replace(K key, V oldValue, V newValue) {
// 使用雜湊函式計算雜湊碼
int hash = hash(key);
// 保證oldValue和newValue不為空
if (oldValue == null || newValue == null) throw new NullPointerException();
// 根據雜湊碼獲取分段鎖的索引
Segment<K,V> s = segmentForHash(hash);
// 呼叫分段鎖的replace方法
return s != null && s.replace(key, hash, oldValue, newValue);
}
// 替換元素操作(CAS操作)
final boolean replace(K key, int hash, V oldValue, V newValue) {
// 嘗試獲取鎖, 若失敗則進行自旋
if (!tryLock()) {
scanAndLock(key, hash);
}
boolean replaced = false;
try {
HashEntry<K,V> e;
// 通過hash直接找到頭結點然後對連結串列遍歷
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
// 根據key和hash找到要替換的結點
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
// 如果指定的當前值正確則進行替換
if (oldValue.equals(e.value)) {
e.value = newValue;
++modCount;
replaced = true;
}
// 否則不進行任何操作直接返回
break;
}
}
} finally {
unlock();
}
return replaced;
}
ConcurrentHashMap同樣提供了兩種替換操作,一種是找到後直接替換,另一種是找到後先比較再替換(CAS操作)。這兩種操作的實現大致是相同的,只是CAS操作在替換前多了一層比較操作,因此我們只需簡單瞭解其中一種操作即可。
這裡拿CAS操作進行分析,還是老套路,首先根據key的雜湊碼找到對應的分段鎖,然後呼叫它的replace方法。進入分段鎖中的replace方法後需要先去獲取鎖,如果獲取失敗則進行自旋,如果獲取成功則進行下一步。
首先根據hash碼獲取連結串列頭結點,然後根據key和hash進行遍歷查詢,找到了對應的元素之後,比較給定的oldValue是否是當前值,如果不是則放棄修改,如果是則用新值進行替換。由於HashEntry物件的value域是volatile型別的,因此可以直接替換。
2.9、自旋操作
// 自旋等待獲取鎖(put操作)
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 根據雜湊碼獲取頭結點
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1;
// 在while迴圈內自旋
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
// 如果頭結點為空就新建一個node
if (e == null) {
if (node == null) {
node = new HashEntry<K,V>(hash, key, value, null);
}
retries = 0;
// 否則就遍歷連結串列定位該結點
} else if (key.equals(e.key)) {
retries = 0;
} else {
e = e.next;
}
// retries每次在這加1, 並判斷是否超過最大值
} else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
// retries為偶數時去判斷first有沒有改變
} else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
return node;
}
// 自旋等待獲取鎖(remove和replace操作)
private void scanAndLock(Object key, int hash) {
// 根據雜湊碼獲取連結串列頭結點
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
// 在while迴圈裡自旋
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
// 遍歷連結串列定位到該結點
if (e == null || key.equals(e.key)) {
retries = 0;
} else {
e = e.next;
}
// retries每次在這加1, 並判斷是否超過最大值
} else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
// retries為偶數時去判斷first有沒有改變
} else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
在前面我們講到過,分段鎖中的put,remove,replace這些操作都會要求先去獲取鎖,只有成功獲得鎖之後才能進行下一步操作,如果獲取失敗就會進行自旋。自旋操作也是在JDK1.7中新增的,為了避免執行緒頻繁的掛起和喚醒,以此提高併發操作時的效能。
在put方法中呼叫的是scanAndLockForPut,在remove和replace方法中呼叫的是scanAndLock。這兩種自旋方法大致是相同的,這裡我們只分析scanAndLockForPut方法。
首先還是先根據hash碼獲得連結串列頭結點,之後執行緒會進入while迴圈中執行,退出該迴圈的唯一方式是成功獲取鎖,而在這期間執行緒不會被掛起。
剛進入迴圈時retries的值為-1,這時執行緒不會馬上再去嘗試獲取鎖,而是先去尋找到key對應的結點(沒找到會新建一個),然後再將retries設為0,接下來就會一次次的嘗試獲取鎖,對應retries的值也會每次加1,直到超過最大嘗試次數如果還沒獲取到鎖,就會呼叫lock方法進行阻塞獲取。
在嘗試獲取鎖的期間,還會每隔一次(retries為偶數)去檢查頭結點是否被改變,如果被改變則將retries重置回-1,然後再重走一遍剛才的流程。這就是執行緒自旋時所做的操作,需注意的是如果在自旋時檢測到頭結點已被改變,則會延長執行緒的自旋時間。
2.10、雜湊表的擴容操作
// 再雜湊
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
// 獲取舊雜湊表的引用
HashEntry<K,V>[] oldTable = table;
// 獲取舊雜湊表的容量
int oldCapacity = oldTable.length;
// 計算新雜湊表的容量(為舊雜湊表的2倍)
int newCapacity = oldCapacity << 1;
// 計算新的元素閥值
threshold = (int)(newCapacity * loadFactor);
// 新建一個HashEntry陣列
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
// 生成新的掩碼值
int sizeMask = newCapacity - 1;
// 遍歷舊錶的所有元素
for (int i = 0; i < oldCapacity ; i++) {
// 取得連結串列頭結點
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計算元素在新表中的索引
int idx = e.hash & sizeMask;
// next為空表明連結串列只有一個結點
if (next == null) {
// 直接把該結點放到新表中
newTable[idx] = e;
}else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 定位lastRun結點, 將lastRun之後的結點直接放到新表中
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// 遍歷在連結串列lastRun結點之前的元素, 將它們依次複製到新表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 計算傳入結點在新表中的下標
int nodeIndex = node.hash & sizeMask;
// 將傳入結點新增到連結串列頭結點
node.setNext(newTable[nodeIndex]);
// 將新表指定下標元素換成傳入結點
newTable[nodeIndex] = node;
// 將雜湊表引用指向新表
table = newTable;
}
rehash方法在put方法中被呼叫,我們知道在put方法時會新建元素並新增到雜湊陣列中,隨著元素的增多發生雜湊衝突的可能性越大,雜湊表的效能也會隨之下降。因此每次put操作時都會檢查元素總數是否超過閥值,如果超過則呼叫rehash方法進行擴容。因為陣列長度一旦確定則不能再被改變,因此需要新建一個數組來替換原先的陣列。
從程式碼中可以知道新建立的陣列長度為原陣列的2倍(oldCapacity << 1)。建立好新陣列後需要將舊陣列中的所有元素移到新陣列中,因此需要計算每個元素在新陣列中的下標。計算新下標的過程如下圖所示。
我們知道下標直接取的是雜湊碼的後幾位,由於新陣列的容量是直接用舊陣列容量右移1位得來的,因此掩碼位數向右增加1位,取到的雜湊碼位數也向右增加1位。如上圖,若舊的掩碼值為111,則元素下標為101,擴容後新的掩碼值為1111,則計算出元素的新下標為0101。
由於同一條連結串列上的元素下標是相同的,現在假設連結串列所有元素的下標為101,在擴容後該連結串列元素的新下標只有0101或1101這兩種情況,因此陣列擴容會打亂原先的連結串列並將連結串列元素分成兩批。
在計算出新下標後需要將元素移動到新陣列中,在HashMap中通過直接修改next引用導致了多執行緒的死鎖。雖然在ConcurrentHashMap中通過加鎖避免了這種情況,但是我們知道next域是volatile型別的,它的改動能立馬被讀執行緒讀取到,因此為保證執行緒安全採用複製元素來遷移陣列。
但是對連結串列中每個元素都進行復制有點影響效能,作者發現連結串列尾部有許多元素的next是不變的,它們在新陣列中的下標是相同的,因此可以考慮整體移動這部分元素。具統計實際操作中只有1/6的元素是必須複製的,所以整體移動連結串列尾部元素(lastRun後面的元素)是可以提升一定效能的。