ConcurrentHashMap實現原理以及源碼分析
ConcurrentHashMap是HashMap的高並發版本,是線程安全的,而HashMap是非線程安全的
一、底層實現
底層結構跟hashmap一樣,都是通過數組+鏈表+紅黑樹實現的,不過它要保證線程安全性,所以在源碼上要復雜一些
線程安全是通過CAS和synchronized實現的
源碼分析
相關節點
- Node:該類用於構造table[],只讀節點(不提供修改方法)。
- TreeBin:紅黑樹結構。
- TreeNode:紅黑樹節點。
- ForwardingNode:臨時節點(擴容時使用)。
table
transient volatile Node<K,V>[] table;
所有的數據都存放在table中,table的容量會根據實際情況進行擴容,table[i]存放的類型有一下三種
- TreeBin用於包裝紅黑樹的節點類型
- ForWardingNode擴容時存放的節點類型,並發擴容實現關鍵之一
- Node普通節點類型,表示鏈表頭節點
sizeCtl
private transient volatile int sizeCtl;
默認為0,用來控制table的初始化和擴容操作,不同值的代表狀態如下:
- -1:table[i]正在初始化
- -N:表示有N-1個線程正在進行擴容操作
- 非負數情況:(1)如果table[i]未初始化,則表示table需要初始化的大小(2)如果初始化完成,則表示table[i]擴容的閥值,默認是table[]容量的0.75倍
tabAt()/casTabAt()/setTabAt()
(long)i << ASHIFT) + ABASE得到table[i]的內存偏移地址
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
這三個方法都是有原子意義的讀、寫操作
而casTabAt()與setTabAt()方法的區別
所以真正要進行有原子語義的寫操作需要使用casTabAt()方法,setTabAt()是在鎖定桶的狀態下需要使用的方法,如此方法實現只是帶保守性的一種寫法而已。
put()方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //@1
int binCount = 0; //i處結點標誌,0: 未加入新結點, 2: TreeBin或鏈表結點數, 其它:鏈表結點數。主要用於每次加入結點後查看是否要由鏈表轉為紅黑樹
for (Node<K,V>[] tab = table;;) { //CAS經典寫法,不成功無限重試
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // 若table[]未創建,則初始化
//除非構造時指定初始化集合,否則默認構造不初始化table,真正添加時元素檢查是否需要初始化。
tab = initTable(); //@2
//CAS操作得到對應table中元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //table[i]後面無節點時,直接創建Node(無鎖操作)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; //null創建Node對象做為鏈表首結點
}
else if ((fh = f.hash) == MOVED) //如果當前正在擴容,則幫助擴容並返回最新table[]
tab = helpTransfer(tab, f); //擴容完畢再在新table中放入鍵值對,擴容節細講
else { //在鏈表或者紅黑樹中追加節點
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { //雙重檢查i處結點未變化
if (fh >= 0) { //如果為鏈表結構 //表明是鏈表結點類型,hash值是大於0的,即spread()方法計算而來
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //找到key,替換value
oldVal = e.val;
if (!onlyIfAbsent) //是新元素才加入標誌位,一般使用不會用到
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//jdk1.8版本是把新結點加入鏈表尾部,next由volatile修飾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //紅黑樹結點類型
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { //@4
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //默認桶中結點數超過8個數據結構會轉為紅黑樹
treeifyBin(tab, i); //@5
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //更新size,檢測擴容
return null;
}
從上面代碼可以看出,put的步驟大致如下:
1、參數校驗。
2、若table[]未創建,則初始化。
3、當table[i]後面無節點時,直接創建Node(無鎖操作)。
4、如果當前正在擴容,則幫助擴容並返回最新table[]。
5、然後在鏈表或者紅黑樹中追加節點。
6、最後還回去判斷是否到達閥值,如到達變為紅黑樹結構。
get()方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());// 定位到table[]中的i
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {// 若table[i]存在
if ((eh = e.hash) == h) {// 比較鏈表頭部
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)// 若為紅黑樹,查找樹
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {// 循環鏈表查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;// 未找到
}
get()方法的流程相對簡單一點,從上面代碼可以看出以下步驟:
1、首先定位到table[]中的i。
2、若table[i]存在,則繼續查找。
3、首先比較鏈表頭部,如果是則返回。
4、然後如果為紅黑樹,查找樹。
5、最後再循環鏈表查找
從上面步驟可以看出,ConcurrentHashMap的get操作上面並沒有加鎖。所以在多線程操作的過程中,並不能完全的保證一致性。
table擴容機制
觸發擴容的時機
- 新增節點後,addcount統計tab中的節點個數大於闕值(sizeCtl),會觸發tranfer,重新調整節點位置
- 鏈表轉化紅黑樹(put()時檢查調用treeifyBin(tab, i))時如果table容量小於64,則會觸發擴容,調用tryPresize(n << 1)進行擴容;
- 調用putAll()一次性加入大量元素,調用tryPresize(m.size())進行擴容;
addCount()方法
private final void addCount(long x, int check) {
...
//check就是binCount,有新元素加入成功才檢查是否要擴容。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//大於當前擴容閾值並且小於最大擴容值才擴容,如果table還未初始化則等待初始化完成。
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); //@1
if (sc < 0) { //已經有線程在進行擴容工作
//檢查是原容量為n的情況下進行擴容,保證sizeCtl與n是一塊修改好的,條件2與條件3在當前RESIZE_STAMP_BITS情況下應該不會成功,歡迎指正。條件4與條件5確保tranfer()中的nextTable相關初始化邏輯已走完。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //有新線程參與擴容則sizeCtl統計加1
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) //有線程檢測到需要擴容時走這裏,初始值為(rs << RESIZE_STAMP_SHIFT) + 2)),+2沒什麽特別,只是為符合-(1+擴容線程數)的定義。
transfer(tab, null);
s = sumCount();
}
}
}
treeifyBin()方法
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果table.length<64 就擴大一倍 返回
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//構造了一個TreeBin對象 把所有Node節點包裝成TreeNode放進去
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);//這裏只是利用了TreeNode封裝 而沒有利用TreeNode的next域和parent域
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//在原來index的位置 用TreeBin替換掉原來的Node對象
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
tryPresize()方法
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
從上面的分析中我們可以看,addCount()是擴容是老老實實按容量x2來擴容的,而tryPresize()會傳入一個size參數,可能一次性擴容很多倍。後面采用一樣的方式調用transfer()來進行真正的擴容處理。
transfer()(擴容的核心方法)
當table的元素數量達到容量閾值sizeCtl,需要對table進行擴容:
- 構建一個nextTable,大小為table兩倍
- 把table的數據復制到nextTable中。
在擴容過程中,依然支持並發更新操作;也支持並發插入。
關於擴容機制,推薦大家看
https://blog.csdn.net/elricboa/article/details/70199409
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 構建一個nextTable,大小為table兩倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//通過for自循環處理每個槽位中的鏈表元素,默認advace為真,通過CAS設置transferIndex屬性值,並初始化i和bound值,i指當前處理的槽位序號,bound指需要處理的槽位邊界,先處理槽位15的節點;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { // 遍歷table中的每一個節點
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // //如果所有的節點都已經完成復制工作 就把nextTable賦值給table 清空臨時對象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //擴容閾值設置為原來容量的1.5倍 依然相當於現在容量的0.75倍
return;
}
// 利用CAS方法更新這個擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍歷到的節點為空 則放入ForwardingNode指針
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍歷到ForwardingNode節點 說明這個點已經被處理過了 直接跳過 這裏是控制並發擴容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // 鏈表節點
int runBit = fh & n; // resize後的元素要麽在原地,要麽移動n位(n為原capacity),詳解見:https://huanglei.rocks/coding/194.html#4%20resize()%E7%9A%84%E5%AE%9E%E7%8E%B0
Node<K,V> lastRun = f;
//以下的部分在完成的工作是構造兩個鏈表 一個是原鏈表 另一個是原鏈表的反序排列
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一個鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個鏈表
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
//設置advance為true 返回到上面的while循環中 就可以執行i--操作
advance = true;
}
//對TreeBin對象進行處理 與上面的過程類似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//構造正序和反序兩個鏈表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// (1)如果lo鏈表的元素個數小於等於UNTREEIFY_THRESHOLD,默認為6,則通過untreeify方法把樹節點鏈表轉化成普通節點鏈表;(2)否則判斷hi鏈表中的元素個數是否等於0:如果等於0,表示lo鏈表中包含了所有原始節點,則設置原始紅黑樹給ln,否則根據lo鏈表重新構造紅黑樹。
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // tab[i]已經處理完了
advance = true;
}
}
}
}
}
}
二、ConcurrentHashMap在1.7與1.8的不同
區別 | JDK1.7 | JDK1.8 |
---|---|---|
同步機制 | 分段鎖,每個segment繼承ReentrantLock | CAS+synchronized保證並發更新 |
存儲結構 | 數組+鏈表 | 數組+鏈表+紅黑樹 |
鍵值對 | HashEntry | Node |
put操作 | 多個線程同時競爭獲取同一個segment鎖,獲取成功的線程更新map;失敗的線程嘗試多次獲取鎖仍未成功,則掛起線程,等待釋放鎖 | 訪問相應的bucket時,使用sychronizeded關鍵字,防止多個線程同時操作同一個bucket,如果該節點的hash不小於0,則遍歷鏈表更新節點或插入新節點;如果該節點是TreeBin類型的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;更新了節點數量,還要考慮擴容和鏈表轉紅黑樹 |
size實現 | 統計每個Segment對象中的元素個數,然後進行累加,但是這種方式計算出來的結果並不一樣的準確的。先采用不加鎖的方式,連續計算元素的個數,最多計算3次:如果前後兩次計算結果相同,則說明計算出來的元素個數是準確的;如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數; | 通過累加baseCount和CounterCell數組中的數量,即可得到元素的總個數; |
三、ConcurrentHashMap與HashTable的區別
hash table雖然性能上不如ConcurrentHashMap,但並不能完全被取代,兩者的叠代器的一致性不同的,hash table的叠代器是強一致性的,而concurrenthashmap是弱一致的。 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。
下面是大白話的解釋:
- Hashtable的任何操作都會把整個表鎖住,是阻塞的。好處是總能獲取最實時的更新,比如說線程A調用putAll寫入大量數據,期間線程B調用get,線程B就會被阻塞,直到線程A完成putAll,因此線程B肯定能獲取到線程A寫入的完整數據。壞處是所有調用都要排隊,效率較低。
- ConcurrentHashMap 是設計為非阻塞的。在更新時會局部鎖住某部分數據,但不會把整個表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處 是嚴格來說讀取操作不能保證反映最近的更新。例如線程A調用putAll寫入大量數據,期間線程B調用get,則只能get到目前為止已經順利插入的部分 數據。
選擇哪一個,是在性能與數據一致性之間權衡。ConcurrentHashMap適用於追求性能的場景,大多數線程都只做insert/delete操作,對讀取數據的一致性要求較低。
ConcurrentHashMap實現原理以及源碼分析