ConcurrentHashMap原始碼刨析(基於jdk1.7)
看原始碼前我們必須先知道一下ConcurrentHashMap的基本結構。ConcurrentHashMap是採用分段鎖來進行併發控制的。
其中有一個內部類為Segment類用來表示鎖。而Segment類裡又有一個HashEntry<K,V>[]陣列,這個陣列才是真正用
來存放我們的key-value的。
大概為如下圖結構。一個Segment陣列,而Segment陣列每個元素為一個HashEntry陣列
看原始碼前我們還必須瞭解的幾個預設的常量值:
-
DEFAULT_INITIAL_CAPACITY = 16 容器預設容量為16
-
DEFAULT_LOAD_FACTOR = 0.75f 預設擴容因子是0.75
-
DEFAULT_CONCURRENCY_LEVEL = 16 預設併發度是16
-
MAXIMUM_CAPACITY = 1 << 30 容器最大容量為1073741824
-
MIN_SEGMENT_TABLE_CAPACITY = 2 段的最小大小
-
MAX_SEGMENTS = 1 << 16 段的最大大小
-
RETRIES_BEFORE_LOCK = 2 通過不獲取鎖的方式嘗試獲取size的次數
以上以及預設值是ConcurrentHashMap中定義好的,下面我們很多地方會用到他們。
先從初始化開始說起
通過我們使用ConcurrentHashMap都是通過 ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>();的方式
我們點進去跟蹤下原始碼
/** * Creates a new, empty map with a default initial capacity (16), * load factor (0.75) and concurrencyLevel (16). */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
可以看到,預設無參建構函式內呼叫了另一個帶參建構函式,而這個建構函式也就是不管你初始化時傳進來什麼引數,最終都會跳到那個帶參建構函式。
點進去看看這個帶參建構函式實現了什麼功能
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
我們看到該建構函式一共有三個引數,分別是容器的初始化大小、負載因子、併發度,這三個引數如果我們new 一個ConcurrentHashMap時沒有指定,
那麼將會採用預設的引數,也就是我們本文開始說的那幾個常量值。
在這裡我對這三個引數做下解釋。容器初始化大小是整個map的容量。負載因子是用來計算每個segment裡的HashEntry陣列擴容時的閾值的。併發度是
用來設定segment陣列的長度的。
開頭這兩個if沒什麼好說的。就是用來判斷我們傳進來的引數的正確性。當負載因子,初始容量和併發度不按照規範來時會丟擲算術異常。第二個if時當傳進來的
併發度大於最大段大小的時候,就將其設定為最大段大小。
這段就比較有意思了。由於segment陣列要求長度必須為2的n次方,當我們傳進來的併發度不是2的n次方時會計算出一個最接近它的2的n次方值
比如如何我們傳進來的併發度為14 15那麼通過計算segment陣列長度就是16。在上圖中我們可以看到兩個區域性變數ssize和sshift,在迴圈中如果ssize小於
併發度就將其二進位制左移一位,即乘2。因此ssize就是用來儲存我們計算出來的最接近併發度的2的n次方值。而ssfhit是用來計算偏移量的。在這裡我們又
要說兩個很重要的全域性常量。segmentMask和segmentShift。其中segmentMask為ssize - 1,由於ssize為2的倍數。那麼segmentMask就是奇數。化為
二進位制就是全1,而segmentShift為32 - sshift大小。32是key值經過再hash求出來的值的二進位制位。segmentMask和segmentShift是用來定位當前元素
在segment陣列那個位置,和在HashEntry陣列的哪個位置,後面我們會詳細說說怎麼算的。
這一段程式碼就是用來確定每個segment裡面的hashentry的一些引數和初始化segment陣列了。第一個if是防止我們設定的初始化
容量大於最大容量。而c是用來計算每個hashentry陣列的容量。由於每個hashentry陣列容量也需要為2的n次方,因此這裡也需要
一個cap和迴圈來計算一個2的n次方值,方法和上面一樣。這裡計算出來的cap值就是最終hashentry陣列實際的大小了。
初始化就做了這些工作了。
那麼我們在說說最簡單的get方法。
get方法就需要用到定位我們的元素了。而定位元素就需要我們上面初始化時設定好的兩個值:segmentMask和segmentShift
上面說了,併發度預設值為16,那麼ssize也為16,因此segmentMask為15.由於ssize二進位制往左移了4位,那麼sshift就是4,
segmentShift就是32-4=28.下面我們就用segmentMask=15,segmentShift為28來說說怎麼確定元素位置的。
在這裡我們要說下hash值,這裡的hash值不是key的hashcode值,而是經過再hash確定下來的一個hash值,目的是為了減少hash衝突。
hash值二進位制為32位。
上圖兩個紅框就是分別確定segment陣列中的位置和hashentry陣列中的位置。
我們可以看到確定segment陣列是採用 (h >>> segmentShift) & segmentMask,其中h為再hash過的hash值。將32為的hash值往右移segmentShift位。這裡我們假設移了28位。
而segmentMask為15,就是4位都為一的二進位制。將高4位與segmentMask相與會等到一個小於16的值,就是當前元素再的segment位置。
確定了所屬的segment後。就要確認在的hashentry位置了。通過第二個紅框處,我們可以看到確定hashentry的位置沒有使用上面兩個值了。而是直接使用當前hashentry陣列的長度減一
和hash值想與。通過兩種不同的演算法分別定位segment和hashenrty可以保證元素在segment陣列和hashentry數組裡面都雜湊開了。
Put方法
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
上面兩片程式碼就是put一個元素的過程。由於Put方法裡需要對共享變數進行寫入操作,因此為了安全,需要在操作共享變數時加鎖。put時先定位到segment,然後在segment裡及逆行擦汗如操作。
插入有兩個步驟,第一步判斷是否需要對segment裡的hashenrty陣列進行擴容。第二步是定位新增元素的位置,然後將其放在hashenrty數組裡。
我們先說說擴容。
在插入元素的時候會先判斷segment裡面的hashenrty陣列是否超過容量threshold。這個容量是我們剛開始初始化hashenrty陣列時採用容量大小和負載因子計算出來的。
如果超過這個閾值(threshold)那麼就會進行擴容。擴容括的時當前hashenrty而不是整個map。
如何擴容
擴容的時候會先建立一個容量是原來兩個容量大小的陣列,然後將原數組裡的元素進行再雜湊後插入到新的數組裡。
Size方法
由於map裡的元素是遍佈所有hashenrty的。因此統計size的時候需要統計每個hashenrty的大小。由於是併發環境下,可能出現有執行緒在插入或者刪除的情況。因此會出現
錯誤。我們能想到的就是使用size方法時把所有的segment的put,remove和clean方法都鎖起來。但是這種方法時很低效的。因此concurrenthashmap採用了以下辦法:
先嚐試2次通過不加鎖的方式來統計各個segment大小,如果統計的過程中,容器的count發生了變化,再採用加鎖的方式來統計所有segment的大小。
concurrenthashmap時使用modcount變數來判斷再統計的時候容器是否放生了變化。在put、remove、clean方法裡操作資料前都會將辯能力modCount進行加一,那麼在統計
size千後比較modCount是否發生變化,就可以知道容器大小是否發生變化