1. 程式人生 > 實用技巧 >原始碼淺入淺出 Java ConcurrentHashMap

原始碼淺入淺出 Java ConcurrentHashMap

從原始碼的角度深入地分析了 ConcurrentHashMap 這個執行緒安全的 HashMap,希望能夠給你一些幫助。

老讀者就請肆無忌憚地點贊吧,微信搜尋【沉默王二】關注這個在九朝古都洛陽苟且偷生的程式設計師。
本文 GitHub github.com/itwanger 已收錄,裡面還有我精心為你準備的一線大廠面試題。

HashMap 是 Java 中非常強大的資料結構,使用頻率非常高,幾乎所有的應用程式都會用到它。但 HashMap 不是執行緒安全的,不能在多執行緒環境下使用,該怎麼辦呢?

1)Hashtable,一個老掉牙的同步雜湊表,t 竟然還是小寫的,一看就非常不專業:

public
classHashtable<K,V>
extendsDictionary<K,V>
implementsMap<K,V>,Cloneable,java.io.Serializable
{
publicsynchronizedVput(Kkey,Vvalue){}
publicsynchronizedintsize(){}
publicsynchronizedVget(Objectkey){}
}

裡面的方法全部是 synchronized,同步的力度非常大,對不對?這樣的話,效能就沒法保證了。pass。

2)Collections.synchronizedMap(new HashMap<String, String>())

,可以把一個 HashMap 包裝成同步的 SynchronizedMap:

privatestaticclassSynchronizedMap<K,V>
implementsMap<K,V>,Serializable
{
publicintsize(){
synchronized(mutex){returnm.size();}
}
publicVget(Objectkey){
synchronized(mutex){returnm.get(key);}
}
publicVput(Kkey,Vvalue){
synchronized(mutex){returnm.put(key,value);}
}
}

可以看得出,SynchronizedMap 確實比 Hashtable 改進了,synchronized 不再放在方法上,而是放在方法內部,作為同步塊出現,但仍然是物件級別的同步鎖,讀和寫操作都需要獲取鎖,本質上,仍然只允許一個執行緒訪問,其他執行緒被排斥在外。

3)ConcurrentHashMap,本篇的主角,唯一正確的答案。Concurrent 這個單詞就是併發、並行的意思,所以 ConcurrentHashMap 就是一個可以在多執行緒環境下使用的 HashMap。

ConcurrentHashMap 一直在進化,Java 7 和 Java 8 就有很大的不同。Java 7 版本的 ConcurrentHashMap 是基於分段鎖的,就是將內部分成不同的 Segment(段),每個段裡面是 HashEntry 陣列。

來看一下 Segment:

staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{
transientvolatileHashEntry<K,V>[]table;
transientintcount;
transientintmodCount;
transientintthreshold;
finalfloatloadFactor;
}

再來看一下 HashEntry:

staticfinalclassHashEntry<K,V>{
finalKkey;//宣告key為final型
finalinthash;//宣告hash值為final型
volatileVvalue;//宣告value為volatile型
finalHashEntry<K,V>next;//宣告next為final型

HashEntry(Kkey,inthash,HashEntry<K,V>next,Vvalue){
this.key=key;
this.hash=hash;
this.next=next;
this.value=value;
}
}

和 HashMap 非常相似,唯一的區別就是 value 是 volatile 的,保證 get 時候的可見性。

Segment 繼承自 ReentrantLock,所以不會像 Hashtable 那樣不管是 put 還是 get 都需要 synchronized,鎖的力度變小了,每個執行緒只鎖一個 Segment,對其他執行緒訪問的 Segment 沒有影響。

Java 8 和之後的版本在此基礎上做了很大的改進,不再採用分段鎖的機制了,而是利用 CAS(Compare and Swap,即比較並替換,實現併發演算法時常用到的一種技術)和 synchronized 來保證併發,雖然內部仍然定義了 Segment,但僅僅是為了保證序列化時的相容性,程式碼註釋上就可以看得出來:

/**
*Stripped-downversionofhelperclassusedinpreviousversion,
*declaredforthesakeofserializationcompatibility.
*/

staticclassSegment<K,V>extendsReentrantLockimplementsSerializable{
finalfloatloadFactor;
Segment(floatlf){this.loadFactor=lf;}
}

底層結構和 Java 7 也有所不同,更接近 HashMap(陣列+雙向連結串列+紅黑樹):

來看一下新版 ConcurrentHashMap 定義的關鍵欄位:

publicclassConcurrentHashMap<K,V>extendsAbstractMap<K,V>
implementsConcurrentMap<K,V>,Serializable
{
transientvolatileNode<K,V>[]table;
privatetransientvolatileNode<K,V>[]nextTable;
privatetransientvolatileintsizeCtl;
}

1)table,預設為 null,第一次 put 的時候初始化,預設大小為 16,用來儲存 Node 節點,擴容時大小總是 2 的冪次方。

順帶看一下 Node 的定義:

staticclassNode<K,V>implementsMap.Entry<K,V>{
finalinthash;
finalKkey;
volatileVval;
volatileNode<K,V>next;
//…
}

hash 和 key 是 final 的,和 HashMap 的 Node 一樣,因為 key 是不會發生變化的。val 和 next 是 volatile 的,保證多執行緒環境下的可見性。

2)nextTable,預設為 null,擴容時新生成的陣列,大小為原陣列的兩倍。

3)sizeCtl,預設為 0,用來控制 table 的初始化和擴容操作。-1 表示 table 正在初始化;-(1+執行緒數) 表示正在被多個執行緒擴容。

Map 最重要的方法就是 put,ConcurrentHashMap 也不例外:

publicVput(Kkey,Vvalue){
returnputVal(key,value,false);
}

finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){
if(key==null||value==null)thrownewNullPointerException();
inthash=spread(key.hashCode());
intbinCount=0;
for(Node<K,V>[]tab=table;;){
Node<K,V>f;intn,i,fh;
if(tab==null||(n=tab.length)==0)
tab=initTable();
elseif((f=tabAt(tab,i=(n-1)&hash))==null){
if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))
break;//nolockwhenaddingtoemptybin
}
elseif((fh=f.hash)==MOVED)
tab=helpTransfer(tab,f);
...省略部分程式碼
}
addCount(1L,binCount);
returnnull;
}

1)spread() 是一個雜湊演算法,和 HashMap 的 hash() 方法類似:

staticfinalintspread(inth){
return(h^(h>>>16))&HASH_BITS;
}

2)如果是第一次 put 的話,會呼叫 initTable() 對 table 進行初始化。

privatefinalConcurrentHashMap.Node<K,V>[]initTable(){
ConcurrentHashMap.Node<K,V>[]tab;intsc;
while((tab=table)==null||tab.length==0){
if((sc=sizeCtl)<0)
Thread.yield();//lostinitializationrace;justspin
elseif(U.compareAndSetInt(this,SIZECTL,sc,-1)){
try{
if((tab=table)==null||tab.length==0){
intn=(sc>0)?sc:DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
ConcurrentHashMap.Node<K,V>[]nt=(ConcurrentHashMap.Node<K,V>[])newConcurrentHashMap.Node<?,?>[n];
table=tab=nt;
sc=n-(n>>>2);
}
}finally{
sizeCtl=sc;
}
break;
}
}
returntab;
}

外層用了一個 while 迴圈,如果發現 sizeCtl 小於 0 的話,就意味著其他執行緒正在初始化,yield 讓出 CPU。

第一次 put 的時候會執行 U.compareAndSetInt(this, SIZECTL, sc, -1),把 sizeCtl 賦值為 -1,表示當前執行緒正在初始化。

privatestaticfinalUnsafeU=Unsafe.getUnsafe();
privatestaticfinallongSIZECTL
=U.objectFieldOffset(ConcurrentHashMap.class,"sizeCtl");

U 是一個 Unsafe(可以提供硬體級別的原子操作,可以獲取某個屬性在記憶體中的位置,也可以修改物件的欄位值)物件,compareAndSetInt() 是 Unsafe 的一個本地(native)方法,它就負責把 ConcurrentHashMap 的 sizeCtl 修改為指定的值(-1)。

初始化後的 table 大小為 16(DEFAULT_CAPACITY)。

不是第一次 put 的話,會呼叫 tabAt() 取出 key 位置((n - 1) & hash)上的值(f):

staticfinal<K,V>ConcurrentHashMap.Node<K,V>tabAt(ConcurrentHashMap.Node<K,V>[]tab,inti){
return(ConcurrentHashMap.Node<K,V>)U.getReferenceAcquire(tab,((long)i<<ASHIFT)+ABASE);
}

U.getReferenceAcquire() 會呼叫 Unsafe 的 本地方法 getReferenceVolatile() 獲取指定記憶體中的資料,保證每次拿到的資料都是最新的。

如果 f 為 null,說明 table 中這個位置上是第一次 put 元素,呼叫 casTabAt() 插入 Node。

staticfinal<K,V>booleancasTabAt(ConcurrentHashMap.Node<K,V>[]tab,inti,
ConcurrentHashMap.Node<K,V>c,ConcurrentHashMap.Node<K,V>v)
{
returnU.compareAndSetReference(tab,((long)i<<ASHIFT)+ABASE,c,v);
}

如果 CAS 成功,說明 Node 插入成功,執行 addCount() 方法檢查是否需要擴容。

如果失敗,說明有其他執行緒提前插入了 Node,進行下一輪 for 迴圈繼續嘗試,俗稱自旋。

如果 f 的 hash 為 MOVED(-1),意味著有其他執行緒正在擴容,執行 helpTransfer() 一起擴容。

否則,把 Node 按連結串列或者紅黑樹的方式插入到合適的位置,這個過程是通過 synchronized 塊實現的。

synchronized(f){
if(tabAt(tab,i)==f){
if(fh>=0){
binCount=1;
for(Node<K,V>e=f;;++binCount){
Kek;
if(e.hash==hash&&
((ek=e.key)==key||
(ek!=null&&key.equals(ek)))){
oldVal=e.val;
if(!onlyIfAbsent)
e.val=value;
break;
}
Node<K,V>pred=e;
if((e=e.next)==null){
pred.next=newNode<K,V>(hash,key,
value,null);
break;
}
}
}
elseif(finstanceofTreeBin){
Node<K,V>p;
binCount=2;
if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,
value))!=null){
oldVal=p.val;
if(!onlyIfAbsent)
p.val=value;
}
}
}
}

1)插入之前,再次呼叫 tabAt(tab, i) == f 來判斷 f 是否被其他執行緒修改。

2)如果 fh(f 的雜湊值) >= 0,說明 f 是連結串列的頭節點,遍歷連結串列,找到對應的 Node,更新值,否則插入到末尾。

3)如果 f 是紅黑樹,則按照紅黑樹的方式插入或者更新節點。

分析完 put() 方法後,再來看 get() 方法:

publicVget(Objectkey){
ConcurrentHashMap.Node<K,V>[]tab;ConcurrentHashMap.Node<K,V>e,p;intn,eh;Kek;
inth=spread(key.hashCode());
if((tab=table)!=null&&(n=tab.length)>0&&
(e=tabAt(tab,(n-1)&h))!=null){
if((eh=e.hash)==h){
if((ek=e.key)==key||(ek!=null&&key.equals(ek)))
returne.val;
}
elseif(eh<0)
return(p=e.find(h,key))!=null?p.val:null;
while((e=e.next)!=null){
if(e.hash==h&&
((ek=e.key)==key||(ek!=null&&key.equals(ek))))
returne.val;
}
}
returnnull;
}

是不是簡單很多?

1)如果雜湊值相等((eh = e.hash) == h),直接返回 table 陣列中的元素。

2)如果是紅黑樹(eh < 0),按照紅黑樹的方式 find 返回。

3)如果是連結串列,進行遍歷,然後根據 key 獲取 value。

最後,來寫一個 ConcurrentHashMap 的應用例項吧!

/**
*@author沉默王二,一枚有趣的程式設計師
*/

publicclassConcurrentHashMapDemo{
publicfinalstaticintTHREAD_POOL_SIZE=5;

publicstaticvoidmain(String[]args)throwsInterruptedException{
Map<String,String>map=newConcurrentHashMap<>();

longstartTime=System.nanoTime();
ExecutorServicecrunchifyExServer=Executors.newFixedThreadPool(THREAD_POOL_SIZE);
for(intj=0;j<THREAD_POOL_SIZE;j++){
crunchifyExServer.execute(newRunnable(){
@SuppressWarnings("unused")
@Override
publicvoidrun(){
for(inti=0;i<500000;i++){
map.put("itwanger"+i,"沉默王二");
}
}
});
}

crunchifyExServer.shutdown();
crunchifyExServer.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);

longentTime=System.nanoTime();
longtotalTime=(entTime-startTime)/1000000L;
System.out.println(totalTime+"ms");
}
}

給同學們留一道作業題,感興趣的話可以嘗試下,把 ConcurrentHashMap 換成 SynchronizedMap,比較一下兩者效能上的差異,差距還是挺明顯的。


我是沉默王二,一枚在九朝古都洛陽苟且偷生的程式設計師。關注即可提升學習效率,感謝你的三連支援,奧利給