ConcurrentHashMap JDK 1.6 原始碼分析
前言
前段時間把 JDK 1.6 中的 HashMap
主要的一些操作原始碼分析了一次。既然把 HashMap
原始碼分析了, 就順便把 JDK 1.6 中 ConcurrentHashMap
的主要一些操作原始碼分析一下。因為其中有很多思想是值得我們去借鑑的。 ConcurrentHashMap
中的分段鎖。這個思想在 JDK 1.8 中 為了優化 JUC 下的原子鎖 CAS 高併發情況下導致自旋次數太多效率低下。引用 Adder
。其中就是借鑑了分段鎖的思想。AtomicLong
對比 LongAdder
。 有興趣可以檢視。
準備
如果有人問你瞭解 ConcurrentHashMap
嗎? 你可以這樣回答,瞭解。 ConcurrentHashMap
HashMap
非執行緒安全的,一種執行緒安全實現類。它有一個 Segment
陣列,Segment
本身就是相當於一個 HashMap
物件。裡面是一個 HashEntry
陣列,陣列中的每一個 HashEntry
都是一個鍵值對,也是一個連結串列的表頭。如果別人問你,那 ConcurrentHashMap
get
或者 put
一個物件的時候是怎麼操作的 ,你該怎麼回答。emmm..... 繼續往下看。會有你要的答案。
建構函式
分析原始碼,先從建構函式開始。直接研究帶所有引數的構造方法,其他一些過載的構造方法,最裡面還是呼叫了該構造方法。在看構造方法之前,需要 明白 sshift 是表示併發數的2的幾次方 比如併發數是16 那麼他的值就是 4 。ssize 是 segment
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 用來與 key的hashCode >>> 運算 獲取HashCode的高位 segmentShift = 32 - sshift; // 高位與 它做與運算 eg 假如 預設的建立該物件 那麼 segmentShift = 28 segmentMask=15(二進位制為1111) 假如現在put一個值 他的key的HashCode值為2的32次方 那麼 他在segment裡面的定位時 2的32次方 無符號 高位補零 右移28個 那麼就等於 10000(二進位制) 等於 16 與 1111 做與運算 等於0 也就是定位在 segment[0]上 。 segmentMask = ssize - 1; // segment陣列大小為 16 this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // segment陣列中 每個HashEntry陣列的大小, int cap = 1; while (cap < c) cap <<= 1; // 為segment陣列中的每個HashEntry陣列初始化大小,每個semengt中只有一個HashEntry陣列。如果你設定的 ConcurrentHashMap 初始化大小為16的話,則 segment陣列中每個的HashEntry的大小為1,如果你初始化他的大小為28 的話。它會根據上面的運算,cap的大小為2,也就是segment陣列中的每個HashEntry陣列的大小為2 ,總的大小為32。 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }
上面的註釋應該都挺清楚了,要注意的是 ConcurrentHashMap
的大小 是所有 Segment
陣列中每個HashEntry
陣列的大小相加的和。
put 方法
ConcurrentHashMap
每次 put
的時候都是需要加鎖的,只不過會鎖住他所在的那個Segment
陣列位置。其他的不鎖,這也就是分段鎖,預設支援16個併發。說起put,以陣列的形式儲存的資料,就會涉及到擴容。這樣是接下來需要好好討論的一個事情。
public V put(K key, V value) {
// key value 不能為null
if (value == null)
throw new NullPointerException();
// 獲取hash值
int hash = hash(key.hashCode());
// 先獲取hash二進位制數的高位與15的二進位制做與運算,得到segment陣列的位置。
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 鎖住
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
// 擴容操作
rehash();
// 獲取 Segment陣列中的其中的HashEntry陣列
HashEntry<K,V>[] tab = table;
// 獲取在在HashEntry陣列中的位置。
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 判斷是否是該key。
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
// 如果存在該key的資料 ,那麼更新該值 返回舊值
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
//頭插法插入 tab[index]
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
// 看下擴容操作的細節
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;
// HashEntry陣列,新的陣列為它的兩倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
// 閾值
threshold = (int)(newTable.length * loadFactor);
//他的二進位制新增以為 原來他的大小為3 那麼二進位制就是11 現在就為 7 二進位制為 111
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 舊的HashEntry。
HashEntry<K,V> e = oldTable[i];
if (e != null) {
// 下一個 該HashEntry陣列上的 HashEntry是否為連結串列,有下一個值。
HashEntry<K,V> next = e.next;
// 該HashEntry的新位置 如果高位為1 那麼他在HashEntry陣列中的位置就是老的HashEntry陣列中的加上這個倍數。舉個例子
// 假如e.hash 原來的的二進位制位...111 老的HashEntry陣列的大小為 4 那麼e.hash和 4-1 也就是3 做與運算 得到的值也就是二進位制的11
// 值位3 現在新的HashEntry陣列的大小為 8 也就是 e.hash 和 8-1 做與運算 得到的值 也就是二進位制位 111 位 7 。
int idx = e.hash & sizeMask;
// 沒有的話就直接放入該位置了,如果有的話往下看:
if (next == null)
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
// 假如idx 等於 7
int lastIdx = idx;
// 找到最後一個 HashEntry中的位置,並且後面的HashEntry的位置都是一樣的。舉個例子
// 假如這個連結串列中的所有HashEntry的Hash值為 1-5-1-5-5-5 。那麼最後lastIdx = 5 也就是1-5-1後面的這個5 。lastRun 為 1-5-1後面的這個5的HashEnrty。
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
//
lastRun = last;
}
}
// 將 lastRun 複製給 這個新的Table 那麼後面還有 5-5-5 這些的就不用移動了 直接帶過來了。 這就是上面那個for迴圈做的事情
newTable[lastIdx] = lastRun;
// 對前面的 1-5-1做操作了 1就是在新HashEntry書中的1的位置 5的後就是頭插法 ,查到新HashEntry的頭部了
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}
其實put
方法中有點難理解的就是 把查詢到後面如果有所有相同的 HashEntry
的key
的位置是一樣的話,就不用額外的進行Hash
重新定位了。不知道我描述的清不清楚。如果還有不清楚的話,可以私信一下我。
get 方法
ConcurrentHashMap
中 get
方法是不會加鎖的,如果get
的值為null的時候,這個時候會對這個HashEntry
進行加鎖。預防此時併發出現的問題。
public V get(Object key) {
//定位Segment陣列中的HashEntry陣列為位置
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
// 曾經put進去過,也就是裡面有值
if (count != 0) { // read-volatile
// 定位HashEntry陣列中的HashEntry。
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
ConcurrentHashMap
中get
方法是比較簡單的。看一看就知道了。
總結
這一遍ConcurrentHashMap
原始碼分析,可以說是自己寫了大半個月吧。好早之前就準備寫了。總是寫一點,然後就停筆了。加上自己換了公司的原因,又忙上了一段時間,導致一拖再落。哇,嚴重拖延症患者。上面自己也是全部透徹之後寫下來的,如果有些表達不夠清晰的還得多加包涵,如果有不同的可以下方瀏覽討論一下。上面很多關鍵的程式碼我都寫上了註釋,可以配合著註釋,然後自己對原始碼進行研究,檢視,如果還有不是很透徹的話,自己多翻一翻其他人寫的。最近一直在寫LeetCode上的動態規劃這些演算法題。其實也就是抄一遍。等以後有了感悟再來寫這一些吧