java集合類之ConcurrentHashMap
阿新 • • 發佈:2018-12-25
本文從執行緒安全的角度結合原始碼介紹了ConcurrentHashMap,不介紹與HashMap雷同的部分,如果對HashMap的實現有興趣,可以參考java 集合類之HashMap。
本文介紹的ConcurrentHashMap基於Java1.8原始碼,ConcurrentHashMap的實現在1.8有重大調整,使用CAS的方式取代了之前的分段鎖。
sizeCtl
sizeCtl 是一個標誌量,下面的判斷很多都是基於這個值來判斷的,需要首先了解一下不同狀態下的sizeCtl
只調用了建構函式,還沒初始化表的時候 sizeCtl=0
一個執行緒成功競爭到了initTable的資格,sizeCtl = -1
resize過程中,sizeCtl = (rs<<16) + 正在參與transfer的執行緒的數目+1,其中rs是一個第17位為1的正數
已完成初始化且不在resize狀態,sizeCtl 是陣列容量的3/4,起到類似HashMap的threshold的作用
initTable()
初始化表格
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果多個執行緒競爭初始化,競爭失敗的執行緒自旋等待
if ( (sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// cas 設定sizeCtl的值,需要注意的是這裡沒有使用傳統的方法對sizeCtl使用等號賦值
// SIZECTL這個變數在類的靜態初始化塊中儲存了sizeCtl這個變數的地址,直接通過記憶體
// 的cas 為sizeCtl field設定值
else if (U.compareAndSwapInt (this, SIZECTL, sc, -1)) {
try {
// 初始化表並記錄threshold
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
// 最後將sizeCtl的值設定為陣列長度的四分之三,這個sizeCtl充當的角色就是
// HashMap裡面的threshold,用來判斷是否擴容
sizeCtl = sc;
}
break;
}
}
return tab;
}
put()
put()方法用於向ConcurrentHashMap中新增資料,需要被設計成原子的操作。
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
* key和value都不能是null
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果表還沒初始化,那麼初始化這個表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果對應的index還沒有資料,那麼使用cas新建一個bucket,新增就完成了,跳出迴圈
// 如果這裡cas失敗了,那麼就會再迴圈,然後往這個新增的bucket後面追加資料
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果當前正在遷移資料,那麼本執行緒加入遷移工作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 當前的hash對應的bucket已經有值了,加鎖,執行插入操作,
// 以下的插入操作和HashMap十分類似
V oldVal = null;
synchronized (f) {
// 注意這裡又重新檢驗了一次,確認在比對完成到加鎖的這段時間,
// 這個連結串列的頭節點沒有發生變化
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果這個key已經存在,那麼直接替換
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果遍歷到尾部發現還是沒有這個key,那麼直接新增一個
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 當前連結串列已經轉化成了樹,往樹中插入節點
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 計算連結串列的長度並判斷是否要轉化為樹
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 成功插入之後調整size
addCount(1L, binCount);
return null;
}
addCount()
addCount()函式在put操作之後調整大小並決定是否要開始擴容
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果countCells不為空或者cas更新base count 出錯那麼就執行下面的程式碼
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果as是null或者當前的as的長度為0或者當前執行緒對應的countCell是空或者cas更新當前執行緒對應的count cell 出錯
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 記錄現有的size
s = sumCount();
}
// 如果要check resize
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果現在的大小大於了sizeCtl,並且當前size還有繼續擴充套件的空間,那麼rezie
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 計算一個和長度對應的標識
int rs = resizeStamp(n);
// resizeStamp(n) = Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
// rs高16位都是0.第16位是1,後面的是n高位連續0的個數
// sc小於0 表示正在擴容
if (sc < 0) {
// 首先注意這裡有個[bug](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427)
// sc == rs+1 這個不可能成立,因為sc <0且rs>0,這裡應該是
// sc == ( rs<<<RESIZE_STAMP_SHIFT ) +1 || sc == ( rs<<<RESIZE_STAMP_SHIFT ) + MAX_RESIZERS
// sc == ( rs<<<RESIZE_STAMP_SHIFT ) +1 判斷擴容是否結束,這是怎麼實現的呢,因為第一個執行緒開始transfer的時候
// 設定size ctl = (rs<<RESIZE_STAMP_SHIFT)+2,之後每個執行緒加入transfer會將size ctl 加1,退出transfer的時候會將size ctl 減1
// 這樣當完成transfer的時候,size ctl = (rs<<RESIZE_STAMP_SHIFT)+1
// (sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS 判斷helpTransfer的執行緒是不是達到了限制數量
// (sc >>> RESIZE_STAMP_SHIFT) != rs 是不是size ctl 發生了變化
// (nt = nextTable) == null 結束擴容了
// transferIndex <= 0 也表示擴容結束了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果cas 成功更新了size ctl 那麼開始轉移資料
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 還沒開始擴容,那麼開始擴容,
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
transfer()
transfer實現擴容後資料的遷移
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
* copy on write 的轉移模式
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 根據cpu的個數確定步長,即一次transfer需要處理的桶的個數
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果還沒初始化下個表,就初始化nextTable
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 這個while塊是調整transferIndex的地方,如果調整成功了,那麼就開始執行遷移工作
while (advance) {
int nextIndex, nextBound;
// 如果這一批的transfer 還沒到bound的位置
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 如果成功更新了transferIndex就開始transfer工作
// 這裡的i 表示的是這次transfer要處理的桶的下標,這個是從大到小遞減的
// nextBound 表示的是這次transfer要處理到的最後一個桶的下標
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 越界檢查,如果要處理的下標越界了,
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果完成了本次transfer,那麼標誌整個transfer完成
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 那麼可能是出錯了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 如果這條執行緒不是最後一個負責transfer的,那就退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 繼續處理
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果本次要負責的這個桶沒有資料,那麼設定為fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果本次負責的這個桶已經被處理了,直接將advance設定為true,繼續下次的transfer
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// transfer 桶的過程
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 判斷是不是連結串列,TreeBin的hash 是-2,ForwardNode的hash是-1
if (fh >= 0) {
// 計算這個桶的hash的二進位制低位第n位的值
int runBit = fh & n;
Node<K,V> lastRun = f;
// 這個for迴圈的程式碼可以看出是為了找到一個lastRun,那麼找到這個lastRun的目的是什麼呢
// 瞭解HashMap resize()過程的人肯定知道,這整個分支的工作是將連結串列分為兩類LOW和HIGH,LOW的hash&n==0
// 這一部分在擴容之後的index計算不受影響,HIGH的hash&n==n,這一部分新的index應該加上n,
// 這段程式碼的作用是在一個連結串列的尾部有若干個類別相同的節點,那麼在後面的劃分時當作一個整體處理,就會減少開銷
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 設定重用的尾部的那一部分鏈作為高頭還是低頭
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 頭插法插入不同型別的節點到兩個連結串列中
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);