Java concurrent Framework併發容器之ConcurrentHashMap(JDK5,6,7,8)原始碼分析&ConcurrentSkipListMap
ConcurrentHashMap
ConcurrentHashMap是一個執行緒安全的Hash Table,它的主要功能是提供了一組和HashTable功能相同但是執行緒安全的方法。ConcurrentHashMap可以做到讀取資料不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持地儘量地小,不用對整個ConcurrentHashMap加鎖。
ConcurrentHashMap的內部結構
ConcurrentHashMap為了提高本身的併發能力,在內部採用了一個叫做Segment的結構,一個Segment其實就是一個類Hash Table的結構,Segment內部維護了一個連結串列陣列,我們用下面這一幅圖來看下ConcurrentHashMap的內部結構:
從上面的結構我們可以瞭解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的連結串列的頭部,因此,這一種結構的帶來的副作用是Hash的過程要比普通的HashMap要長,但是帶來的好處是寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支援Segment數量大小的寫操作(剛好這些寫操作都非常平均地分佈在所有的Segment上),所以,通過這一種結構,ConcurrentHashMap的併發能力可以大大的提高。
Segment
我們再來具體瞭解一下Segment的資料結構:
1 |
static final class Segment<k,v>
extends ReentrantLock implements
Serializable { |
2 |
transient volatile int
count; |
3 |
transient int modCount; |
4 |
transient int threshold; |
5 |
transient volatile HashEntry<k,v>[] table; |
6 |
final float loadFactor; |
7 |
}</k,v></k,v> |
詳細解釋一下Segment裡面的成員變數的意義:
- count:Segment中元素的數量
- modCount:對table的大小造成影響的操作的數量(比如put或者remove操作)
- threshold:閾值,Segment裡面元素的數量超過這個值依舊就會對Segment進行擴容
- table:連結串列陣列,陣列中的每一個元素代表了一個連結串列的頭部
- loadFactor:負載因子,用於確定threshold
HashEntry
Segment中的元素是以HashEntry的形式存放在連結串列陣列中的,看一下HashEntry的結構:
1 |
static final class HashEntry<k,v> { |
2 |
final K key; |
3 |
final int hash; |
4 |
volatile V value; |
5 |
final HashEntry<k,v> next; |
6 |
}</k,v></k,v> |
可以看到HashEntry的一個特點,除了value以外,其他的幾個變數都是final的,這樣做是為了防止連結串列結構被破壞,出現ConcurrentModification的情況。
ConcurrentHashMap的初始化
下面我們來結合原始碼來具體分析一下ConcurrentHashMap的實現,先看下初始化方法:
01 |
public ConcurrentHashMap( int initialCapacity, |
02 |
float loadFactor, int
concurrencyLevel) { |
03 |
if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel
<= 0 ) |
04 |
throw new IllegalArgumentException(); |
05 |
06 |
if (concurrencyLevel > MAX_SEGMENTS) |
07 |
concurrencyLevel = MAX_SEGMENTS; |
08 |
09 |
// Find power-of-two sizes best matching arguments |
10 |
int sshift = 0 ; |
11 |
int ssize = 1 ; |
12 |
while (ssize < concurrencyLevel) { |
13 |
++sshift; |
14 |
ssize <<= 1 ; |
15 |
} |
16 |
segmentShift = 32 - sshift; |
17 |
segmentMask = ssize - 1 ; |
18 |
this .segments = Segment.newArray(ssize); |
19 |
20 |
if (initialCapacity > MAXIMUM_CAPACITY) |
21 |
initialCapacity = MAXIMUM_CAPACITY; |
22 |
int c = initialCapacity / ssize; |
23 |
if (c * ssize < initialCapacity) |
24 |
++c; |
25 |
int cap = 1 ; |
26 |
while (cap < c) |
27 |
cap <<= 1 ; |
28 |
29 |
for ( int i =
0 ; i < this .segments.length; ++i) |
30 |
this .segments[i] = new Segment<k,v>(cap, loadFactor); |
31 |
}</k,v> |
CurrentHashMap的初始化一共有三個引數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載引數,最後一個是concurrentLevel,代表ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中連結串列陣列的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment裡面的元素做一次rehash就可以了。
整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrentLevel來new出Segment,這裡Segment的數量是不大於concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操作來進行hash,加快hash的過程。接下來就是根據intialCapacity確定Segment的容量的大小,每一個Segment的容量大小也是2的指數,同樣使為了加快hash的過程。
這邊需要特別注意一下兩個變數,分別是segmentShift和segmentMask,這兩個變數在後面將會起到很大的作用,假設建構函式確定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。
ConcurrentHashMap的get操作
前面提到過ConcurrentHashMap的get操作是不用加鎖的,我們這裡看一下其實現:
1 |
public V get(Object key) { |
2 |
int hash = hash(key.hashCode()); |
3 |
return segmentFor(hash).get(key, hash); |
4 |
} |
看第三行,segmentFor這個函式用於確定操作應該在哪一個segment中進行,幾乎對ConcurrentHashMap的所有操作都需要用到這個函式,我們看下這個函式的實現:
1 |
final Segment<k,v> segmentFor( int hash) { |
2 |
return segments[(hash >>> segmentShift) & segmentMask]; |
3 |
}</k,v> |
這個函式用了位操作來確定Segment,根據傳入的hash值向右無符號右移segmentShift位,然後和segmentMask進行與操作,結合我們之前說的segmentShift和segmentMask的值,就可以得出以下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就可以確定元素到底在哪一個Segment中。
在確定了需要在哪一個segment中進行操作以後,接下來的事情就是呼叫對應的Segment的get方法:
01 |
V get(Object key, int hash) { |
02 |
if (count != 0 ) { // read-volatile |
03 |
HashEntry<k,v> e = getFirst(hash); |
04 |
while (e != null ) { |
05 |
if (e.hash == hash && key.equals(e.key)) { |
06 |
V v = e.value; |
07 |
if (v != null ) |
08 |
return v; |
09 |
return readValueUnderLock(e); // recheck |
10 |
} |
11 |
e = e.next; |
12 |
} |
13 |
} |
14 |
return null ; |
15 |
}</k,v> |
先看第二行程式碼,這裡對count進行了一次判斷,其中count表示Segment中元素的數量,我們可以來看一下count的定義:
1 |
transient
volatile int count; |
可以看到count是volatile的,實際上這裡裡面利用了volatile的語義:
對volatile欄位的寫入操作happens-before於每一個後續的同一個欄位的讀操作。
因為實際上put、remove等操作也會更新count的值,所以當競爭發生的時候,volatile的語義可以保證寫操作在讀操作之前,也就保證了寫操作對後續的讀操作都是可見的,這樣後面get的後續操作就可以拿到完整的元素內容。
然後,在第三行,呼叫了getFirst()來取得連結串列的頭部:
1 |
HashEntry<k,v> getFirst( int hash) { |
2 |
HashEntry<k,v>[] tab = table; |
3 |
return tab[hash & (tab.length - 1 )]; |
4 |
}</k,v></k,v> |
同樣,這裡也是用位操作來確定連結串列的頭部,hash值和HashTable的長度減一做與操作,最後的結果就是hash值的低n位,其中n是HashTable的長度以2為底的結果。
在確定了連結串列的頭部以後,就可以對整個連結串列進行遍歷,看第4行,取出key對應的value的值,如果拿出的value的值是null,則可能這個key,value對正在put的過程中,如果出現這種情況,那麼就加鎖來保證取出的value是完整的,如果不是null,則直接返回value。
ConcurrentHashMap的put操作
看完了get操作,再看下put操作,put操作的前面也是確定Segment的過程,這裡不再贅述,直接看關鍵的segment的put方法:
01 |
V put(K key, int hash, V value, boolean
onlyIfAbsent) { |
02 |
lock(); |
03 |
try { |
04 |
int c = count; |
05 |
if (c++ > threshold) // ensure capacity |
06 |
rehash(); |
07 |
HashEntry<k,v>[] tab = table; |
08 |
int index = hash & (tab.length - 1 ); |
09 |
HashEntry<k,v> first = tab[index]; |
10 |
HashEntry<k,v> e = first; |
11 |
while (e != null
&& (e.hash != hash || !key.equals(e.key))) |
12 |
e = e.next; |
13 |
14 |
V oldValue; |
15 |
if (e != null ) { |
16 |
oldValue = e.value; |
17 |
if (!onlyIfAbsent) |
18 |
e.value = value; |
19 |
} |
20 |
else { |
21 |
oldValue = null ; |
22 |
++modCount; |
23 |
tab[index] = new HashEntry<k,v>(key, hash, first, value); |
24 |
count = c; // write-volatile |
25 |
} |
26 |
return oldValue; |
27 |
}
finally { |
28 |
unlock(); |
29 |
} |
30 |
}</k,v></k,v></k,v></k,v> |
首先對Segment的put操作是加鎖完成的,然後在第五行,如果Segment中元素的數量超過了閾值(由建構函式中的loadFactor算出)這需要進行對Segment擴容,並且要進行rehash,關於rehash的過程大家可以自己去了解,這裡不詳細講了。
第8和第9行的操作就是getFirst的過程,確定連結串列頭部的位置。
第11行這裡的這個while迴圈是在連結串列中尋找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果沒有找到,則進入21行這裡,生成一個新的HashEntry並且把它加到整個Segment的頭部,然後再更新count的值。
ConcurrentHashMap的remove操作
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過程,然後再呼叫Segment的remove方法:
01 |
V remove(Object key, int hash, Object value) { |
02 |
lock(); |
03 |
try { |
04 |
int c = count - 1 ; |
05 |
HashEntry<k,v>[] tab = table; |
06 |
int index = hash & (tab.length - 1 ); |
07 |
HashEntry<k,v> first = tab[index]; |
08 |
HashEntry<k,v> e = first; |
09 |
while (e != null
&& (e.hash != hash || !key.equals(e.key))) |
10 |
e = e.next; |
11 |
12 |
V oldValue = null ; |
13 |
if (e != null ) { |
14 |
V v = e.value; |
15 |
if (value == null
|| value.equals(v)) { |
16 |
oldValue = v; |
17 |
// All entries following removed node can stay |
18 |
// in list, but all preceding ones need to be |
19 |
// cloned. |
20 |
++modCount; |
21 |
HashEntry<k,v> newFirst = e.next; |
22 |
for (HashEntry<k,v> p = first; p != e; p = p.next) |
23 |
newFirst = new HashEntry<k,v>(p.key, p.hash, |
24 |
newFirst, p.value); |
25 |
tab[index] = newFirst; |
26 |
count = c; // write-volatile |
27 |
} |
28 |
} |
29 |
return oldValue; |
30 |
}
finally { |
31 |
unlock(); |
32 |
} |
33 |
}</k,v></k,v></k,v></k,v></k,v></k,v> |
首先remove操作也是確定需要刪除的元素的位置,不過這裡刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了,我們之前已經說過HashEntry中的next是final的,一經賦值以後就不可修改,在定位到待刪除元素的位置以後,程式就將待刪除元素前面的那一些元素全部複製一遍,然後再一個一個重新接到連結串列上去,看一下下面這一幅圖來了解這個過程:
假設連結串列中原來的元素如上圖所示,現在要刪除元素3,那麼刪除元素3以後的連結串列就如下圖所示:ConcurrentHashMap的size操作
在前面的章節中,我們涉及到的操作都是在單個Segment中進行的,但是ConcurrentHashMap有一些操作是在多個Segment中進行,比如size操作,ConcurrentHashMap的size操作也採用了一種比較巧的方式,來儘量避免對所有的Segment都加鎖。
前面我們提到了一個Segment中的有一個modCount變數,代表的是對Segment中元素的數量造成影響的操作的次數,這個值只增不減,size操作就是遍歷了兩次Segment,每次記錄Segment的modCount值,然後將兩次的modCount進行比較,如果相同,則表示期間沒有發生過寫入操作,就將原先遍歷的結果返回,如果不相同,則把這個過程再重複做一次,如果再不相同,則就需要將所有的Segment都鎖住,然後一個一個遍歷了,具體的實現大家可以看ConcurrentHashMap的原始碼,這裡就不貼了。
原文地址:http://www.goldendoc.org/2011/06/juc_concurrenthashmap