ConcurrentHashMap JDK 1.6 源碼分析
前言
前段時間把 JDK 1.6 中的 HashMap
主要的一些操作源碼分析了一次。既然把 HashMap
源碼分析了, 就順便把 JDK 1.6 中 ConcurrentHashMap
的主要一些操作源碼分析一下。因為其中有很多思想是值得我們去借鑒的。 ConcurrentHashMap
中的分段鎖。這個思想在 JDK 1.8 中 為了優化 JUC 下的原子鎖 CAS 高並發情況下導致自旋次數太多效率低下。引用 Adder
。其中就是借鑒了分段鎖的思想。AtomicLong
對比 LongAdder
。 有興趣可以查看。
準備
如果有人問你了解 ConcurrentHashMap
嗎? 你可以這樣回答,了解。 ConcurrentHashMap
HashMap
非線程安全的,一種線程安全實現類。它有一個 Segment
數組,Segment
本身就是相當於一個 HashMap
對象。裏面是一個 HashEntry
數組,數組中的每一個 HashEntry
都是一個鍵值對,也是一個鏈表的表頭。如果別人問你,那 ConcurrentHashMap
get
或者 put
一個對象的時候是怎麽操作的 ,你該怎麽回答。emmm..... 繼續往下看。會有你要的答案。
構造函數
分析源碼,先從構造函數開始。直接研究帶所有參數的構造方法,其他一些重載的構造方法,最裏面還是調用了該構造方法。在看構造方法之前,需要 明白 sshift 是表示並發數的2的幾次方 比如並發數是16 那麽他的值就是 4 。ssize 是 segment
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 用來與 key的hashCode >>> 運算 獲取HashCode的高位 segmentShift = 32 - sshift; // 高位與 它做與運算 eg 假如 默認的創建該對象 那麽 segmentShift = 28 segmentMask=15(二進制為1111) 假如現在put一個值 他的key的HashCode值為2的32次方 那麽 他在segment裏面的定位時 2的32次方 無符號 高位補零 右移28個 那麽就等於 10000(二進制) 等於 16 與 1111 做與運算 等於0 也就是定位在 segment[0]上 。 segmentMask = ssize - 1; // segment數組大小為 16 this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // segment數組中 每個HashEntry數組的大小, int cap = 1; while (cap < c) cap <<= 1; // 為segment數組中的每個HashEntry數組初始化大小,每個semengt中只有一個HashEntry數組。如果你設置的 ConcurrentHashMap 初始化大小為16的話,則 segment數組中每個的HashEntry的大小為1,如果你初始化他的大小為28 的話。它會根據上面的運算,cap的大小為2,也就是segment數組中的每個HashEntry數組的大小為2 ,總的大小為32。 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }
上面的註釋應該都挺清楚了,要註意的是 ConcurrentHashMap
的大小 是所有 Segment
數組中每個HashEntry
數組的大小相加的和。
put 方法
ConcurrentHashMap
每次 put
的時候都是需要加鎖的,只不過會鎖住他所在的那個Segment
數組位置。其他的不鎖,這也就是分段鎖,默認支持16個並發。說起put,以數組的形式存儲的數據,就會涉及到擴容。這樣是接下來需要好好討論的一個事情。
public V put(K key, V value) {
// key value 不能為null
if (value == null)
throw new NullPointerException();
// 獲取hash值
int hash = hash(key.hashCode());
// 先獲取hash二進制數的高位與15的二進制做與運算,得到segment數組的位置。
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 鎖住
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
// 擴容操作
rehash();
// 獲取 Segment數組中的其中的HashEntry數組
HashEntry<K,V>[] tab = table;
// 獲取在在HashEntry數組中的位置。
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 判斷是否是該key。
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
// 如果存在該key的數據 ,那麽更新該值 返回舊值
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
//頭插法插入 tab[index]
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
// 看下擴容操作的細節
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;
// HashEntry數組,新的數組為它的兩倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
// 閾值
threshold = (int)(newTable.length * loadFactor);
//他的二進制添加以為 原來他的大小為3 那麽二進制就是11 現在就為 7 二進制為 111
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 舊的HashEntry。
HashEntry<K,V> e = oldTable[i];
if (e != null) {
// 下一個 該HashEntry數組上的 HashEntry是否為鏈表,有下一個值。
HashEntry<K,V> next = e.next;
// 該HashEntry的新位置 如果高位為1 那麽他在HashEntry數組中的位置就是老的HashEntry數組中的加上這個倍數。舉個例子
// 假如e.hash 原來的的二進制位...111 老的HashEntry數組的大小為 4 那麽e.hash和 4-1 也就是3 做與運算 得到的值也就是二進制的11
// 值位3 現在新的HashEntry數組的大小為 8 也就是 e.hash 和 8-1 做與運算 得到的值 也就是二進制位 111 位 7 。
int idx = e.hash & sizeMask;
// 沒有的話就直接放入該位置了,如果有的話往下看:
if (next == null)
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
// 假如idx 等於 7
int lastIdx = idx;
// 找到最後一個 HashEntry中的位置,並且後面的HashEntry的位置都是一樣的。舉個例子
// 假如這個鏈表中的所有HashEntry的Hash值為 1-5-1-5-5-5 。那麽最後lastIdx = 5 也就是1-5-1後面的這個5 。lastRun 為 1-5-1後面的這個5的HashEnrty。
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
//
lastRun = last;
}
}
// 將 lastRun 復制給 這個新的Table 那麽後面還有 5-5-5 這些的就不用移動了 直接帶過來了。 這就是上面那個for循環做的事情
newTable[lastIdx] = lastRun;
// 對前面的 1-5-1做操作了 1就是在新HashEntry書中的1的位置 5的後就是頭插法 ,查到新HashEntry的頭部了
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}
其實put
方法中有點難理解的就是 把查找到後面如果有所有相同的 HashEntry
的key
的位置是一樣的話,就不用額外的進行Hash
重新定位了。不知道我描述的清不清楚。如果還有不清楚的話,可以私信一下我。
get 方法
ConcurrentHashMap
中 get
方法是不會加鎖的,如果get
的值為null的時候,這個時候會對這個HashEntry
進行加鎖。預防此時並發出現的問題。
public V get(Object key) {
//定位Segment數組中的HashEntry數組為位置
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
// 曾經put進去過,也就是裏面有值
if (count != 0) { // read-volatile
// 定位HashEntry數組中的HashEntry。
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
ConcurrentHashMap
中get
方法是比較簡單的。看一看就知道了。
總結
這一遍ConcurrentHashMap
源碼分析,可以說是自己寫了大半個月吧。好早之前就準備寫了。總是寫一點,然後就停筆了。加上自己換了公司的原因,又忙上了一段時間,導致一拖再落。哇,嚴重拖延癥患者。上面自己也是全部透徹之後寫下來的,如果有些表達不夠清晰的還得多加包涵,如果有不同的可以下方瀏覽討論一下。上面很多關鍵的代碼我都寫上了註釋,可以配合著註釋,然後自己對源碼進行研究,查看,如果還有不是很透徹的話,自己多翻一翻其他人寫的。最近一直在寫LeetCode上的動態規劃這些算法題。其實也就是抄一遍。等以後有了感悟再來寫這一些吧。
ConcurrentHashMap JDK 1.6 源碼分析