LockFreeHashMap:無阻塞程式碼技巧
10年前,cliff click博士就為關聯資料結構ConcurrentHashMap給出了一個採用open Address的無阻塞實現(NonBlockingHashMap)。其中為了減少執行緒之間執行順序的依賴而採用的演算法充滿技巧性。這個演算法宣稱是無鎖,幾乎可以保證任何時候停止某個特定執行緒都不會導致整體程序的停止(極端情況下,這一點還是會阻塞整個程序的)。
本文嘗試詳細地分析其中的完整程式碼,從而收穫其中的無阻塞程式設計技術:
首先我們來看K/V資料結構:
// Backing storage: one flat Object[] holding the CHM bookkeeping object
// (slot 0), the memoized-hash array (slot 1), then alternating key/value
// pairs from array index 2 onward.
private transient Object[] _kvs;

// Slot-0 accessor: the CHM (counters, _newkvs, copy state) for this table.
private static final CHM chm (Object[] kvs) { return (CHM )kvs[0]; }

// Slot-1 accessor: the memoized full-hash array, parallel to the key slots.
private static final int[] hashes(Object[] kvs) { return (int[])kvs[1]; }

// CAS the key slot of pair 'idx' (array index (idx<<1)+2) from 'old' to 'key'.
private static final boolean CAS_key( Object[] kvs, int idx, Object old, Object key ) {
  return _unsafe.compareAndSwapObject( kvs, rawIndex(kvs,(idx<<1)+2), old, key );
}

// CAS the value slot of pair 'idx' (array index (idx<<1)+3) from 'old' to 'val'.
private static final boolean CAS_val( Object[] kvs, int idx, Object old, Object val ) {
  return _unsafe.compareAndSwapObject( kvs, rawIndex(kvs,(idx<<1)+3), old, val );
}

// Raw byte offset of array element 'idx', for the Unsafe-based CAS above.
private static long rawIndex(final Object[] ary, final int idx) {
  assert idx >= 0 && idx < ary.length;
  return _Obase + idx * _Oscale;
}
可以看出,NonBlockingHashMap採用了一個大的Object陣列_kvs,其中第一、第二個元素分別用於存放CHM和hash碼的數值組,從下標2開始的元素用於真正的資料儲存,也就是說,_kvs中的資料儲存形式為:(2:key1, 3:val1, 4:key2, 5:val2…..)。為了提高併發性,它不再像JUC中的ConcurrentHashMap一樣將(key,value)作為一個元素整體來儲存了。儲存形式如下圖所示:
Figure 0:
為了方便顯示,我們這個檢視的資料結構是隻能儲存8個元素對(key,value),而預設建立的NonBlockingHashMap能夠儲存32個元素,初始化程式碼如下:
/** Create a new NonBlockingHashMap with default minimum size (currently set
 *  to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). */
public NonBlockingHashMap( ) { this(MIN_SIZE); }

/** Create a new NonBlockingHashMap with initial room for the given number of
 *  elements, thus avoiding internal resizing operations to reach an
 *  appropriate size.  Large numbers here when used with a small count of
 *  elements will sacrifice space for a small amount of time gained.  The
 *  initial size will be rounded up internally to the next larger power of 2. */
public NonBlockingHashMap( final int initial_sz ) { initialize(initial_sz); }

// Allocate the flat _kvs array: slot 0 holds the CHM, slot 1 the memoized
// hash array, then 2 array slots per K/V pair.
private final void initialize( int initial_sz ) {
  if( initial_sz < 0 ) throw new IllegalArgumentException();
  int i;                      // Convert to next largest power-of-2
  if( initial_sz > 1024*1024 ) initial_sz = 1024*1024;
  // The <<2 over-sizes by 4x, so the table starts well under 25% full.
  for( i=MIN_SIZE_LOG; (1<<i) < (initial_sz<<2); i++ ) ;
  // Double size for K,V pairs, add 1 for CHM and 1 for hashes
  _kvs = new Object[((1<<i)<<1)+2];
  _kvs[0] = new CHM(new Counter()); // CHM in slot 0
  _kvs[1] = new int[1<<i];          // Matching hash entries
  _last_resize_milli = System.currentTimeMillis();
}
好的,瞭解了初始化之後,我們接著看看呼叫的主要介面put、putIfAbsent、replace、remove。
// All four public mutators funnel into putIfMatch; the third argument is the
// expected-old-value sentinel selecting the match policy:
//   NO_MATCH_OLD - unconditional write (put / remove)
//   TOMBSTONE    - write only if the key is currently absent (putIfAbsent)
//   MATCH_ANY    - write only if the key is currently present (replace)
public TypeV put        ( TypeK key, TypeV val ) { return putIfMatch( key, val, NO_MATCH_OLD); }
public TypeV putIfAbsent( TypeK key, TypeV val ) { return putIfMatch( key, val, TOMBSTONE ); }
public TypeV replace    ( TypeK key, TypeV val ) { return putIfMatch( key, val,MATCH_ANY ); }
public TypeV remove     ( Object key )           { return putIfMatch( key,TOMBSTONE, NO_MATCH_OLD); }
putIfAbsent:當key對應的val不存在時插入。
replace:當key對應的val存在時更新那個val。
remove:刪除存在的(key, val)。
所以我們可以看到第三個數,NO_MATCH_OLD代表直接的put。TOMBSTONE代表對應的(key,val)不存在才插入。MATCH_ANY代表對應的(key,val)存在才插入。
我們接著看它對應的呼叫方法:
// 3-arg front end: rejects null keys/values, then delegates to the heavy
// 5-arg putIfMatch on the current top-level table.  A TOMBSTONE result is
// mapped to the public 'null' meaning "no prior mapping".
private final TypeV putIfMatch( Object key, Object newVal, Object oldVal ) {
  if (oldVal == null || newVal == null) throw new NullPointerException();
  final Object res = putIfMatch( this, _kvs, key, newVal, oldVal );
  assert !(res instanceof Prime);
  assert res != null;
  return res == TOMBSTONE ? null : (TypeV)res;
}
可以看到返回值res,如果是TOMBSTONE則作為null來返回。我們接著看5個引數的putIfMatch:
// --- putIfMatch ---------------------------------------------------------
// Put, Remove, PutIfAbsent, etc.  Return the old value.  If the returned
// value is equal to expVal (or expVal is NO_MATCH_OLD) then the put can be
// assumed to work (although might have been immediately overwritten).  Only
// the path through copy_slot passes in an expected value of null, and
// putIfMatch only returns a null if passed in an expected null.
private static final Object putIfMatch( final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final Object putval, final Object expVal ) {
  assert putval != null;
  assert !(putval instanceof Prime);
  assert !(expVal instanceof Prime);
  final int fullhash = hash (key); // throws NullPointerException if key null
  final int len = len (kvs);       // Count of key/value pairs, reads kvs.length
  final CHM chm = chm (kvs);       // Reads kvs[0]
  final int[] hashes = hashes(kvs);// Reads kvs[1], read before kvs[0]
  int idx = fullhash & (len-1);

  // ---
  // Key-Claim stanza: spin till we can claim a Key (or force a resizing).
  int reprobe_cnt=0;
  Object K=null, V=null;
  Object[] newkvs=null;
  while( true ) {             // Spin till we get a Key slot
    V = val(kvs,idx);         // Get old value (before volatile read below!)
    K = key(kvs,idx);         // Get current key
    if( K == null ) {         // Slot is free?
      // Found an empty Key slot - which means this Key has never been in
      // this table.  No need to put a Tombstone - the Key is not here!
      if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table
      // Claim the null key-slot
      if( CAS_key(kvs,idx, null, key ) ) { // Claim slot for Key
        chm._slots.add(1);      // Raise key-slots-used count
        hashes[idx] = fullhash; // Memoize fullhash
        break;                  // Got it!
      }
      // CAS to claim the key-slot failed.
      //
      // This re-read of the Key points out an annoying short-coming of Java
      // CAS.  Most hardware CAS's report back the existing value - so that
      // if you fail you have a *witness* - the value which caused the CAS
      // to fail.  The Java API turns this into a boolean destroying the
      // witness.  Re-reading does not recover the witness because another
      // thread can write over the memory after the CAS.  Hence we can be in
      // the unfortunate situation of having a CAS fail *for cause* but
      // having that cause removed by a later store.  This turns a
      // non-spurious-failure CAS (such as Azul has) into one that can
      // apparently spuriously fail - and we avoid apparent spurious failure
      // by not allowing Keys to ever change.
      K = key(kvs,idx);       // CAS failed, get updated value
      assert K != null;       // If keys[idx] is null, CAS shoulda worked
    }
    // Key slot was not null, there exists a Key here

    // We need a volatile-read here to preserve happens-before semantics on
    // newly inserted Keys.  If the Key body was written just before inserting
    // into the table a Key-compare here might read the uninitalized Key body.
    // Annoyingly this means we have to volatile-read before EACH key compare.
    newkvs = chm._newkvs;     // VOLATILE READ before key compare
    if( keyeq(K,key,hashes,idx,fullhash) )
      break;                  // Got it!

    // get and put must have the same key lookup logic!  Lest 'get' give
    // up looking too soon.
    //topmap._reprobes.add(1);
    if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or
        key == TOMBSTONE ) { // found a TOMBSTONE key, means no more keys
      // We simply must have a new table to do a 'put'.  At this point a
      // 'get' will also go to the new table (if any).  We do not need
      // to claim a key slot (indeed, we cannot find a free one to claim!).
      newkvs = chm.resize(topmap,kvs);
      if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy
      return putIfMatch(topmap,newkvs,key,putval,expVal);
    }

    idx = (idx+1)&(len-1); // Reprobe!
  } // End of spinning till we get a Key slot

  // ---
  // Found the proper Key slot, now update the matching Value slot.  We
  // never put a null, so Value slots monotonically move from null to
  // not-null (deleted Values use Tombstone).  Thus if 'V' is null we
  // fail this fast cutout and fall into the check for table-full.
  if( putval == V ) return V; // Fast cutout for no-change

  // See if we want to move to a new table (to avoid high average re-probe
  // counts).  We only check on the initial set of a Value from null to
  // not-null (i.e., once per key-insert).  Of course we got a 'free' check
  // of newkvs once per key-compare (not really free, but paid-for by the
  // time we get here).
  if( newkvs == null &&       // New table-copy already spotted?
      // Once per fresh key-insert check the hard way
      ((V == null && chm.tableFull(reprobe_cnt,len)) ||
       // Or we found a Prime, but the JMM allowed reordering such that we
       // did not spot the new table (very rare race here: the writing
       // thread did a CAS of _newkvs then a store of a Prime.  This thread
       // reads the Prime, then reads _newkvs - but the read of Prime was so
       // delayed (or the read of _newkvs was so accelerated) that they
       // swapped and we still read a null _newkvs.  The resize call below
       // will do a CAS on _newkvs forcing the read.
       V instanceof Prime) )
    newkvs = chm.resize(topmap,kvs); // Force the new table copy to start

  // See if we are moving to a new table.
  // If so, copy our slot and retry in the new table.
  if( newkvs != null )
    return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);

  // ---
  // We are finally prepared to update the existing table
  while( true ) {
    assert !(V instanceof Prime);

    // Must match old, and we do not?  Then bail out now.  Note that either V
    // or expVal might be TOMBSTONE.  Also V can be null, if we've never
    // inserted a value before.  expVal can be null if we are called from
    // copy_slot.
    if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all?
        V != expVal &&            // No instant match already?
        (expVal != MATCH_ANY || V == TOMBSTONE || V == null) &&
        !(V==null && expVal == TOMBSTONE) &&    // Match on null/TOMBSTONE combo
        (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last
      return V;                                 // Do not update!

    // Actually change the Value in the Key,Value pair
    if( CAS_val(kvs, idx, V, putval ) ) {
      // CAS succeeded - we did the update!
      // Both normal put's and table-copy calls putIfMatch, but table-copy
      // does not (effectively) increase the number of live k/v pairs.
      if( expVal != null ) {
        // Adjust sizes - a striped counter
        if(  (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1);
        if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1);
      }
      return (V==null && expVal!=null) ? TOMBSTONE : V;
    }
    // Else CAS failed
    V = val(kvs,idx);         // Get new value
    // If a Prime'd value got installed, we need to re-run the put on the
    // new table.  Otherwise we lost the CAS to another racing put.
    // Simply retry from the start.
    if( V instanceof Prime )
      return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
  }
}
這個方法是我們put的主邏輯。
分成三個部分來看,無論被哪個介面呼叫時候,第一步是根據key來定位,如果不存在就直接返回,或者用CAS建立,並且進入下一步。或者發現key已經存在,那麼我們也直接進入下一步。將多餘的註釋刪除後得到如下程式碼:
while( true ) { // Spin till we get a Key slot V = val(kvs,idx); // Get old value (before volatile read below!) K = key(kvs,idx); // Get current key if( K == null ) { // Slot is free? if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table if( CAS_key(kvs,idx, null, key ) ) { // Claim slot for Key chm._slots.add(1); // Raise key-slots-used count hashes[idx] = fullhash; // Memoize fullhash break; // Got it! } K = key(kvs,idx); // CAS failed, get updated value assert K != null; // If keys[idx] is null, CAS shoulda worked } newkvs = chm._newkvs; // VOLATILE READ before key compare if( keyeq(K,key,hashes,idx,fullhash) ) break; // Got it! if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or key == TOMBSTONE ) { // found a TOMBSTONE key, means no more keys newkvs = chm.resize(topmap,kvs); if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy return putIfMatch(topmap,newkvs,key,putval,expVal); } idx = (idx+1)&(len-1); // Reprobe! }
2-3行,根據前面hash碼計算出的idx來定位物件組_kvs中的K和V。由於是採用openAddress的定址方法,所以很可能這裡要多試幾遍。
4-13行,處理當發現K==null的情況,這種情況下假如呼叫的是remove方法,可以直接結束。否則就cas將我們的key值填入對應的下標。如果成功則增加chm一個slots計數。並且將計算出的fullhash填入hashes以備後用。然後直接跳出迴圈,結束查詢對應key的邏輯。
15-18行,假如cas操作失敗,或則K!=null,那麼我們就得對比K與我們嘗試的key的值(傳入的hashes顯然是之前存入,用於此時的對比),這裡的第15行比較特別:newkvs = chm._newkvs; 因為_newkvs是volatile變數,chm對應的_kvs[0]則是執行緒開始前就初始化成功的變數,所以先讀_newkvs的這個volatile read的語義能夠確保接下來,K讀到的資料是初始化完全的,從而能夠參與equals對比。這裡說明不屬於volatile變數的_kvs物件組中的元素,似乎只要是通過CAS操作來更新值,那麼更新後的值必定能夠被其他執行緒看到(我認為不一定,這一點存疑,可見 記憶體可見性 )。這裡要求不存在依然讀到舊值的情況(其實不一定能夠保證),但是可能存在讀到不完全的物件的可能。但是假如這個物件是volatile變數,或者讀取時候採用getXXXVolatile或則在普通讀取之前先讀取某個volatile變數,那麼就能確保讀取到更新後的完整資料。假如對比的結果是true,說明找對了K,則跳出這個迴圈。
20-26行,到了這裡說明K不對,我們就要繼續找對的。首先增加一個reprobe_cnt 用於統計失敗次數。如果失敗的次數達到一定的程度(map總容量的1/4+10)或者key==TOMBSTONE(雖然我沒發現這種情況),則擴容(後面說擴容)。並且在新建立的kvs上插入資料。
28行,向右移動一個節點再次查詢。
第二部分:
程式碼如下:
if( putval == V ) return V; // Fast cutout for no-change if( newkvs == null && // New table-copy already spotted? // Once per fresh key-insert check the hard way ((V == null && chm.tableFull(reprobe_cnt,len)) || V instanceof Prime) ) newkvs = chm.resize(topmap,kvs); // Force the new table copy to start if( newkvs != null ) return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
1行,假如put進來的val與原來的值相同,則不需要做工作直接返回。
3-7行,假如我們需要增加value而現在map容量過小,那我們需要擴容,或者已經開始擴容。那麼我們通過第八行得到新的kvs。
10-11行,如果發現新的kvs已經構造了,我們嘗試先將舊值複製到新的kvs上(此過程可能需要協助複製舊kvs的一部分資料),然後接著將當前值put進新的kvs。
第三部分程式碼:
// We are finally prepared to update the existing table while( true ) { assert !(V instanceof Prime); if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all? V != expVal && // No instant match already? (expVal != MATCH_ANY || V == TOMBSTONE || V == null) && !(V==null && expVal == TOMBSTONE) && // Match on null/TOMBSTONE combo (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last return V; // Do not update! if( CAS_val(kvs, idx, V, putval ) ) { if( expVal != null ) { // Adjust sizes - a striped counter if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1); if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1); } return (V==null && expVal!=null) ? TOMBSTONE : V; } V = val(kvs,idx); // Get new value if( V instanceof Prime ) return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal); }
5-10行,這裡的程式碼有點亂,意思是分別對expVal為 NO_MATCH_OLD TOMBSTONE MATCH_ANY 其他值,這四種情況進行區分。它可以改寫為如下清晰的程式碼:
// Rewritten match-failure test.  The article's version let the sentinel
// expVal values (TOMBSTONE / MATCH_ANY) fall through into the generic
// equals-based clause, which wrongly bailed out on a valid replace()
// (expVal==MATCH_ANY with a live V) and on putIfAbsent() into an empty
// slot (expVal==TOMBSTONE, V==null).  Guarding the generic clause with
// "not a sentinel" restores exact equivalence with the library condition.
if ( expVal != NO_MATCH_OLD &&
     ( ((expVal == TOMBSTONE) && (V != null && V != TOMBSTONE)) ||  // putIfAbsent: fail if a live value already exists
       ((expVal == MATCH_ANY) && (V == null || V == TOMBSTONE)) ||  // replace: fail if no live value exists
       ((expVal != TOMBSTONE && expVal != MATCH_ANY) &&             // ordinary expected-value compare
        V != expVal && (expVal == null || !expVal.equals(V))) ))
  return V;
所以這裡排除了我們不需要update的情況。
接著看12-20行,CAS_val真正的來更新所對應idx上value的值。假如失敗21-23行會取新的V值,然後檢視是否可以重試,或者需要往新的kvs裡面插入(擴容情況)。
假如成功,那麼說明我們的key/value成功插入。
那麼14-18行
if( expVal != null ) { // Adjust sizes - a striped counter if( (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1); if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1); } return (V==null && expVal!=null) ? TOMBSTONE : V;
我們當expVal != null說明是正常的外部呼叫,只有內部複製時候expVal才會==null。接著,putval值用來區分remove操作和非remove操作,putVal==TOMBSTONE說明當前是remove操作,那麼假如之前V存在,那麼我們map的_size會減一。假如putVal!=TOMBSTONE,那麼說明當前操作不是remove,可能是put、putIfAbsent等操作。那麼假如V之前不存在,則_size需要加一。
到此,我們的put類操作就結束了。
我們接著來看看get操作,由於程式碼不多,所以直接貼出:
// Never returns a Prime nor a Tombstone. @Override public TypeV get( Object key ) { final int fullhash= hash (key); // throws NullPointerException if key is null final Object V = get_impl(this,_kvs,key,fullhash); assert !(V instanceof Prime); // Never return a Prime return (TypeV)V; } private static final Object get_impl( final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final int fullhash ) { final int len = len (kvs); // Count of key/value pairs, reads kvs.length final CHM chm = chm (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs final int[] hashes=hashes(kvs); // The memoized hashes; reads slot 1 of kvs int idx = fullhash & (len-1); // First key hash int reprobe_cnt=0; while( true ) { // Probe table. Each read of 'val' probably misses in cache in a big // table; hopefully the read of 'key' then hits in cache. final Object K = key(kvs,idx); // Get key before volatile read, could be null final Object V = val(kvs,idx); // Get value before volatile read, could be null or Tombstone or Prime if( K == null ) return null; // A clear miss final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare if( keyeq(K,key,hashes,idx,fullhash) ) { // Key hit! Check for no table-copy-in-progress if( !(V instanceof Prime) ) // No copy? return (V == TOMBSTONE) ? null : V; // Return the value return get_impl(topmap,chm.copy_slot_and_check(topmap,kvs,idx,key),key,fullhash); // Retry in the new table } if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes key == TOMBSTONE ) // found a TOMBSTONE key, means no more keys in this table return newkvs == null ? null : get_impl(topmap,topmap.help_copy(newkvs),key,fullhash); // Retry in the new table idx = (idx+1)&(len-1); // Reprobe by 1! (could now prefetch) } }
2-8行與put操作那邊類似
12-24行,假如key==null則說明不存在,直接返回null。否則,通過讀取chm._newkvs來得到更新後的key。(這裡的volatile read語義與put時的compareAndSwapObject能讓我們讀取到更新後的值。)接著則是對比key,然後相同的情況下判斷V的值,假如找到我們需要的值則返回,否則返回空或者在下一層的kvs裡面協助複製並且繼續查詢。
26-30行,假如key值不一樣,那麼我們增加記錄一次探測次數,或者查詢下一個,或者直接去下一層查詢。(注意)這裡的重點是,在同一層kvs裡面,get只需要遍歷reprobe_limit(len)個數的key,假如之後的newkvs為空則說明不存在,所以返回null;否則繼續去下一層查詢。這種方式與put時候的策略配合,雖然增加了記憶體消耗,但是節省了get需要的時間。
接著我們來看看重要的擴容。
首先看張圖片,大概說明了資料結構map的下一層資料的組織結構。
Figure 1:
以上是一個直觀的展示。我們來看看程式碼的入口:
if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or key == TOMBSTONE ) { newkvs = chm.resize(topmap,kvs); if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy return putIfMatch(topmap,newkvs,key,putval,expVal); } if( newkvs == null && // New table-copy already spotted? // Once per fresh key-insert check the hard way ((V == null && chm.tableFull(reprobe_cnt,len)) || V instanceof Prime) ) newkvs = chm.resize(topmap,kvs); // Force the new table copy to start // See if we are moving to a new table. // If so, copy our slot and retry in the new table. if( newkvs != null ) return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
所以擴容僅在兩種情況下發生:
探測key的階段,當前probe次數達到了10+1/4*len或者我們發現插入的key居然是TOMBSTONE。
找到了key對應的槽位,且V為空(即首次插入值),並且tableFull判定表接近滿:當前的probe次數達到了10,且_slots的估計值已經達到了10+1/4*len;或者發現V是Prime,說明擴容複製已經開始。
我們來看chm.resize:
// Heuristically pick a new table size, allocate it, and install it with a
// single CAS on _newkvs.  Racing resizers that lose the CAS discard their
// allocation (GC'd) and adopt the winner's table.
private final Object[] resize( NonBlockingHashMap topmap, Object[] kvs) {
  assert chm(kvs) == this;

  // Check for a resize already in progress, probably started by another thread.
  Object[] newkvs = _newkvs; // VOLATILE READ
  if( newkvs != null )       // See if resize is already in progress
    return newkvs;           // Use the new table already

  // No copy in progress, so start one.  First: compute the new table size.
  int oldlen = len(kvs);     // Old count of K,V pairs allowed
  int sz = size();           // Get current table count of active K,V pairs
  int newsz = sz;            // First size estimate
  if( sz >= (oldlen>>2) ) {  // If we are >25% full of keys then...
    newsz = oldlen<<1;       // Double size
    if( sz >= (oldlen>>1) )  // If we are >50% full of keys then...
      newsz = oldlen<<2;     // Double double size
  }

  // A steady-size table that resized recently and is at least half dead
  // keys (tombstones) is churning; double anyway to spread the keys out.
  long tm = System.currentTimeMillis();
  long q=0;
  if( newsz <= oldlen &&     // New table would shrink or hold steady?
      tm <= topmap._last_resize_milli+10000 && // Recent resize (less than 10 sec ago)
      (q=_slots.estimate_get()) >= (sz<<1) )   // 1/2 of keys are dead?
    newsz = oldlen<<1;       // Double the existing size

  if( newsz < oldlen ) newsz = oldlen; // Never shrink below the current size

  int log2;
  for( log2=MIN_SIZE_LOG; (1<<log2) < newsz; log2++ ) ; // Compute log2 of size

  // Limit the number of threads concurrently allocating the (possibly huge)
  // replacement array: count resizers with a CAS'd bump of _resizers.
  long r = _resizers;
  while( !_resizerUpdater.compareAndSet(this,r,r+1) )
    r = _resizers;
  // Rough new-table size in megabytes, assuming 32-bit pointers.
  int megs = ((((1<<log2)<<1)+4)<<3/*word to bytes*/)>>20/*megs*/;
  if( r >= 2 && megs > 0 ) { // Already 2 guys trying; wait and see
    newkvs = _newkvs;        // Between dorking around, another thread did it
    if( newkvs != null )     // See if resize is already in progress
      return newkvs;         // Use the new table already
    // Sleep proportional to the allocation size, then re-check.
    try { Thread.sleep(8*megs); } catch( Exception e ) { }
  }
  // Last cheap check before the expensive 'new' below.
  newkvs = _newkvs;
  if( newkvs != null )   // See if resize is already in progress
    return newkvs;       // Use the new table already

  newkvs = new Object[((1<<log2)<<1)+2]; // This can get expensive for big arrays
  newkvs[0] = new CHM(_size);            // CHM in slot 0
  newkvs[1] = new int[1<<log2];          // hashes in slot 1

  // Another check after the slow allocation.
  if( _newkvs != null )  // See if resize is already in progress
    return _newkvs;      // Use the new table already

  // CAS in the new table so only one winner exists among racing resizers.
  if( CAS_newkvs( newkvs ) ) { // NOW a resize-is-in-progress!
    topmap.rehash();           // Call for Hashtable's benefit
  } else                       // CAS failed?
    newkvs = _newkvs;          // Reread new table
  return newkvs;
}
4-6行,如果_newkvs已經有了,則返回它
8-16行,得到當前_size數目,如果當前元素對數達到1/4,則擴容為原來兩倍,如果達到1/2,擴容為4倍,否則newsz = sz。
18-23行,根據_size跟slots的值判斷,如果dead狀態的元素過多,則可能需要縮減map的容量。但是從25行(if( newsz < oldlen ) newsz = oldlen)來看,至少維持當前的容量。
27-42行,計算適合當前newsz的2的次方個數。計算當前參與resize的執行緒個數,假如多於2個則多sleep一會兒,然後再取得_newkvs,很可能這個_newkvs已經被其他執行緒構造完成了。
44-55行,構造newkvs ,並且嘗試CAS將它替代,然後返回構造完成的_newkvs;
我們再來看help_copy:
// Fast cut-out wrapper: help along any in-progress copy of the TOP-LEVEL
// table, then return 'helper' unchanged for caller convenience.
private final Object[] help_copy( Object[] helper ) {
  // Read the top-level KVS only once.  We'll try to help this copy along,
  // even if it gets promoted out from under us (i.e., the copy completes
  // and another KVS becomes the top-level copy).
  Object[] topkvs = _kvs;
  CHM topchm = chm(topkvs);
  if( topchm._newkvs == null ) return helper; // No copy in-progress
  topchm.help_copy_impl(this,topkvs,false);
  return helper;
}
在當前map的_kvs中得到CHM,檢視是否需要先協助CHM去擴容。假如當前是普通put操作,則將會協助複製,否則返回。
接著是重要的help_copy_impl:
// Help along an existing copy of 'oldkvs' into _newkvs.  Each helper thread
// claims a chunk of MIN_COPY_WORK slots via a CAS on _copyIdx, copies them,
// and reports progress; with copy_all set (or once _copyIdx wraps past
// 2*oldlen, the "panic" mode) the thread keeps going until all slots are done.
private final void help_copy_impl( NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all ) {
  assert chm(oldkvs) == this;
  Object[] newkvs = _newkvs;
  assert newkvs != null;    // Already checked by caller
  int oldlen = len(oldkvs); // Total amount to copy
  final int MIN_COPY_WORK = Math.min(oldlen,1024); // Limit per-thread work

  int panic_start = -1;
  int copyidx=-9999;            // Fool javac to think it's initialized
  while( _copyDone < oldlen ) { // Still needing to copy?
    // Carve out a chunk of work by advancing the shared _copyIdx.
    if( panic_start == -1 ) { // No panic?
      copyidx = (int)_copyIdx;
      while( copyidx < (oldlen<<1) && // 'panic' check
             !_copyIdxUpdater.compareAndSet(this,copyidx,copyidx+MIN_COPY_WORK) )
        copyidx = (int)_copyIdx;     // Re-read
      if( !(copyidx < (oldlen<<1)) ) // Panic!
        panic_start = copyidx;       // Record where we started to panic-copy
    }

    // We now know what to copy.  Try to copy.
    int workdone = 0;
    for( int i=0; i<MIN_COPY_WORK; i++ )
      if( copy_slot(topmap,(copyidx+i)&(oldlen-1),oldkvs,newkvs) ) // Made an oldtable slot go dead?
        workdone++;         // Yes!
    if( workdone > 0 )      // Report work-done occasionally
      copy_check_and_promote( topmap, oldkvs, workdone );// See if we can promote
    copyidx += MIN_COPY_WORK;
    // Incremental helpers stop after one chunk; panic/copy_all keeps going.
    if( !copy_all && panic_start == -1 ) // No panic?
      return;       // Then done copying after doing MIN_COPY_WORK
  }
  // Extra promotion check, in case another thread finished the copying but
  // stalled before promoting.
  copy_check_and_promote( topmap, oldkvs, 0 );// See if we can promote
}
6行:取較小的數字,作為一次性複製的一段資料個數:oldlen或者1024。
10-19行:copyidx用於跟蹤複製的元素下標,panic_start 用於指示當前是否需要複製全部資料。所以這裡嘗試將_copyIdx原子地增加MIN_COPY_WORK,從而獲取對這段資料的複製權。並且檢視copyidx 是否過大,假如超過oldlen*2,則說明當前所有資料都正在被複制。所以該執行緒的工作是確保當前所有資料複製完畢。
21-29行,真正開始了我們的複製:
int workdone = 0; for( int i=0; i<MIN_COPY_WORK; i++ ) if( copy_slot(topmap,(copyidx+i)&(oldlen-1),oldkvs,newkvs) ) // Made an oldtable slot go dead? workdone++; // Yes! if( workdone > 0 ) // Report work-done occasionally copy_check_and_promote( topmap, oldkvs, workdone );// See if we can promote copyidx += MIN_COPY_WORK;
這裡的程式碼,通過將舊值複製到原本為空的newkvs上之後,然後增加_copyDone,從而嘗試promote,後來可以看到,這裡的程式碼運用了狀態機的思想,從而非常好的處理了競爭。
30-33行,不用複製全部的情況下,及時返回。最後一句copy_check_and_promote( topmap, oldkvs, 0 ); 確保將來極端情況下,由於後面的newkvs相對頂層的kvs先擴容,但是由於其並不是頂層的_kvs,所以只能留給將來的copy操作來複制。所以這裡要有copy_check_and_promote( topmap, oldkvs, 0 )。
over。
我們看一下另一個入口:
// --- copy_slot_and_check ------------------------------------------------
// Copy the single slot 'idx' from the old table into the new, then (unless
// called recursively from a helper, signalled by should_help==null)
// generically help the whole table-copy along.  Returns the new table.
private final Object[] copy_slot_and_check( NonBlockingHashMap topmap, Object[] oldkvs, int idx, Object should_help ) {
  assert chm(oldkvs) == this;
  Object[] newkvs = _newkvs; // VOLATILE READ
  // We're only here because the caller saw a Prime, which implies a
  // table-copy is in progress.
  assert newkvs != null;
  if( copy_slot(topmap,idx,oldkvs,_newkvs) )   // Copy the desired slot
    copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
  // Generically help along any copy (except if called recursively from a helper)
  return (should_help == null) ? newkvs : topmap.help_copy(newkvs);
}
這裡是一個針對單個元素的複製,並且根據should_help來判斷是直接往新的kvs裡面插入資料,還是先協助舊的kvs完成複製,然後再往新的kvs插入資料。
這裡也同樣呼叫了copy_slot和copy_check_and_promote,可想而知這兩個呼叫是重點。
我們來看關鍵的copy_slot(其實名字寫作transferSlot更合適):
// --- copy_slot ----------------------------------------------------------
// Copy one K/V pair from oldkvs to newkvs.  Returns true iff this call is
// the one that made the old slot go dead (so the caller may count it once
// toward _copyDone).  Slot state machines driven here:
//   key:   null -> TOMBSTONE  (or stays at its never-changing real key)
//   value: v -> Prime(v) -> TOMBPRIME   (late writers see the Prime and
//          re-route to the new table)
private boolean copy_slot( NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs ) {
  // Blindly set the key slot from null to TOMBSTONE, to eagerly stop fresh
  // put's from inserting new values into the old table.
  Object key;
  while( (key=key(oldkvs,idx)) == null )
    CAS_key(oldkvs,idx, null, TOMBSTONE);

  // Prevent new values from appearing in the old table by boxing whatever
  // is there into a Prime.
  Object oldval = val(oldkvs,idx); // Read OLD table
  while( !(oldval instanceof Prime) ) {
    final Prime box = (oldval == null || oldval == TOMBSTONE) ? TOMBPRIME : new Prime(oldval);
    if( CAS_val(oldkvs,idx,oldval,box) ) { // CAS down a box'd version of oldval
      // A TOMBPRIME means there was nothing live to copy - we are done.
      if( box == TOMBPRIME )
        return true;
      oldval = box; // Record updated oldval
      break;        // Break loop; oldval is now boxed by us
    }
    oldval = val(oldkvs,idx); // Else try, try again
  }
  if( oldval == TOMBPRIME ) return false; // Copy already complete here!

  // Copy the live value into the new table, but only over a null (expected
  // value null): any value already there happens-after the old-table value
  // and must win.
  Object old_unboxed = ((Prime)oldval)._V;
  assert old_unboxed != TOMBSTONE;
  boolean copied_into_new = (putIfMatch(topmap, newkvs, key, old_unboxed, null) == null);

  // Finally, forever hide the old-table value behind a TOMBPRIME so other
  // threads stop trying to copy this slot.
  while( !CAS_val(oldkvs,idx,oldval,TOMBPRIME) )
    oldval = val(oldkvs,idx);

  return copied_into_new;
} // end copy_slot
} // End of CHM
3-4行,假如key==null,則替換為TOMBSTONE。這裡的狀態轉換為 null->TOMBSTONE,或者保持為不為空的key。
6-18行,取得oldVal,假如oldval instanceof Prime成立,則說明已經先有其他執行緒操作過了,所以我們判斷if( oldval == TOMBPRIME ) return false;這裡這麼做的原因是val的最終狀態為TOMBPRIME 。
8-16行,根據當前的val值來構造prime物件,假如null或者值為TOMBSTONE(說明原來對應的值不存在),我們採用TOMBPRIME,否則構造一個包含val的prime。接著嘗試CAS替換原來的V。假如成功,如果box是TOMBPRIME,則直接返回true,說明成功,並且不需要實際的複製操作。否則oldVal=box,break;然後帶著這個包含val的值去繼續操作。假如CAS失敗,那麼這裡的第16行繼續嘗試oldval = val(oldkvs,idx);
20-22行,這裡是真正的複製操作。因為到了這裡說明當前複製操作並未真正完成,並且需要真正複製,所以我們從當前的val中取得之前的V值,然後呼叫putIfMatch將它插入到下一層的kvs,注意這裡第五個引數為null,意思是只在原來值為null時才插入資料,同時根據返回值是否為null來判斷是否完成真正的插入操作。得到copied_into_new來代表是否真正完成插入。
24-25行,將val的值改為TOMBPRIME,達到了最終的狀態。
27行,返回copied_into_new。用於更新copyDone。
所以可以看出,上面的重點是key與value的狀態轉換。兩個狀態機可能的轉換路徑為:
KEY: null->key,或者 null->TOMBSTONE
VALUE: null->value->value2…valueN->Prime(value)->TOMBPRIME 或者 null->TOMBPRIME , null->value->TOMBSTONE->TOMBPRIME
好的,我們接著來看看copy_check_and_promote,正是copy_slot與copy_check_and_promote配合才實現了正確的複製。
// --- copy_check_and_promote --------------------------------------------
// Fold 'workdone' freshly-dead old-table slots into _copyDone; when every
// slot of oldkvs has been copied AND oldkvs is still the top-level table,
// CAS-promote _newkvs to be the new top-level _kvs.
private final void copy_check_and_promote( NonBlockingHashMap topmap, Object[] oldkvs, int workdone ) {
  assert chm(oldkvs) == this;
  int oldlen = len(oldkvs);
  long copyDone = _copyDone;
  assert (copyDone+workdone) <= oldlen;
  if( workdone > 0 ) {
    while( !_copyDoneUpdater.compareAndSet(this,copyDone,copyDone+workdone) ) {
      copyDone = _copyDone; // Reload, retry
      assert (copyDone+workdone) <= oldlen;
    }
  }

  // Check for the copy being ALL done, and promote.  Nested in-progress
  // copies may finish before the top-level one; only top-level tables are
  // promoted (hence the topmap._kvs == oldkvs check).
  if( copyDone+workdone == oldlen &&  // Ready to promote this table?
      topmap._kvs == oldkvs &&        // Looking at the top-level table?
      // Attempt to promote
      topmap.CAS_kvs(oldkvs,_newkvs) ) {
    topmap._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check
  }
}
注意這裡的程式碼就非常簡單了,使用傳入的workdone來增加_copyDone的值。並且在_copyDone達到當前kvs的長度oldlen時(也就是複製都已經完成),並且當前_kvs是oldkvs,那麼我們就更新_kvs的值為newkvs,並且將resize的時間更新一下(用於下一次擴容時的統計量)。
注意這裡有個關鍵點,我們的_kvs並不是volatile型別的變數,所以這裡的CAS_kvs即使操作成功了,其他執行緒也不一定立刻讀取到_newkvs。但是這並不會影響功能的正確性,因為讀到舊的kvs,我們會發現已經滿了,並且順著如Figure 1中粉紅色的引用方向定位到正確的kvs。
到此我們的無阻賽併發雜湊表的程式碼詳解結束了。
假如把擴容的時候也算上,那麼這個Hash表並不算無鎖的,假如這幾行
boolean copied_into_new = (putIfMatch(topmap, newkvs, key, old_unboxed, null) == null); // --- // Finally, now that any old value is exposed in the new table, we can // forever hide the old-table value by slapping a TOMBPRIME down. This // will stop other threads from uselessly attempting to copy this slot // (i.e., it's a speed optimization not a correctness issue). while( !CAS_val(oldkvs,idx,oldval,TOMBPRIME) ) oldval = val(oldkvs,idx); return copied_into_new;
第1行執行之後、第11行執行之前,如果某個執行緒執行這部分程式碼特別慢,同時其他執行緒因_copyIdx已滿而進入複製整個kvs的策略,那麼剩餘所有執行緒都會因該執行緒而自旋在原地,無法取得progress。
另一方面,get時候的操作是
// to different _kvs arrays. private static final Object key(Object[] kvs,int idx) { return kvs[(idx<<1)+2]; } private static final Object val(Object[] kvs,int idx) { return kvs[(idx<<1)+3]; }
這樣普通的取元素操作,我認為存在取得舊值的可能,而且只有記憶體最終一致性的保證。所以當我們用某個執行緒1 put一個元素,另一個執行緒2 get這個元素,即使執行緒1完成了put操作,執行緒2依然可能無法get到新值。建議這裡改為volatile讀,例如key的讀取改為 return _unsafe.getObjectVolatile(kvs, rawIndex(kvs,(idx<<1)+2));(val對應(idx<<1)+3;注意偏移必須包含key/val槽位,單純的 _Obase + idx * _Oscale 對應的是第idx個陣列元素,並非第idx對的key槽)。
儘管在測試中並沒有觀察到這種情況。
最後,可以對比下jdk8中的ConcurrentHashMap的資料轉移transfer時候的程式碼,它是有鎖的,並且採取了類似的模式來轉移,一小段(MIN_TRANSFER_STRIDE):
同時可參考我寫的ConcurrentHashMap
// JDK8 ConcurrentHashMap.transfer (quoted here for comparison): moves each
// bin from 'tab' to 'nextTab' under a per-bin synchronized lock.  Threads
// claim 'stride'-sized index ranges by CAS'ing transferIndex downward -
// the same chunk-claiming pattern as NonBlockingHashMap's help_copy_impl,
// but with locking inside each bin instead of per-slot CAS state machines.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // Claim the next index (within our bound) or CAS off a new stride.
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { // All done: commit the new table.
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd); // Empty bin: plant forwarder.
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) { // Lock the bin head before splitting it.
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // Linked-list bin: split into lo/hi lists.
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // Tree bin: split, maybe untreeify.
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}