美國油價上漲刺激新能源車銷量:推廣電車已成國策,但想買到也不是那麼容易
ConcurrentHashMap
HashMap通常的實現方式是“陣列+連結串列”,這種方式被稱為“拉鍊法”。ConcurrentHashMap在這個 基本原理之上進行了各種優化
首先是所有資料都放在一個大的HashMap中;其次是引入了紅黑樹,原理如下:
- 如果頭節點是Node型別,則尾隨它的就是一個普通的連結串列;如果頭節點是TreeNode型別,它的後 面就是一顆紅黑樹,TreeNode是Node的子類
- 連結串列和紅黑樹之間可以相互轉換:初始的時候是連結串列,當連結串列中的元素超過某個閾值時,把連結串列轉 換成紅黑樹;反之,當紅黑樹中的元素個數小於某個閾值時,再轉換為連結串列。
- 這種設計優化:
- 使用紅黑樹,當一個槽裡有很多元素時,其查詢和更新速度會比連結串列快很多,Hash衝突的問 題由此得到較好的解決。
- 加鎖的粒度,並非整個ConcurrentHashMap,而是對每個頭節點分別加鎖,即併發度,就是 Node陣列的長度,初始長度為16
- 併發擴容,這是難度最大的。當一個執行緒要擴容Node陣列的時候,其他執行緒還要讀寫,因此 處理過程很複雜,後面會詳細分析
- 所以這種設計一方面降低了Hash衝突,另一方面也提升了併發度
原始碼解析
1、構造方法
public ConcurrentHashMap(int initialCapacity,//初始容量 float loadFactor, //載入因子 int concurrencyLevel) {//級別 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
- 引數:初始容量,載入因子,併發級別
- 如果初始化大小小於併發級別,則讓他們相等
- 判斷size大小
- cap代表數字長度的值
- 變數cap就是Node陣列的長度,保持為2的整數次方。tableSizeFor(...)方法是根 據傳入的初始容量,計算出一個合適的陣列長度。具體而言:1.5倍的初始容量+1,再往上取最接近的2 的整數次方,作為陣列長度cap的初始值
- 這裡的 sizeCtl,其含義是用於控制在初始化或者併發擴容時候的執行緒數,只不過其初始值設定成 cap。
2、初始化
構造方法裡只計算了陣列的初始大小,並沒有對陣列進行初始化。當多個執行緒都往裡面放 入元素的時候,再進行初始化。這就存在一個問題:多個執行緒重複初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 自旋等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 重點:將sizeCtl設定為-1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 初始化
table = tab = nt;
// sizeCtl不是陣列長度,因此初始化成功後,就不再等於陣列長度,而是n-(n>>>2)=0.75n,表示下一次擴容的閾值:n-n/4
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;// 設定sizeCtl的值為sc。
}
break;
}
}
return tab;
}
- 首先需要初始化一個table,當sizeCtl等於0時,自旋等待
- 若條件發生變化,則將sizeCtl設定為-1(併發,那個執行緒獲取-1,就可以進行下面的操作)
- 初始化建立陣列,長度為n
- sizeCtl不是陣列長度,因此初始化成功後,就不再等於陣列長度,而是n-(n>>>2)=0.75n,表示下一次擴容的閾值:n-n/4
- 設定sizeCtl的值為sc,返回Node陣列
通過上面的程式碼可以看到,多個執行緒的競爭是通過對sizeCtl進行CAS操作實現的。如果某個執行緒成 功地把 sizeCtl 設定為-1,它就擁有了初始化的權利,進入初始化的程式碼模組,等到初始化完成,再把 sizeCtl設定回去;其他執行緒則一直執行while迴圈,自旋等待,直到陣列不為null,即當初始化結束時, 退出整個方法
3、put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 分支1:整個陣列初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 分支2:第i個元素初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 分支3:擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 分支4:放入元素
else {
V oldVal = null;
// 加鎖
synchronized (f) {
// 連結串列
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 紅黑樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果是連結串列,上面的binCount會一直累加
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);// 超出閾值,轉換為紅黑樹
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);// 總元素個數累加1
return null;
}
- onlyIfAbsent:表示不存在的時候才向陣列中放
- 若等於分支1.則呼叫初始化陣列方法initTable()
- 若為分支2,第i個元素初始化(表示這個槽點沒有資料,所以直接將該元素放到頭節點返回)
- 若為分支3,說明這個槽點正在擴容,helpTransfer方法幫助擴容
- 最後到分支4,就是把元素放入槽內。槽內可能是一個連結串列,也可能是一棵紅黑樹,通過頭節點的型別 可以判斷是哪一種。第4個分支是包裹在synchronized (f)裡面的,f對應的陣列下標位置的頭節點, 意味著每個陣列元素有一把鎖,併發度等於陣列的長度
- 上面的binCount表示連結串列的元素個數,當這個數目超過TREEIFY_THRESHOLD=8時,把連結串列轉換成 紅黑樹,也就是 treeifyBin(tab,i)方法。但在這個方法內部,不一定需要進行紅黑樹轉換,可能只做 擴容操作。
4、擴容
擴容是ConcurrentHashMap中實現最複雜的一環
從上面的put方法原始碼中,可知當binCount大於等於閾值8,變成紅黑樹,所以擴容從treeifyBin方法讀起
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//陣列長度小於閾值64,不做紅黑樹轉換,直接擴容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 連結串列轉換為紅黑樹
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍歷連結串列,初始化紅黑樹
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
- 判斷tab不等於null,說明槽點資料已經有了
- 陣列長度小於閾值64,不做紅黑樹轉換,直接擴容
- MIN_TREEIFY_CAPACITY=64(陣列的長度):儲存箱可樹化的最小表容量,這個值至少是4被閾值(TREEIFY_THRESHOLD=8)大小,以避免調整大小和樹化閾值之間的衝突
- 若陣列長度大於閾值64時,則轉換為紅黑樹
在treeifyBin方法中,內部呼叫了方法tryPresize方法
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 計算步長
if (nextTab == null) { // 初始化新的HashMap
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //擴容兩倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//初始的transferIndex為舊HashMap的陣列長度
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 此處,i為遍歷下標,bound為邊界
// 如果成功獲取一個任務,則i=nextIndex-1
//bound=nextIndex-stride;
//如果獲取不到,則i=0,bound=0
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance表示在從i=transferIndex-1遍歷到bound位置的過程中,是否一直繼續
while (advance) {
int nextIndex, nextBound;
// 以下是哪個分支中的advance都是false,表示如果三個分支都不執行,才可以一直while迴圈
// 目的在於當對transferIndex執行CAS操作不成功的時候,需要自旋,以期獲取一個stride的遷移任務。
if (--i >= bound || finishing)
// 對陣列遍歷,通過這裡的--i進行。如果成功執行了--i,就不需要繼續while迴圈了,因為advance只能進一步。
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 對transferIndex執行CAS操作,即為當前執行緒分配1個stride。
// CAS操作成功,執行緒成功獲取到一個stride的遷移任務;
// CAS操作不成功,執行緒沒有搶到任務,會繼續執行while迴圈,自旋。
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i越界,整個HashMap遍歷完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing表示整個HashMap擴容完成
if (finishing) {
nextTable = null;
// 將nextTab賦值給當前table
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// tab[i]遷移完畢,賦值一個ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// tab[i]的位置已經在遷移過程中
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 對tab[i]進行遷移操作,tab[i]可能是一個連結串列或者紅黑樹
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 連結串列
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
// 表示lastRun之後的所有元素,hash值都是一樣的
// 記錄下這個最後的位置
lastRun = p;
}
}
if (runBit == 0) {// 連結串列遷移的優化做法
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {// 紅黑樹,遷移做法和連結串列類似
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
-
可以看到,擴容首先是先嚐試擴容,呼叫第一個方法tryPresize,也就是嘗試預先調整表的大小以容納給定數量的元素
-
具體擴容的方法在末尾呼叫的第二個方法transfer
-
擴容的基本原理:
-
首先建立一個新的HashMap,其陣列長度是原來陣列的倆倍
-
將old的元素逐個遷移過來,這點從引數中可以看出,tab是擴容前的HashMap,nextTab是擴容後的hashMap,當nextTab==null時,就會對nextTab進行初始化,容量為2倍
-
上述步驟就會遇到一個問題,併發問題,如下圖:
-
舊的的陣列長度為N,每個執行緒擴容一段,一段的長度用量用變數stride(步長)來表示,transferIndex表示整個陣列擴容的進度
-
stride(步長)的計算公式:在單核模式下直接等於0,因為單核模式下沒有辦 法多個執行緒並行擴容,只需要1個執行緒來擴容整個陣列;在多核模式下為 (n>>> 3)/NCPU,並且保證步長的最小值是 16。顯然,需要的執行緒個數約為n/stride
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
-
transferIndex是ConcurrentHashMap的一個成員變數,記錄了擴容的進度。初始值為n,從大到 小擴容,每次減stride個位置,最終減至n<=0,表示整個擴容完成。因此,從[0,transferIndex-1]的 位置表示還沒有分配到執行緒擴容的部分,從[transfexIndex,n-1]的位置表示已經分配給某個執行緒進行擴 容,當前正在擴容中,或者已經擴容成功。
-
因為transferIndex會被多個執行緒併發修改,每次減stride,所以需要通過CAS進行操作,如下面的程式碼 所示
else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0)))
-
在擴容未完成之前,有的陣列下標對應的槽已經遷移到了新的HashMap裡面,有的還在舊的 HashMap 裡面。這個時候,所有呼叫 get(k,v)的執行緒還是會訪問舊 HashMap,怎麼處理 呢?
-
解決方案:當Node[0]已經遷移成功,而其他Node還在遷移過程中時, 如果有執行緒要讀取Node[0]的資料,就會訪問失敗。為此,新建一個ForwardingNode,即轉 發節點,在這個節點裡面記錄的是新的 ConcurrentHashMap 的引用。這樣,當執行緒訪問到 ForwardingNode之後,會去查詢新的ConcurrentHashMap
-
因為陣列的長度 tab.length 是2的整數次方,每次擴容又是2倍。而 Hash 函式是 hashCode%tab.length,等價於hashCode&(tab.length-1)。這意味著:處於第i個位置的 元素,在新的Hash表的陣列中一定處於第i個或者第i+n個位置,舉個簡單的例 子:假設陣列長度是8,擴容之後是16
-
若hashCode=5,5&8=0,擴容後,5&16=0,位置保持不變;
-
若hashCode=24,24&8=0,擴容後,24&16=8,後移8個位置;
-
若hashCode=25,25&8=1,擴容後,25&16=9,後移8個位置;
-
若hashCode=39,39&8=7,擴容後,39&8=7,位置保持不變;
-
正因為有這樣的規律,所以如下有程式碼:
setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd);
-
也就是把tab[i]位置的連結串列或紅黑樹重新組裝成兩部分,一部分連結到nextTab[i]的位置,一部分鏈 接到nextTab[i+n]的位置。然後把tab[i]的位置指向一個ForwardingNode節點
-
同時,當tab[i]後面是連結串列時,使用類似於JDK 7中在擴容時的優化方法,從lastRun往後的所有節 點,不需依次拷貝,而是直接連結到新的連結串列頭部。從lastRun往前的所有節點,需要依次拷貝
-
瞭解了核心的遷移函式transfer(tab,nextTab),再回頭看tryPresize(int size)函式。這個函 數的輸入是整個Hash表的元素個數,在函式裡面,根據需要對整個Hash表進行擴容。想要看明白這個 函式,需要透徹地理解sizeCtl變數,下面這段註釋摘自原始碼
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */
- 當sizeCtl=-1時,表示整個HashMap正在初始化;
- 當sizeCtl=某個其他負數時,表示多個執行緒在對HashMap做併發擴容;
- 當sizeCtl=cap時,tab=null,表示未初始之前的初始容量(如上面的建構函式所示);
- 擴容成功之後,sizeCtl儲存的是下一次要擴容的閾值,即上面初始化程式碼中的n-(n>>>2) =0.75n。
- 所以,sizeCtl變數在Hash表處於不同狀態時,表達不同的含義。明白了這個道理,再來看上面的 tryPresize(int size)函式。
-
tryPresize(int size)是根據期望的元素個數對整個Hash表進行擴容,核心是呼叫transfer函式。 在第一次擴容的時候,sizeCtl會被設定成一個很大的負數U.compareAndSwapInt(this,SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT)+2);之後每一個執行緒擴容的時候,sizeCtl 就加 1, U.compareAndSwapInt(this,SIZECTL,sc,sc+1),待擴容完成之後,sizeCtl減1。
-
-
-
-