ConcurrentHashMap 詳解一
本文程式碼來自JDK8
- ConcurrentHashMap 實現了執行緒安全;
- 雖然可以通過 Hashtable 或者 Collections.synchronizedMap 來生成一個執行緒安全的 Map 例項, 但這是全域性鎖方式, 效能不行;
- ConcurrentHashMap 的儲存方式和 HashMap 十分類似, 都是用陣列 + 連結串列 + 紅黑樹的結構, 差別在於一些操作需要做執行緒同步處理;
- 在 JDK7 ConcurrentHashMap 使用 Segment 分段鎖的方式實現執行緒安全, 而在 JDK8 就拋棄這種做法, 採用 CAS 演算法來保證執行緒安全, 這裡就不展開討論分段鎖的方式了, 有興趣可以去找 JDK7 的原始碼分析.
記憶體模型
要了解 Java 的執行緒安全, 首先要知道它的記憶體模型. 記憶體模型是一個概念, 簡單來講就是它定義了虛擬機器裡多執行緒如何訪問變數. 這裡涉及到兩個概念: 主記憶體和工作記憶體. 執行緒之間的共享變數都是儲存在主記憶體, 每個執行緒如果想要訪問變數, 那麼需要先把變數從主記憶體載入到私有的工作記憶體, 然後對工作記憶體的變數進行操作, 最後再更新到主記憶體中. 注意區分這裡的記憶體模型跟硬體記憶體不是一碼事, 簡單理解為主記憶體和工作記憶體都可以包含 CPU 暫存器, CPU 快取, RAM.
volatile
根據記憶體模型, 假設有一個成員變數 value, 執行緒 A 修改了它, 實際是修改執行緒 A 私有的工作記憶體裡面的 value 的副本, 還沒有更新到主記憶體的 value, 因此執行緒 B 讀取的 value 還是舊的 value, 這樣就不同步了.
而使用 volatile 修飾的成員變數 value 有個特點, 那就是假如執行緒 A 修改了這個變數的值, 那麼它會通知其他執行緒應該去主記憶體讀取新的 value, 這就是可見性.
CAS
CAS 全稱是比較並交換, 是一種樂觀鎖機制, 它要求變數對其他執行緒是可見的, 所以需要用 volatile 修飾, 當然也不是一定要這樣, 也可以用 synchronized 來實現, 畢竟這只是個演算法, 實現方式多種多樣. 涉及到三個引數: 記憶體值, 期待值, 更新值. 演算法有三個步驟:
- 讀取記憶體值;
- 比較記憶體值和期待值;
- 兩者相同就把更新值更新上去.
這三個步驟是一個原子操作, 實際是通過 CPU 匯流排加鎖或者快取加鎖方式來實現, 我也沒有具體深入瞭解, 只知道是個原子操作就行.
在 Java 中 CAS 的實現是在 Unsafe 類中, 它有個本地方法 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
ConcurrentHashMap 的插入方式跟 HashMap 是一樣的, 唯一的區別就是多了執行緒安全處理, 所以接下來主要是對執行緒安全詳細講解.
變數
-
transient volatile Node<K,V>[] table;
使用 volatile 修飾的陣列, 針對該陣列引用具有可見性, 對於陣列元素沒有可見性. 為了保證陣列元素也有可見性, 這裡就用 volatile 修飾 Node 類的 val 和 next. -
private transient volatile Node<K,V>[] nextTable;
用來擴容時的臨時陣列 -
private transient volatile int sizeCtl;
一個控制欄位, -1 表示在初始化陣列, -N 表示有 N-1 個執行緒在擴容, table 初始化後儲存下一個次擴容的大小, 跟閾值一樣. 這裡要說明一下, 雖然都說 -N 表示有 N-1 個執行緒擴容, 我也沒仔細研究是不是這樣, 但是在程式碼裡面是這樣來設定的: 第一個執行緒需要擴容, 它會用一個非常大的負數對 sizeCtl 設定, 此後每多一個執行緒擴容, sizeCtl 就會加一, 然後擴容完成後 sizeCtl 減一, 最後把它設定成新容量的閾值(正數).
初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 小於 0, 表示陣列正在被其他執行緒進行初始化, 那麼掛起本執行緒
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// U 是 Unsafe 類的例項, 用來操作 CAS 演算法
// 這裡使用 CAS 演算法把 sizeCtl 改成 -1
// 如果修改成功會返回 true, 然後初始化 table
// 因為這個 CAS 條件, 這裡的初始化只有一個執行緒執行, 其他執行緒是不會進入這裡
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 相當於 0,75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
擴容
ConcurrentHashMap 是支援多執行緒擴容方式, 比如執行緒 A 把舊陣列的索引位置 1 複製到新陣列, 同時執行緒 B 也把舊陣列的索引位置 2 複製到新陣列, 效率更加快. 另外每個執行緒都是負責一整個索引區域, 所以擴容原理是先找出本執行緒應該負責的索引區域, 然後遍歷這個區域, 將其中的節點複製到新陣列.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 單個執行緒允許處理的最少 table 桶首節點數量
// 即每個執行緒的任務量
// 大概是根據 CPU 數量來算出, 為什麼這樣算我還沒弄明白
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab 作為臨時陣列先擴容一倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 這是一個特殊的節點, hash 值設定為 -1, 也就是常量 MOVED
// 擴容過程中遇到索引位置為空就設定成該節點
// 或者索引位置不為空, 但是已經處理複製後也把索引位置設定為該節點
// 目的是為了告訴其他執行緒不需要再處理該索引位置
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 表示索引 i 節點是否被複製成功
boolean advance = true;
// 表示所有節點複製完成
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 這個迴圈目的很簡單
// 首先我們要知道擴容是一批一批的複製到新陣列的
// 比如把索引範圍 [10, 16) 的節點複製到新陣列
// 而這裡是逆序擴容, 比如原來陣列範圍是 [0, 16), 首先是對 [10, 16) 進行復制
// 還有變數 stride 就是區間大小, 比如這裡就是 6
// 所以這個迴圈目的就是為了找出允許執行緒擴容的索引範圍 [bound, i]
// 這裡只有更新共享變數 transferIndex 才用到 CAS 演算法, 其他操作就不需要了
while (advance) {
int nextIndex, nextBound;
// 滿足 [bound, i] 這個區間或者已經完成擴容, 跳出這個迴圈
if (--i >= bound || finishing)
advance = false;
// nextIndex 是邊界 i 的臨時儲存, 如果小於 0, 說明沒有要複製的節點了
// transferIndex 是共享變數, 儲存區間範圍的上限, 初始值是舊陣列長度
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 嘗試更新 transferIndex
// 如果成功, 當前執行緒就負責複製 [nextBound, nextIndex) 範圍的節點
// transferIndex 變成 nextBound
// 注意這裡 i=nextIndex-1, 所以 [nextBound, nextIndex) 也是 [bound, i]
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 下面開始複製 [bound, i] 範圍的節點, 逆序複製, 從 i 開始
// 對於擴容完成處理
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
// sizeCtl 設定為總大小的 0.75
// 至於這裡為什麼不用 CAS 更新值, 可能是沒有必要吧, 重複更新也沒關係
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 擴容完成, sizeCtl 減一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 擴容前 sizeCtl 會設定成 resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
// 如果不相等說明有其他執行緒執行擴容完成的操作了, 本執行緒不需要重複操作了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 對於 i 的節點為空, 那麼設定指向特殊節點 ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 當前執行緒判斷到這個節點的 hash 值是 MOVED
// 說明是特殊節點, 已經有其他執行緒操作了, 可以跳過這個節點
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 如果 i 既不是空值, 也不是特殊節點, 說明這是個普通節點
// 那麼就開始對這個連結串列或者樹進行復制, 首先是把它鎖上, 防止其他執行緒同時操作它
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 下面就是對連結串列或者樹複製的過程, 具體可以參考 HashMap
// 值得注意的是對於樹結構, 這裡索引位置不是直接指向樹的根節點
// 而是用 TreeBin 構造紅黑樹, 然後指向這個 TreeBin
// TreeBin 的 hash 值設定為 -2, 而其他節點的 hash 值都是大於 0
// 節點 hash 值的計算可以參考 spread 方法
// 因此可以通過 hash 值大於等於 0 來判斷是連結串列還是樹結構
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 複製完成後用特殊節點代替原來節點
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 這裡建立 TreeBin 來構造紅黑樹, 具體構造過程可以參考 HashMap
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 複製完成後用特殊節點代替原來節點
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
/**
* 計算 key 的 hash 值, 其中 HASH_BITS 是 0x7fffffff, 所以 hash 值一定大於等於 0
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}