jdk1.8中ConcurrentHashMap的實現原理
併發環境下為什麼使用ConcurrentHashMap
1. HashMap在高併發的環境下,執行put操作會導致HashMap的Entry連結串列形成環形資料結構,從而導致Entry的next節點始終不為空,因此產生死迴圈獲取Entry
2. HashTable雖然是執行緒安全的,但是效率低下,當一個執行緒訪問HashTable的同步方法時,其他執行緒如果也訪問HashTable的同步方法,那麼會進入阻塞或者輪訓狀態。
3. 在jdk1.6中ConcurrentHashMap使用鎖分段技術提高併發訪問效率。首先將資料分成一段一段地儲存,然後給每一段資料配一個鎖,當一個執行緒佔用鎖訪問其中一段資料時,其他段的資料也能被其他執行緒訪問。然而在jdk1.8中的實現已經拋棄了Segment分段鎖機制,利用CAS+Synchronized來保證併發更新的安全,底層依然採用陣列+連結串列+紅黑樹的儲存結構。
JDK1.6分析
ConcurrentHashMap採用 分段鎖的機制,實現併發的更新操作,底層由Segment陣列和HashEntry陣列組成。Segment繼承ReentrantLock用來充當鎖的角色,每個 Segment 物件守護每個雜湊對映表的若干個桶。HashEntry 用來封裝對映表的鍵 / 值對;每個桶是由若干個 HashEntry 物件連結起來的連結串列。一個 ConcurrentHashMap 例項中包含由若干個 Segment 物件組成的陣列,下面我們通過一個圖來演示一下 ConcurrentHashMap 的結構:
JDK1.8分析
改進一:取消segments欄位,直接採用transient volatile HashEntry<K,V> table
改進二:將原先table陣列+單向連結串列的資料結構,變更為table陣列+單向連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0.75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),可以改進效能。
ConcurrentHashMap的重要屬性
/**
* races. Updated via CAS.
* 記錄容器的容量大小,通過CAS更新
*/
private static final long BASECOUNT;
/**
* 這個sizeCtl是volatile的,那麼他是執行緒可見的,一個思考:它是所有修改都在CAS中進行,但是sizeCtl為什麼不設計成LongAdder(jdk8出現的)型別呢?
* 或者設計成AtomicLong(在高併發的情況下比LongAdder低效),這樣就能減少自己操作CAS了。
*
* 預設為0,用來控制table的初始化和擴容操作,具體應用在後續會體現出來。
* -1 代表table正在初始化
* -N 表示有N-1個執行緒正在進行擴容操作
* 其餘情況:
*1、如果table未初始化,表示table需要初始化的大小。
*2、如果table初始化完成,表示table的容量,預設是table大小的0.75 倍,居然用這個公式算0.75(n - (n >>> 2))。
**/
private static final long SIZECTL;
/**
* 自旋鎖 (鎖定通過 CAS) 在調整大小和/或建立 CounterCells 時使用。 在CounterCell類更新value中會使用,功能類似顯示鎖和內建鎖,效能更好
* 在Striped64類也有應用
*/
private static final long CELLSBUSY;
Node:儲存key,value及key的hash值的資料結構。其中value和next都用volatile修飾,保證併發的可見性。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//volatile型別的
volatile Node<K,V> next;//volatile型別的
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//省略部分程式碼
}
ForwardingNode:一個特殊的Node節點,hash值為-1,其中儲存nextTable的引用。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//省略部分程式碼
}
ConcurrentHashMap的建構函式
//預設的建構函式
public ConcurrentHashMap(){}
/**
*initialCapacity 初始化容量
**/
public ConcurrentHashMap(int initialCapacity) {}
/**
*
*建立與給定map具有相同對映的新map
**/
public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
/**
*initialCapacity 初始容量
*loadFactor 負載因子,當容量達到initialCapacity*loadFactor時,執行擴容
*concurrencyLevel 預估的併發更新執行緒數
**/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {}
/**
*initialCapacity 初始容量
*loadFactor 負載因子
*concurrencyLevel 預估的併發更新執行緒數
**/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {}
接下來具體看看第四個建構函式的具體實現:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) //至少使用盡可能多的bin
initialCapacity = concurrencyLevel; //作為估計執行緒
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;//初始化sizeCtl
}
/**
*返回給定所需容量,table的大小總是2的冪次方
**/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
ConcurrentHashMap在建構函式中只會初始化sizeCtl值,並不會直接初始化table,而是延緩到第一次put操作
put()方法的實現
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//對hashCode進行再雜湊,演算法為(h ^ (h >>> 16)) & HASH_BITS
int binCount = 0;
//這邊加了一個迴圈,就是不斷的嘗試,因為在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
//因為如果其他執行緒正在修改tab,那麼嘗試就會失敗,所以這邊要加一個for迴圈,不斷的嘗試
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table為空,初始化;否則,根據hash值計算得到陣列索引i,如果tab[i]為空,直接新建節點Node即可。注:tab[i]實質為連結串列或者紅黑樹的首節點。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不為空並且hash值為MOVED(-1),說明該連結串列正在進行transfer操作,返回擴容完成後的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 針對首個節點進行加鎖操作,而不是segment,進一步減少執行緒衝突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在連結串列中找到值為key的節點e,直接設定e.val = value即可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果沒有找到值為key的節點,直接新建Node並加入連結串列即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首節點為TreeBin型別,說明為紅黑樹結構,執行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果節點數>=8,那麼轉換連結串列結構為紅黑樹結構。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 計數增加1,有可能觸發transfer操作(擴容)。
addCount(1L, binCount);
return null;
}
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/*
*但是這邊為什麼i要等於((long)i << ASHIFT) + ABASE呢,計算偏移量
*ASHIFT是指tab[i]中第i個元素在相對於陣列第一個元素的偏移量,而ABASE就算第一陣列的記憶體素的偏移地址
*所以呢,((long)i << ASHIFT) + ABASE就算i最後的地址
* 那麼compareAndSwapObject的作用就算tab[i]和c比較,如果相等就tab[i]=v否則tab[i]=c;
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
我們還是繼續一步步看程式碼,看inputVal的註釋a,這個方法helpTransfer,如果執行緒進入到這邊說明已經有其他執行緒正在做擴容操作,這個是一個輔助方法
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//下面幾種情況和addCount的方法一樣,請參考addCount的備註
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
當我們的putVal執行到addCount的時候
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟來都baseCount都加1因為x=1
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//多執行緒CAS發生失敗的時候執行
fullAddCount(x, uncontended);//2
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//當條件滿足開始擴容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {//如果小於0說明已經有執行緒在進行擴容操作了
//一下的情況說明已經有在擴容或者多執行緒進行了擴容,其他執行緒直接break不要進入擴容操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//如果相等說明擴容已經完成,可以繼續擴容
transfer(tab, nt);
}
//這個時候sizeCtl已經等於(rs << RESIZE_STAMP_SHIFT) + 2等於一個大的負數,這邊加上2很巧妙,因為transfer後面對sizeCtl--操作的時候,最多隻能減兩次就結束
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
看上面註釋1,每次都會對baseCount 加1,如果併發競爭太大,那麼可能導致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失敗,那麼為了提高高併發的時候baseCount可見性失敗的問題,又避免一直重試,這樣效能會有很大的影響,那麼在jdk8的時候是有引入一個類Striped64,其中LongAdder和DoubleAdder就是對這個類的實現。這兩個方法都是為解決高併發場景而生的,是AtomicLong的加強版,AtomicLong在高併發場景效能會比LongAdder差。但是LongAdder的空間複雜度會高點。
我們每次進來都對baseCount進行加1當達到一定的容量時,就需要對table進行擴容。擴容方法就是transfer,這個方法稍微複雜一點,大部分的程式碼我都做了註釋
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//構建一個連節點的指標,用於標識位
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
//迴圈的關鍵變數,判斷是否已經擴容完成,完成就return,退出迴圈
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//迴圈的關鍵i,i--操作保證了倒序遍歷陣列
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(預設16)
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//i<0說明已經遍歷完舊的陣列tab;i>=n什麼時候有可能呢?在下面看到i=n,所以目前i最大應該是n吧。
//i+n>=nextn,nextn=nextTab.length,所以如果滿足i+n>=nextn說明已經擴容完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {// a
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新這個擴容閾值,在這裡面sizectl值減一,說明新加入一個執行緒參與到擴容操作,參考sizeCtl的註釋
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果有多個執行緒進行擴容,那麼這個值在第二個執行緒以後就不會相等,因為sizeCtl已經被減1了,所以後面的執行緒就只能直接返回,始終保證只有一個執行緒執行了 a(上面註釋a)
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//finishing和advance保證執行緒已經擴容完成了可以退出迴圈
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)//如果tab[i]為null,那麼就把fwd插入到tab[i],表明這個節點已經處理過了
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)//那麼如果f.hash=-1的話說明該節點為ForwardingNode,說明該節點已經處理過了
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//這邊還對連結串列進行遍歷,這邊的的演算法和hashmap的演算法又不一樣了,這班是有點對半拆分的感覺
//把連結串列分表拆分為,hash&n等於0和不等於0的,然後分別放在新表的i和i+n位置
//次方法同hashmap的resize
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//把已經替換的節點的舊tab的i的位置用fwd替換,fwd包含nextTab
setTabAt(tab, i, fwd);
advance = true;
}//下面紅黑樹基本和連結串列差不多
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//判斷擴容後是否還需要紅黑樹結構
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
值得細細品味的是,transfer的for迴圈是倒敘的,說明對table的遍歷是從table.length-1開始到0的。我覺得這段程式碼寫得太牛逼了,特別是
//利用CAS方法更新這個擴容閾值,在這裡面sizectl值減一,說明新加入一個執行緒參與到擴容操作,參考sizeCtl的註釋
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果有多個執行緒進行擴容,那麼這個值在第二個執行緒以後就不會相等,因為sizeCtl已經被減1了,所以後面的執行緒就只能直接返回,始終保證只有一個執行緒執行了 a(上面註釋a)
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//finishing和advance保證執行緒已經擴容完成了可以退出迴圈
i = n; // recheck before commit
}
注意:如果連結串列結構中元素超過TREEIFY_THRESHOLD閾值,預設為8個,則把連結串列轉化為紅黑樹,提高遍歷查詢效率.接下來我們看看如何構造樹結構,程式碼如下:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
可以看出,生成樹節點的程式碼塊是同步的,進入同步程式碼塊之後,再次驗證table中index位置元素是否被修改過。
1、根據table中index位置Node連結串列,重新生成一個hd為頭結點的TreeNode連結串列。
2、根據hd頭結點,生成TreeBin樹結構,並把樹結構的root節點寫到table的index位置的記憶體中,具體實現如下:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
get()方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//如果eh=-1就說明e節點為ForWordingNode,這說明什麼,說明這個節點已經不存在了,被另一個執行緒正則擴容
//所以要查詢key對應的值的話,直接到新newtable找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
這個get請求,我們需要cas來保證變數的原子性。如果tab[i]正被鎖住,那麼CAS就會失敗,失敗之後就會不斷的重試。這也保證了get在高併發情況下不會出錯。
我們來分析下到底有多少種情況會導致get在併發的情況下可能取不到值。1、一個執行緒在get的時候,另一個執行緒在對同一個key的node進行remove操作;2、一個執行緒在get的時候,另一個執行緒正則重排table。可能導致舊table取不到值。
那麼本質是,我在get的時候,有其他執行緒在對同一桶的連結串列或樹進行修改。那麼get是怎麼保證同步性的呢?我們看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是幹嘛的:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
它是對tab[i]進行原子性的讀取,因為我們知道putVal等對table的桶操作是有加鎖的,那麼一般情況下我們對桶的讀也是要加鎖的,但是我們這邊為什麼不需要加鎖呢?因為我們用了Unsafe的getObjectVolatile,因為table是volatile型別,所以對tab[i]的原子請求也是可見的。因為如果同步正確的情況下,根據happens-before原則,對volatile域的寫入操作happens-before於每一個後續對同一域的讀操作。所以不管其他執行緒對table連結串列或樹的修改,都對get讀取可見。
參考
相關推薦
jdk1.8中ConcurrentHashMap的實現原理
併發環境下為什麼使用ConcurrentHashMap 1. HashMap在高併發的環境下,執行put操作會導致HashMap的Entry連結串列形成環形資料結構,從而導致Entry的next節點始終不為空,因此產生死迴圈獲取Entry 2. HashTa
JDK1.8中HashMap實現
替換 應該 初始化 第一個元素 擴容 實現 1.8 put 相同 JDK1.8中的HashMap實現跟JDK1.7中的實現有很大差別。下面分析JDK1.8中的實現,主要看put和get方法。 構造方法的時候並沒有初始化,而是在第一次put的時候初始化 put
1.jdk1.8中hashMap的原理,hash衝突如何解決
一:hashMap的工作原理 HashMap是基於鏈地址法的原理,使用put(key, value)儲存物件到HashMap中,使用get(key)從HashMap中獲取物件。 當我們給put()方法傳遞鍵和值時,我們先對鍵呼叫hashCode
HashMap 在JDK1.8中的實現
摘要HashMap是Java程式設計師使用頻率最高的用於對映(鍵值對)處理的資料型別。隨著JDK(Java Developmet Kit)版本的更新,JDK1.8對HashMap底層的實現進行了優化,例如引入紅黑樹的資料結構和擴容的優化等。本文結合JDK1.7和JDK1.8的區別,深入探討HashMap的結構
ConcurrentHashMap JDK1.8中結構原理及原始碼分析
注:本文根據網路和部分書籍整理基於JDK1.7書寫,如有雷同敬請諒解 歡迎指正文中的錯誤之處。 資料結構 ConcurrentHashMap 1.8 拋棄了Segment分段鎖機制,採用Node + CAS + Synchronized來保證併發安全進行實現
JDK1.8中ArrayList的實現原理及原始碼分析
一、概述 ArrayList是Java開發中使用比較頻繁的一個類,通過對原始碼的解讀,可以瞭解ArrayList的內部結構以及實現方法,清楚它的優缺點,以便我們在程式設計時靈活運用。 二、原始碼分析 2.1 類結構 JDK1.8原始碼中的A
HashMap在jdk1.7和1.8中的實現
Java集合類的原始碼是深入學習Java非常好的素材,原始碼裡很多優雅的寫法和思路,會讓人歎為觀止。HashMap的原始碼尤為經典,是非常值得去深入研究的,jdk1.8中HashMap發生了比較大的變化,這方面的東西也是各個公司高頻的考點。網上也有很多應對面試的標準答案,我之前也寫過類似的面
【必備技能】HashMap在jdk1.7和1.8中的實現
static final int TREEIFY_THRESHOLD = 8; public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } final V putVal(i
[技術分享]-ConcurrentHashMap在jdk1.8中的改進
一、簡單回顧ConcurrentHashMap在jdk1.7中的設計與Hashtable不同的是,ConcurrentHashMap使用的是分段鎖技術,將ConcurrentHashMap容器的資料分段儲存,每一段資料分配一個Segment,當執行緒佔用一個Segment時,
jdk1.6 1.7 1.8 LinkedList原始碼實現原理及區別
LinkedList(jdk1.6) private transient Entry<E> header = new Entry<E>(null, null, null); 定義一個空的Entry物件作為頭結點,Entry是其內部
Java併發程式設計總結4——ConcurrentHashMap在jdk1.8中的改進
一、簡單回顧ConcurrentHashMap在jdk1.7中的設計 先簡單看下ConcurrentHashMap類在jdk1.7中的設計,其基本結構如圖所示: 每一個segment都是一個HashEntry<K,V>[] table, table中的每一個元素本質上都是一個Has
JDK1.8 中的hashmap和concurrentHashMap
hashmap 在JDK1.6中,HashMap採用Node陣列+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值相等的元素較多時,通過key值依次查詢的效率較低。而JDK1.8中
C#中foreach實現原理
示例 元素 res 過程 false 編程語言 static posit this 本文主要記錄我在學習C#中foreach遍歷原理的心得體會。 對集合中的要素進行遍歷是所有編碼中經常涉及到的操作,因此大部分編程語言都把此過程寫進了語法中,比如C#中的foreach。經
ConcurrentHashMap實現原理
過時 initial 初始化 bin 重新 hashcode his 就是 cati ConcurrentHashMap采用了分段加鎖的方式看看get操作hashTable和ConcurrenHashMap的區別 public synchronized V get(Ob
Java8 中 ConcurrentHashMap工作原理的要點分析
tail dtree outer initial 而不是 ubd rule 設定 tree 簡介: 本文主要介紹Java8中的並發容器ConcurrentHashMap的工作原理,和其它文章不同的是,本文重點分析了不同線程的各類並發操作如get,put,remove之間是如
jdk1.8中接口可以寫默認方法
wheel void JD PE 靜態 調用 默認 sta default interface Vehicle { default void print(){ System.out.println("我是一輛車!"); } stat
ConcurrentHashMap實現原理以及源碼分析
賦值 already 設計 [] 取數 ole vat 復制 變化 ConcurrentHashMap是HashMap的高並發版本,是線程安全的,而HashMap是非線程安全的 一、底層實現 底層結構跟hashmap一樣,都是通過數組+鏈表+紅黑樹實現的,不過它要保證線程
ConcurrentHashMap實現原理以及原始碼解析
ConcurrentHashMap實現原理以及原始碼解析 ConcurrentHashMap是Java1.5中引用的一個執行緒安全的支援高併發的HashMap集合類。 1、執行緒不安全的HashMap 因為多執行緒環境下,使用Hashmap進行put操作會引起死迴圈
Redux 中 combineReducers實現原理
使用 product patch 分支結構 實現原理 復合 reducer 實現 判斷 使用一個reducer const initialState = { id : 2, name : ‘myName
JDK1.8中TreeMap原始碼解析——紅黑樹刪除
在看本文之前建議先看一下二叉樹的刪除過程,這裡有一篇文章寫得不錯,可以看一下 1、後繼節點 在看原始碼之前,先說說紅黑樹尋找 待刪除節點t 的 後繼節點 的過程: 如果待刪除節點t有右節點,那麼後繼節點為該節點右子樹中最左的節點,也就是右子樹中值最小的節