Multithreading Part 11: ConcurrentHashMap 1.7 Source Code Analysis
Contents
- Introduction
- ConcurrentHashMap Data Structure
- Source Code Analysis
- put(K key, V value)
- get(Object key)
- size()
- remove(Object key)
- isEmpty()
- Summary
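Introduction
This article analyzes how ConcurrentHashMap is implemented in JDK 7. The code in this version is fairly clear. Including comments, it is only 1622 lines long, which makes it a good candidate for study.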
A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.
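In short, ConcurrentHashMap supports concurrent reads and writes and provides all the methods of Hashtable. Reads take no lock at all, and writes never lock the entire map.
ConcurrentHashMap Data Structure
Recall the data structure of HashMap in JDK 7. At its core is an array of Entry key-value pairs, and each pair is mapped to an array slot by the hash of its key.
When a ConcurrentHashMap is created, it takes a concurrencyLevel that determines the length of the segment array. This is the concurrency level: roughly the maximum number of threads that can modify the map at the same time. The default is 16, and the value is rounded up to a power of two. Each segment holds its own HashEntry array, which is where the key-value pairs are actually stored. This two-level layout, a Segment array whose elements each own a HashEntry table, is the data structure of ConcurrentHashMap.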
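To see concurrencyLevel from the caller's side, the following minimal sketch passes it to the public three-argument constructor. Several writer threads then insert disjoint keys. The names ConcurrencyLevelDemo and fill are illustrative. The sketch only demonstrates the API. On JDK 8 and later the map no longer uses segments and treats concurrencyLevel purely as a sizing hint, so the segment layout described here applies only when run on JDK 7.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConcurrencyLevelDemo {
    /** Starts `threads` writers that each insert `perThread` distinct keys; returns the final size. */
    static int fill(int threads, int perThread) {
        // initialCapacity = 64, loadFactor = 0.75f, concurrencyLevel = threads
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(64, 0.75f, threads);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread; // disjoint key range for each writer
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.put(base + i, i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000)); // 4000
    }
}
```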
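Source Code Analysis
As the figure shows, ConcurrentHashMap is built around Segment, a static nested class of ConcurrentHashMap. Segment extends the reentrant lock ReentrantLock. A thread must acquire a segment's lock before modifying that segment, while reads do not take the lock (see get() below). Its structure is as follows: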
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // Maximum number of tryLock() attempts (spins) before blocking on lock()
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    // The HashEntry array, i.e. the key-value pair table
    transient volatile HashEntry<K, V>[] table;
    // Number of elements in this segment
    transient int count;
    // Number of modifying operations (such as put/remove) performed on this segment
    transient int modCount;
    // When the element count exceeds this threshold the table is resized; value is capacity * loadFactor
    transient int threshold;
    // Load factor
    final float loadFactor;
    Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}
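The core idea of Segment is an object that is itself a lock and guards only its own slice of the data. That idea can be sketched with a striped counter. StripedCounter and Stripe are hypothetical names. The sketch assumes keys are spread by hashCode() & mask, whereas the real map selects segments from the high bits of a re-hashed value.

```java
import java.util.concurrent.locks.ReentrantLock;

class StripedCounter {
    // Like Segment, each stripe *is* a lock and guards only its own state.
    static final class Stripe extends ReentrantLock {
        long count; // read and written only while holding this stripe's lock
    }

    private final Stripe[] stripes;
    private final int mask;

    StripedCounter(int concurrencyLevel) {
        int n = 1;
        while (n < concurrencyLevel) n <<= 1; // round up to a power of two, as the map does
        stripes = new Stripe[n];
        for (int i = 0; i < n; i++) stripes[i] = new Stripe();
        mask = n - 1;
    }

    /** Locks only the stripe chosen by the key's hash (assumption: low bits of hashCode()). */
    void increment(Object key) {
        Stripe s = stripes[key.hashCode() & mask];
        s.lock();
        try {
            s.count++;
        } finally {
            s.unlock();
        }
    }

    /** Sums all stripes, locking each in turn (not an atomic snapshot under contention). */
    long total() {
        long sum = 0;
        for (Stripe s : stripes) {
            s.lock();
            try {
                sum += s.count;
            } finally {
                s.unlock();
            }
        }
        return sum;
    }

    int stripeCount() {
        return stripes.length;
    }
}
```

Two threads whose keys fall into different stripes never wait for each other. This is the benefit the segment array gives ConcurrentHashMap over a single lock.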
HashEntry is the basic key-value unit of ConcurrentHashMap. Entries that hash to the same bucket are linked into a chain to resolve hash collisions.
static final class HashEntry<K,V> {
    // Hash value
    final int hash;
    // Key
    final K key;
    // Value
    volatile V value;
    // Next entry in the chain
    volatile HashEntry<K, V> next;
    HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    // setNext(HashEntry<K,V> n), an ordered write of next via UNSAFE.putOrderedObject, is omitted here
}
The member fields and constructor of ConcurrentHashMap are as follows:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
    // Default initial capacity
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    // Default load factor
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    // Default concurrency level, i.e. the default length of the Segment array
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // Maximum capacity of the whole map
    static final int MAXIMUM_CAPACITY = 1 << 30;
    // Minimum length of each segment's table; must be a power of two, at least 2
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    // Maximum number of segments, used to bound concurrencyLevel; must be a power of two
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
    // Number of unlocked retries in size() and containsValue() before locking all segments,
    // so that a continuously modified table does not cause endless retries
    static final int RETRIES_BEFORE_LOCK = 2;
    // Mask used to compute the segment index
    final int segmentMask;
    // Shift applied to the hash when computing the segment index
    final int segmentShift;
    // The Segment array
    final Segment<K,V>[] segments;
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        // Argument validation
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // ssize: the smallest power of two >= concurrencyLevel, used as the Segment array length
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // Compute the table capacity of each segment
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // Round cap up to a power of two
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // Only segments[0] is created here; the other segments are initialized lazily
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
}
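The concurrencyLevel parameter is the expected number of threads that modify the ConcurrentHashMap concurrently, and it determines the number of segments. The loop above picks the smallest power of two that is greater than or equal to concurrencyLevel (capped at MAX_SEGMENTS). segmentMask and segmentShift look obscure at first. Their job is simply to locate a key's segment from its hash: the segment index is (hash >>> segmentShift) & segmentMask, i.e. the top sshift bits of the hash. With the defaults, ssize = 16 and sshift = 4, so segmentShift = 28 and segmentMask = 15.
For a hash table the most important methods are put and get, so let us analyze these two first: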
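The sizing arithmetic can be checked in isolation. This sketch copies the constructor's loops into a standalone helper and returns {ssize, segmentShift, segmentMask, cap}. SegmentSizing.compute is a hypothetical name.

```java
import java.util.Arrays;

class SegmentSizing {
    static final int MAX_SEGMENTS = 1 << 16;
    static final int MAXIMUM_CAPACITY = 1 << 30;
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /** Returns {ssize, segmentShift, segmentMask, cap}, computed as in the JDK7 constructor. */
    static int[] compute(int initialCapacity, int concurrencyLevel) {
        if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) { // smallest power of two >= concurrencyLevel
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift;    // keep only the top sshift bits of the hash
        int segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;   // per-segment share, rounded up
        if (c * ssize < initialCapacity) ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c) cap <<= 1;         // round up to a power of two, at least 2
        return new int[] {ssize, segmentShift, segmentMask, cap};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(compute(16, 16))); // [16, 28, 15, 2]
        System.out.println(Arrays.toString(compute(33, 10))); // [16, 28, 15, 4]
    }
}
```

With the defaults (16, 16), every segment starts with a two-slot table. Asking for 33 entries with a concurrency level of 10 still yields 16 segments, and each gets ceil(33/16) = 3 slots, rounded up to 4.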
put(K key, V value)
put boils down to two steps: 1. use the key's hash to locate the segment the pair belongs to; 2. call that segment's put method.
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // Compute the key's hash
    int hash = hash(key);
    // Use the high bits of the hash to pick segment j
    int j = (hash >>> segmentShift) & segmentMask;
    // If segments[j] has not been created yet, create it via ensureSegment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    // Insert the key-value pair into the segment
    return s.put(key, hash, value, false);
}
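The two-level indexing works as follows. The segment comes from the high bits of the hash, and the bucket inside the segment's table comes from the low bits (see Segment.put below). The helper names are illustrative. The hash is a fixed sample value, not the output of the map's own hash() function.

```java
class IndexDemo {
    /** Segment index from the top bits, as in put(): (hash >>> segmentShift) & segmentMask. */
    static int segmentIndex(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }

    /** Bucket index inside a segment's table from the low bits: (tab.length - 1) & hash. */
    static int bucketIndex(int hash, int tableLength) {
        return (tableLength - 1) & hash;
    }

    public static void main(String[] args) {
        int hash = 0xABCD1237; // negative as an int; >>> shifts in zeros
        System.out.println(segmentIndex(hash, 28, 15)); // 10 (the top hex digit, 0xA)
        System.out.println(bucketIndex(hash, 4));       // 3 (the low two bits of 0x37)
    }
}
```

The segment index uses the top bits and the bucket index the bottom bits. As a result, keys that land in the same segment can still spread across its buckets.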
- ensureSegment(int k)
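The constructor creates only segments[0] up front. The other segments are created on first use. This lazy initialization is done by ensureSegment, which uses segments[0] as the prototype and installs the new segment with a CAS, so concurrent callers all end up with the same instance: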
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // Create segments[k] with the same table length and load factor as segments[0]
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // CAS loop: only one thread installs its segment; the others read the winner's
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
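The same lazy, race-safe initialization can be written without Unsafe. This sketch swaps in AtomicReferenceArray: its get is a volatile read, and compareAndSet is a CAS on one element. LazySlots and its factory parameter are hypothetical stand-ins for the Segment array and the prototype-based construction.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.IntFunction;

class LazySlots<T> {
    private final AtomicReferenceArray<T> slots;
    private final IntFunction<T> factory; // hypothetical: plays the role of "copy segments[0]"

    LazySlots(int n, IntFunction<T> factory) {
        this.slots = new AtomicReferenceArray<>(n);
        this.factory = factory;
    }

    /** Returns slot k, creating it on first use; all racing callers get the same instance. */
    T ensure(int k) {
        T cur = slots.get(k);               // volatile read (getObjectVolatile in the JDK7 code)
        if (cur != null) return cur;
        T candidate = factory.apply(k);     // built speculatively; may be discarded
        while ((cur = slots.get(k)) == null) {
            if (slots.compareAndSet(k, null, candidate)) return candidate; // we won the race
        }
        return cur;                         // another thread installed its instance first
    }
}
```

Several threads may each build a candidate, but only one compareAndSet succeeds. The losers discard their candidates and return the winner's instance, which is the same outcome ensureSegment aims for.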
- put(K key, int hash, V value, boolean onlyIfAbsent)
Segment.put inserts the key-value pair into the segment's HashEntry array while holding the segment's lock:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // Segment extends ReentrantLock: try to take the exclusive lock; if that fails,
    // scanAndLockForPut spins (possibly pre-creating the node) until the lock is held
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // Locate the bucket index in the HashEntry array
        int index = (tab.length - 1) & hash;
        // First entry in that bucket
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) { // the bucket holds a chain; walk it until e == null
                K k;
                // An entry with the same key exists: replace its value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) { // false when called from put(), true from putIfAbsent()
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // node was pre-created by scanAndLockForPut: link it in front of first as the new head
                if (node != null)
                    node.setNext(first);
                // otherwise create the node with next = first; it becomes the new head
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // Resize when (1) the new count exceeds threshold and (2) the table is below MAXIMUM_CAPACITY
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // Resize; rehash also inserts node into the new table
                    rehash(node);
                else
                    // Store node at tab[index]
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
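Without spinning, resizing, and Unsafe, the locked insert path of Segment.put reduces to four steps. It locks, searches the bucket's chain, replaces the value in place or inserts a new head, and unlocks. MiniSegment is a simplified, hypothetical sketch of that path. It uses key.hashCode() directly instead of the map's hash() and never resizes.

```java
import java.util.concurrent.locks.ReentrantLock;

class MiniSegment<K, V> extends ReentrantLock {
    static final class Entry<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile Entry<K, V> next;

        Entry(int hash, K key, V value, Entry<K, V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    private final Entry<K, V>[] table;
    private int count; // guarded by this lock

    @SuppressWarnings("unchecked")
    MiniSegment(int capacity) { // assumption: capacity is a power of two
        table = (Entry<K, V>[]) new Entry[capacity];
    }

    V put(K key, V value, boolean onlyIfAbsent) {
        int hash = key.hashCode(); // assumption: no extra hash spreading
        lock(); // simplified: the real code tries tryLock() and then spins in scanAndLockForPut
        try {
            int index = (table.length - 1) & hash;
            Entry<K, V> first = table[index];
            for (Entry<K, V> e = first; e != null; e = e.next) {
                if (e.key == key || (e.hash == hash && key.equals(e.key))) {
                    V old = e.value;
                    if (!onlyIfAbsent) e.value = value; // replace in place
                    return old;
                }
            }
            table[index] = new Entry<>(hash, key, value, first); // head insertion
            count++; // no resize in this sketch
            return null;
        } finally {
            unlock();
        }
    }

    int size() {
        lock();
        try {
            return count;
        } finally {
            unlock();
        }
    }
}
```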
- scanAndLockForPut(K key, int hash, V value)
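Before blocking, the thread calls tryLock (a CAS on the lock state) repeatedly, up to MAX_SCAN_RETRIES times. While spinning, it also walks the target bucket. If the key is not found, it speculatively creates the new node, so this work is already done when the lock is acquired. Once the retries are exhausted, it calls lock() and blocks.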
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // first, e: the first entry of the bucket that the hash maps to
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // Keep calling tryLock() (a CAS on the lock state) until it succeeds
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            // Once e == null or key.equals(e.key), set retries = 0 and leave this branch
            if (e == null) {
                if (node == null) // speculatively create node
                    // Create the new entry with next = null
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // Spun more than MAX_SCAN_RETRIES times: block
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // On every other retry, if the bucket's head has changed, traverse again
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
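The spin-then-block policy looks like this on its own. SpinThenLock.acquire is an illustrative helper that omits the bucket scan and the speculative node creation. It just counts failed tryLock() calls before the lock is obtained.

```java
import java.util.concurrent.locks.ReentrantLock;

class SpinThenLock {
    // Same policy as Segment.MAX_SCAN_RETRIES: spin only on multiprocessors.
    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * Acquires `lock`, first retrying tryLock() up to MAX_SCAN_RETRIES times and then
     * falling back to a blocking lock(). Returns the number of failed tryLock() calls.
     */
    static int acquire(ReentrantLock lock) {
        int retries = 0;
        while (!lock.tryLock()) {
            if (++retries > MAX_SCAN_RETRIES) {
                lock.lock(); // give up spinning: park until the lock is granted
                break;
            }
        }
        return retries;
    }
}
```

Spinning pays off when the lock is held only briefly, because the thread avoids being parked and woken. The cap keeps a thread from burning CPU indefinitely when the holder runs long. On a single-processor machine spinning cannot help, which is why the limit drops to 1 there.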
- rehash(HashEntry<K,V> node)
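rehash resizes the segment's table array. The new array is twice as long as the old one. Because the capacity is a power of two, each node in old bucket i moves either to bucket i or to bucket i + oldCapacity in the new table. The method exploits this by reusing the trailing run of a chain (lastRun), whose nodes all go to the same new bucket, and cloning only the nodes before it.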
private void rehash(HashEntry<K,V> node) {
    // The old table before resizing
    HashEntry<K,V>[] oldTable = table;
    // Old table length
    int oldCapacity = oldTable.length;
    // New table length (twice the old length)
    int newCapacity = oldCapacity << 1;
    // New threshold
    threshold = (int)(newCapacity * loadFactor);
    // The new table
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // New index mask
    int sizeMask = newCapacity - 1;
    // Iterate over the old table
    for (int i = 0; i < oldCapacity ; i++) {
        // Head of bucket i
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            // Node after e; non-null if the bucket holds a chain (hash collision)
            HashEntry<K,V> next = e.next;
            // Index of e in the new table
            int idx = e.hash & sizeMask;
            // Only one node in the bucket: move e to the new table
            if (next == null) // Single node on list
                newTable[idx] = e;
            // The bucket holds a chain of more than one node
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                // idx is the new position of the chain's head e
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    // k is this chain node's index in the new table
                    int k = last.hash & sizeMask;
                    // lastRun: start of the trailing run of nodes that all map to the same new index
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // lastRun and every node after it go into the same bucket, reused without copying
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                // Only the nodes before lastRun need to be cloned
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // Insert the node whose addition triggered the resize
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    // Point the segment's table at the resized table
    table = newTable;
}
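The lastRun optimization is easiest to see on one bucket. This sketch splits a single old chain into a doubled table using the same two loops as rehash, then prints the resulting buckets. LastRunDemo and its simplified immutable Node are hypothetical. Like rehash, split assumes the target slots start out empty.

```java
import java.util.ArrayList;
import java.util.List;

class LastRunDemo {
    static final class Node {
        final int hash;
        final String key;
        final Node next;

        Node(int hash, String key, Node next) {
            this.hash = hash;
            this.key = key;
            this.next = next;
        }
    }

    /** Builds a chain k0 -> k1 -> ... with the given hashes, in order. */
    static Node chain(int... hashes) {
        Node head = null;
        for (int i = hashes.length - 1; i >= 0; i--) head = new Node(hashes[i], "k" + i, head);
        return head;
    }

    /** Redistributes one old bucket into an (initially empty) newTable the way Segment.rehash does. */
    static void split(Node e, Node[] newTable) {
        int sizeMask = newTable.length - 1;
        Node lastRun = e;
        int lastIdx = e.hash & sizeMask;
        // Find the start of the trailing run whose nodes all land in the same new bucket.
        for (Node last = e.next; last != null; last = last.next) {
            int k = last.hash & sizeMask;
            if (k != lastIdx) {
                lastIdx = k;
                lastRun = last;
            }
        }
        newTable[lastIdx] = lastRun;                 // reuse the run without copying
        for (Node p = e; p != lastRun; p = p.next) { // clone only the nodes before it
            int k = p.hash & sizeMask;
            newTable[k] = new Node(p.hash, p.key, newTable[k]); // prepend the clone
        }
    }

    static List<String> keys(Node n) {
        List<String> out = new ArrayList<>();
        for (; n != null; n = n.next) out.add(n.key);
        return out;
    }

    public static void main(String[] args) {
        Node old = chain(1, 5, 9, 13);         // all in bucket 1 when capacity is 4
        Node[] newTable = new Node[8];         // doubled capacity, sizeMask = 7
        split(old, newTable);
        System.out.println(keys(newTable[1])); // [k2, k0]
        System.out.println(keys(newTable[5])); // [k1, k3]
    }
}
```

Here the trailing run is only k3, so three nodes are cloned. For a chain with hashes 1, 5, 13, 21 the run starts at the second node, and only the head is cloned. Cloned nodes are prepended, so their relative order is reversed.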
get(Object key)
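get takes no lock, which keeps it fast. It reads both the key's segment and the bucket head in the table with UNSAFE.getObjectVolatile. HashEntry's value and next fields are also volatile. Together, these let get read the latest published values without locking.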
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // Compute the key's hash
    int h = hash(key);
    // Raw offset of the segment that the hash maps to
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // Volatile read of that segment, then its table
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // Volatile read of the bucket head, then walk the HashEntry chain
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // Matching key found: return its value
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
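The read path can be reproduced with public APIs. In this sketch a VarHandle from MethodHandles.arrayElementVarHandle (Java 9 or later) stands in for UNSAFE.getObjectVolatile on the bucket array. Its setRelease stands in for the ordered write a writer uses to publish a new head. VolatileLookup, Node, and putHead are hypothetical. putHead assumes the caller holds the segment lock and does not check for duplicate keys.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VolatileLookup {
    static final class Node {
        final int hash;
        final String key;
        volatile String value; // volatile, like HashEntry.value
        volatile Node next;    // volatile, like HashEntry.next

        Node(int hash, String key, String value, Node next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    // Public (Java 9+) replacement for UNSAFE.getObjectVolatile / putOrderedObject on array slots.
    private static final VarHandle SLOT = MethodHandles.arrayElementVarHandle(Node[].class);

    /** Lock-free read: one volatile read of the bucket head, then follow volatile next links. */
    static String get(Node[] tab, String key) {
        int h = key.hashCode(); // assumption: no extra hash spreading
        Node e = (Node) SLOT.getVolatile(tab, (tab.length - 1) & h);
        for (; e != null; e = e.next) {
            if (e.key == key || (e.hash == h && key.equals(e.key))) {
                return e.value;
            }
        }
        return null;
    }

    /** Writer side (caller assumed to hold the segment lock): publish a new head with a release store. */
    static void putHead(Node[] tab, String key, String value) {
        int h = key.hashCode();
        int i = (tab.length - 1) & h;
        Node first = (Node) SLOT.getVolatile(tab, i);
        SLOT.setRelease(tab, i, new Node(h, key, value, first));
    }
}
```

A reader never blocks. It performs one volatile read of the bucket head and then follows volatile next links. A new entry is fully constructed before the release store publishes it, so a reader that sees the new head also sees its fields.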
size()
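size computes the number of elements stored in the ConcurrentHashMap. Should counting the elements of all segments require locking? Without locks, other threads may insert or remove elements while the count is in progress; with locks, reads and writes slow down. ConcurrentHashMap takes a middle road. It first traverses all segments without locking, up to three times, summing each segment's modCount into sum. If a pass yields the same sum as the previous pass, no other thread modified the map in between, and the sum of the segments' count fields is returned. If three unlocked passes never agree, it locks every segment and counts once more.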
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // After three unlocked passes (retries has reached RETRIES_BEFORE_LOCK), lock every segment
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                // Accumulate each segment's modCount and count
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // Detect overflow of the int range
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // last is the previous pass's sum; if unchanged, the count is stable
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // Unlock if the locked pass was taken
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
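The retry-then-lock counting scheme can be sketched on its own. OptimisticSize.Seg is a hypothetical stand-in for Segment. Its count and modCount are declared volatile so that the unlocked passes read fresh values. The loop itself follows size() above.

```java
import java.util.concurrent.locks.ReentrantLock;

class OptimisticSize {
    static final int RETRIES_BEFORE_LOCK = 2;

    // Hypothetical stand-in for Segment; fields are volatile so unlocked passes see fresh values.
    static final class Seg extends ReentrantLock {
        volatile int count;
        volatile int modCount;

        void add(int n) {
            lock();
            try {
                count += n;
                modCount++;
            } finally {
                unlock();
            }
        }
    }

    static int size(Seg[] segs) {
        long last = 0L;   // sum of modCounts from the previous pass
        int retries = -1; // the first pass is not a retry
        int size;
        boolean overflow;
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) { // three unlocked passes disagreed
                    for (Seg s : segs) s.lock();
                }
                long sum = 0L;
                size = 0;
                overflow = false;
                for (Seg s : segs) {
                    sum += s.modCount;
                    int c = s.count;
                    if (c < 0 || (size += c) < 0) overflow = true;
                }
                if (sum == last) break; // matches the previous pass: count is stable
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (Seg s : segs) s.unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }
}
```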
remove(Object key)
remove delegates to the segment's remove method:
public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}
- remove(Object key, int hash, Object value)
Segment.remove acquires the segment's lock and removes the matching key-value pair:
final V remove(Object key, int hash, Object value) {
    // Acquire the lock (spinning in scanAndLock if tryLock fails)
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        // pred tracks the node before e during the traversal
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            // Found the entry for key
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // Remove if no value was given, or if it matches the current value
                if (value == null || value == v || value.equals(v)) {
                    // e is the head: make its successor the new head
                    if (pred == null)
                        setEntryAt(tab, index, next);
                    // otherwise point pred.next at e's successor
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
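Without locking or Unsafe, the unlinking logic of Segment.remove is a standard singly linked list removal with a pred pointer. ChainRemove is a hypothetical single-threaded sketch. It returns the new head instead of writing to a table slot.

```java
class ChainRemove {
    static final class Node {
        final String key;
        String value;
        Node next;

        Node(String key, String value, Node next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    /**
     * Removes `key` if its value matches `expected` (null means "remove unconditionally"),
     * using the same pred/next bookkeeping as Segment.remove; returns the new head.
     */
    static Node remove(Node head, String key, String expected) {
        Node pred = null;
        for (Node e = head; e != null; pred = e, e = e.next) {
            if (key.equals(e.key)) {
                if (expected == null || expected.equals(e.value)) {
                    if (pred == null) return e.next; // removing the head
                    pred.next = e.next;              // bypass e
                }
                break;
            }
        }
        return head;
    }

    static String keys(Node n) {
        StringBuilder sb = new StringBuilder();
        for (; n != null; n = n.next) sb.append(n.key);
        return sb.toString();
    }
}
```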
- scanAndLock(Object key, int hash)
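scanAndLock scans the bucket for the given key while trying to acquire the lock. When the method returns, the lock is always held. The loop exits in one of two ways: 1. tryLock succeeds in acquiring the exclusive lock; 2. after more than MAX_SCAN_RETRIES attempts, the thread calls lock() and blocks, leaving the loop once it is woken from the wait queue and granted the lock.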
private void scanAndLock(Object key, int hash) {
    // similar to but simpler than scanAndLockForPut
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    int retries = -1;
    while (!tryLock()) {
        HashEntry<K,V> f;
        if (retries < 0) {
            if (e == null || key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
}
isEmpty()
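isEmpty checks whether the ConcurrentHashMap is empty. Like size, it takes no locks, and it uses at most two passes. 1. Check each segment's count: if any is non-zero, return false immediately; otherwise add the segment's modCount to sum. 2. If the first pass completes without returning, the map may be empty. If sum is 0, no segment has ever been modified, so return true. Otherwise make a second pass: return false if any segment's count is non-zero, and subtract each segment's modCount from sum. If the loop finishes and sum is 0, nothing was modified between the two passes and the map really is empty; otherwise return false.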
public boolean isEmpty() {
    // Accumulated modCount of all segments
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false;
            sum += seg.modCount;
        }
    }
    // Second pass
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        if (sum != 0L)
            return false;
    }
    return true;
}
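The two-pass check can be exercised without a real map by supplying the per-segment count and modCount readings as functions. EmptyCheck.isEmpty and its IntUnaryOperator parameters are hypothetical stand-ins for reading segments[j].count and segments[j].modCount.

```java
import java.util.function.IntUnaryOperator;

class EmptyCheck {
    /**
     * Two-pass emptiness check in the style of JDK7 isEmpty(). count.applyAsInt(j) and
     * modCount.applyAsInt(j) are hypothetical stand-ins for segments[j].count / .modCount.
     */
    static boolean isEmpty(int segments, IntUnaryOperator count, IntUnaryOperator modCount) {
        long sum = 0L;
        for (int j = 0; j < segments; ++j) {        // pass 1
            if (count.applyAsInt(j) != 0) return false;
            sum += modCount.applyAsInt(j);
        }
        if (sum != 0L) {                             // map was modified at some point: recheck
            for (int j = 0; j < segments; ++j) {     // pass 2
                if (count.applyAsInt(j) != 0) return false;
                sum -= modCount.applyAsInt(j);
            }
            if (sum != 0L) return false;             // modCounts moved between the passes
        }
        return true;
    }
}
```

Suppose the modCount reader returns a different value on each call, as it would while a concurrent writer is active. The second pass then leaves a non-zero sum, and the method returns false.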
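Summary
ConcurrentHashMap raises concurrency by introducing segment locks. A thread that modifies the hash table does not lock the whole table; it locks only the one segment that the key belongs to. This finer lock granularity improves throughput. It makes ConcurrentHashMap better suited to multithreaded environments than Hashtable, which synchronizes on the entire table.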