並發之Striped64(l累加器)
阿新 • • 發佈:2018-06-03
機制 協助 AD clas 很多 com numa 想要 就是 並發之Striped64(累加器)
對於該類的實現思想:
Striped64是在java8中添加用來支持累加器的並發組件,它可以在並發環境下使用來做某種計數,Striped64的設計思路是在競爭激烈的時候盡量分散競爭,在實現上,Striped64維護了一個base Count和一個Cell數組,計數線程會首先試圖更新base變量,如果成功則退出計數,否則會認為當前競爭是很激烈的,那麽就會通過Cell數組來分散計數,Striped64根據線程來計算哈希,然後將不同的線程分散到不同的Cell數組的index上,然後這個線程的計數內容就會保存在該Cell的位置上面,基於這種設計,最後的總計數需要結合base以及散落在Cell數組中的計數內容。這種設計思路類似於java7的ConcurrentHashMap實現,也就是所謂的分段鎖算法,ConcurrentHashMap會將記錄根據key的hashCode來分散到不同的segment上,線程想要操作某個記錄只需要鎖住這個記錄對應著的segment就可以了,而其他segment並不會被鎖住,其他線程任然可以去操作其他的segment,這樣就顯著提高了並發度,雖然如此,java8中的ConcurrentHashMap實現已經拋棄了java7中分段鎖的設計,而采用更為輕量級的CAS來協調並發,效率更佳。
為了更加清楚的明確我上面的闡述,請看下面的圖示:
讓我們來看看Cell這個類;它被Contended修飾,目的是為了防止變量的偽共享;
/**
* cell表,容量為2的次冪
*/
transient volatile Cell[] cells;
/**
* 基礎值,在更新操作時基於CAS無鎖技術實現原子更新
*/
transient volatile long base;
/**
* 自旋鎖 用於保護創建或者擴展Cell表。
*/
transient volatile int cellsBusy;
在這裏講下為什麽要使用自旋鎖:本人理解,自旋鎖的一個最大特點或者最佳的使用場景就是要求線程持有鎖的時間較短,那麽在累加器中這種場景最為符合了;至於什麽是自旋鎖,在接下來會講到;
@sun.misc.Contended static final class Cell {
//保存要累加的值
volatile long value;
Cell(long x) { value = x; }
//使用Unsafe類的cas來更新value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
// 獲取value值在Cell對象中的偏移量,以便迅速定位
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
在LongAdder類中,當我們調用public void add(long x)方法進行累加的時候,看看都做了些什麽:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x))){
//調用Striped64的方法
longAccumulate(x, null, uncontended);
}
}
}
1 (as=cells)!=null;數組不為空;如果並發量不大,那麽就不需要數組的操作或者協助,此時數組一定為null;但是此時數組不為空,那麽一定存在競爭; 2 !casBase(b = base, b + x);表示CAS更新操作;如果一個線程去CAS失敗,那麽表示正在有一個線程正在CAS操作,表示競爭激烈; 在條件1和條件2成立的情況下,表明當前線程的CAS更新操作失敗;或者cell數組不為空; uncontended表示線程對cell數組中的元素操作是否不存在競爭,不存在返回true; 3 默認情況下不存在競爭; 4 看第二個判斷為什麽還有判斷as==null呢,因為上述的第一個判斷是||的情況,有可能CAS失敗,但是數組還沒有初始化; 5 as == null 表示數組沒有初始化 6 m = as.length - 1<0表示數組的長度為0 7 (a = as[getProbe() & m]);獲取當前線程的ThreadLocalRandomProb(當前本地線程探測值)然後對cell數組的長度取余數,該值為null,說明這個地方從來沒有過線程做累加; 8 !(uncontended = a.cas(v = a.value, v + x));表示線程對cell數組中的某一個變量的值得CAS更新失敗,那麽這個位置存在競爭或者沖突;uncontended默認為true,不存在沖突 上述中5、6表示數組沒有初始化 上述中7表示將當前線程散列到cell數組中的位置沒有其他線程做過累加 上述中的8表示產生了沖突,uncontended=false; 上述四個條件成立一個就會進入 longAccumulate(x, null, uncontended);方法中 第一步:進入longAccumulate()方法表示此事存在競爭,或者並發很激烈; 1) 線程第一次進入該方法,那麽ThreadLocalRandomProb(當前本地線程探測值)的值為0,那麽第一次強制初始化;然後獲取該線程的ThreadLocalRandomProb,即在Cell數組中的位置;並且第一次初始化數組,而且線程在該位置上沒有存在競爭,那麽設置uncontended=true
int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; }
2) 此時設置該位置上的線程要對數據進行更新,設置cas更新成功與否的標誌;如果CAS成功表示false; //cas沖突標誌,表示當前線程hash到的Cells數組的位置,做cas累加操作時與其它線程發生了沖突,cas失敗;collide=true代表有沖突,collide=false代表無沖突 boolean collide = false; 高並發下計數功能最好的數據結構就是LongAdder與DoubleAdder,低並發下效率也非常優秀,這是我見過的java並發包中設計的最為巧妙的類,從軟硬件方面將java並發累加操作優化到了極致,所以應該我們應該弄清楚它的每一行代碼為什麽要這樣做,它倆的實現大同小異,下面以LongAdder類為例介紹下它的實現。
Striped64類
public class LongAdder extends Striped64 implements Serializable LongAdder繼承了Striped64類,來實現累加功能的,它是實現高並發累加的工具類; Striped64的設計核心思路就是通過內部的分散計算來避免競爭。 Striped64內部包含一個base和一個Cell[] cells數組,又叫hash表。 沒有競爭的情況下,要累加的數通過cas累加到base上;如果有競爭的話,會將要累加的數累加到Cells數組中的某個cell元素裏面。所以整個Striped64的值為sum=base+∑[0~n]cells。 Striped64內部三個重要的成員變量: /** * 存放Cell的hash表,大小為2的冪。 */ transient volatile Cell[] cells; /** * 基礎值, * 1. 在沒有競爭時會更新這個值; * 2. 在cells初始化的過程中,cells處於不可用的狀態,這時候也會嘗試將通過cas操作值累加到base。 */ transient volatile long base; /** * 自旋鎖,通過CAS操作加鎖,用於保護創建或者擴展Cell表。 */ transient volatile int cellsBusy;成員變量cells
cells數組是LongAdder高性能實現的必殺器: AtomicInteger只有一個value,所有線程累加都要通過cas競爭value這一個變量,高並發下線程爭用非常嚴重; 而LongAdder則有兩個值用於累加,一個是base,它的作用類似於AtomicInteger裏面的value,在沒有競爭的情況不會用到cells數組,它為null,這時使用base做累加,有了競爭後cells數組就上場了,第一次初始化長度為2,以後每次擴容都是變為原來的兩倍,直到cells數組的長度大於等於當前服務器cpu的數量為止就不在擴容(想下為什麽到超過cpu數量的時候就不再擴容);每個線程會通過線程對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value做累加,這樣相當於將線程綁定到了cells中的某個cell對象上;成員變量cellsBusy
cellsBusy,它有兩個值0 或1,它的作用是當要修改cells數組時加鎖,防止多線程同時修改cells數組,0為無鎖,1為加鎖,加鎖的狀況有三種 1. cells數組初始化的時候; 2. cells數組擴容的時候; 3. 如果cells數組中某個元素為null,給這個位置創建新的Cell對象的時候;成員變量base
它有兩個作用: 1. 在開始沒有競爭的情況下,將累加值累加到base 2. 在cells初始化的過程中,cells不可用,這時會嘗試將值累加到base上;Cell內部類
//為提高性能,使用註解@sun.misc.Contended,用來避免偽共享, @sun.misc.Contended static final class Cell { //用來保存要累加的值 volatile long value; Cell(long x) { value = x; } //使用UNSAFE類的cas來更新value值 final boolean cas(long cmp, long val) { returnUNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; //value在Cell類中存儲位置的偏移量; private static final long valueOffset; //這個靜態方法用於獲取偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } 這個類很簡單,final類型,內部有一個value值,使用cas來更新它的值;Cell類唯一需要註意的地方就是Cell類的註解@sun.misc.Contended。偽共享
要理解Contended註解的作用,要先弄清楚什麽是偽共享,會有什麽影響,如何解決偽共享。緩存行cache line
要理解偽共享先要弄清楚什麽是cache line,cpu的緩存系統中是以緩存行(cache line)為單位存儲的,緩存行是2的整數冪個連續字節,一般為32-256個字節。最常見的緩存行大小是64個字節,cache line是cache和memory之間數據傳輸的最小單元。 大多數現代cpu都one-die了L1和L2cache。對於L1 cache,大多是write though的;L2 cache則是write back的,不會立即寫回memory,這就會導致cache和memory的內容的不一致;另外,對於mp(multi processors)的環境,由於cache是cpu私有的,不同cpu的cache的內容也存在不一致的問題,因此很多mp的的計算架構,不論是ccnuma還是smp都實現了cache coherence的機制,即不同cpu的cache一致性機制。 Write-through(直寫模式)在數據更新時,同時寫入緩存Cache和後端存儲。 此模式的優點是操作簡單;缺點是因為數據修改需要同時寫入存儲,數據寫入速度較慢。 Write-back(回寫模式)在數據更新時只寫入緩存Cache。只在數據被替換出緩存時, 被修改的緩存數據才會被寫到後端存儲。此模式的優點是數據寫入速度快,因為不需要寫存儲; 缺點是一旦更新後的數據未被寫入存儲時出現系統掉電的情況,數據將無法找回 cache coherence的一種實現是通過cache-snooping協議,每個cpu通過對bus的snoop實現對其它cpu讀寫cache的監控: 當cpu1要寫cache時,其它cpu就會檢查自己cache中對應的cache line,如果是dirty的,就write back到memory,並且會將cpu1的相關cache line刷新;如果不是dirty的,就invalidate該cache line. 當cpu1要讀cache時,其它cpu就會將自己cache中對應的cache line中標記為dirty的部分write back到memory,並且會將cpu1的相關cache line刷新。 所以,提高cpu的cache hit rate,減少cache和memory之間的數據傳輸,將會提高系統的性能。 因此,在程序和二進制對象的內存分配中保持cache line aligned就十分重要,如果不保證cache line對齊,出現多個cpu中並行運行的進程或者線程同時讀寫同一個cache line的情況的概率就會很大。這時cpu的cache和memory之間會反復出現write back和refresh情況,這種情形就叫做cache thrashing。 為了有效的避免cache thrashing,通常有以下兩種途徑: 對於heap的分配,很多系統在malloc調用中實現了強制的alignment. 對於stack的分配,很多編譯器提供了stack aligned的選項。 當然,如果在編譯器指定了stack aligned,程序的尺寸將會變大,會占用更多的內存。因此,這中間的取舍需要仔細考慮; 關於偽共享詳情請看這裏介紹以及這裏; 為了解決這個問題在jdk1.6會采用long padding的方式,就是在防止被偽共享的變量的前後加上7個long類型的變量,如下所示: public class VolatileLongPadding { volatile long p0, p1, p2, p3, p4, p5, p6; volatile long v = 0L; volatile long q0, q1, q2, q3, q4, q5, q6; } jdk1.7的某個版本後會優化掉long padding,為了解決這個問題,在jdk1.8中加入了@sun.misc.Contended;LongAdder
前面說了一大堆,現在終於進入到正題了。LongAdder –>add方法
add方法是LongAdder累加的方法,傳入的參數x為要累加的值; public void add(long x) { Cell[] as; long b, v; int m; Cell a; /** * 如果一下兩種條件則繼續執行if內的語句 * 1. cells數組不為null(不存在爭用的時候,cells數組一定為null,一旦對base的cas操作失敗,才會初始化cells數組) * 2. 如果cells數組為null,如果casBase執行成功,則直接返回,如果casBase方法執行失敗(casBase失敗,說明第一次爭用沖突產生,需要對cells數組初始化)進入if內; * casBase方法很簡單,就是通過UNSAFE類的cas設置成員變量base的值為base+要累加的值 * casBase執行成功的前提是無競爭,這時候cells數組還沒有用到為null,可見在無競爭的情況下是類似於AtomticInteger處理方式,使用cas做累加。 */ if ((as = cells) != null || !casBase(b = base, b + x)) { //uncontended判斷cells數組中,當前線程要做cas累加操作的某個元素是否#不#存在爭用,如果cas失敗則存在爭用;uncontended=false代表存在爭用,uncontended=true代表不存在爭用。 boolean uncontended = true; /** *1. as == null : cells數組未被初始化,成立則直接進入if執行cell初始化 *2. (m = as.length - 1) < 0: cells數組的長度為0 *條件1與2都代表cells數組沒有被初始化成功,初始化成功的cells數組長度為2; *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0,則通過getProbe方法獲取當前線程Thread的threadLocalRandomProbe變量的值,初始為0,然後執行threadLocalRandomProbe&(cells.length-1 ),相當於m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置為null,這說明這個位置從來沒有線程做過累加,需要進入if繼續執行,在這個位置創建一個新的Cell對象; *4. !(uncontended = a.cas(v = a.value, v + x)):嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,並返回操作結果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe; 如果進入if語句執行longAccumulate方法,有三種情況 1. 前兩個條件代表cells沒有初始化, 2. 第三個條件指當前線程hash到的cells數組中的位置還沒有其它線程做過累加操作, 3. 第四個條件代表產生了沖突,uncontended=false **/ if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }longAccumulate方法
三個參數第一個為要累加的值,第二個為null,第三個為wasUncontended表示調用方法之前的add方法是否未發生競爭; final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //獲取當前線程的threadLocalRandomProbe值作為hash值,如果當前線程的threadLocalRandomProbe為0,說明當前線程是第一次進入該方法,則強制設置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員靜態私有變量probeGenerator的值,後面會詳細將hash值的生成; //另外需要註意,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況 //1.當前線程之前還沒有參與過cells爭用(也許cells數組還沒初始化,進到當前方法來就是為了初始化cells數組後爭用的),是第一次執行base的cas累加操作失敗; //2.或者是在執行add方法時,對cells某個位置的Cell的cas操作第一次失敗,則將wasUncontended設置為false,那麽這裏會將其重新置為true;第一次執行操作失敗; //凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0; int h; if ((h = getProbe()) == 0) { //初始化ThreadLocalRandom; ThreadLocalRandom.current(); // force initialization //將h設置為0x9e3779b9 h = getProbe(); //設置未競爭標記為true wasUncontended = true; } //cas沖突標誌,表示當前線程hash到的Cells數組的位置,做cas累加操作時與其它線程發生了沖突,cas失敗;collide=true代表有沖突,collide=false代表無沖突 boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; //這個主幹if有三個分支 //1.主分支一:處理cells數組已經正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4) //2.主分支二:處理cells數組沒有初始化或者長度為0的情況;(這個分支處理add方法的四個條件中的1和2) //3.主分支三:處理如果cell數組沒有初始化,並且其它線程正在執行對cells數組初始化的操作,及cellbusy=1;則嘗試將累加值通過cas累加到base上 //先看主分支一 if ((as = cells) != null && (n = as.length) > 0) { /** *內部小分支一:這個是處理add方法內部if分支的條件3:如果被hash到的位置為null,說明沒有線程在這個位置設置過值,沒有競爭,可以直接使用,則用x值作為初始值創建一個新的Cell對象,對cells數組使用cellsBusy加鎖,然後將這個Cell對象放到cells[m%cells.length]位置上 */ if ((a = as[(n - 1) & h]) == null) { //cellsBusy == 0 代表當前沒有線程cells數組做修改 if (cellsBusy == 0) { //將要累加的x值作為初始值創建一個新的Cell對象, Cell r = new Cell(x); //如果cellsBusy=0無鎖,則通過cas將cellsBusy設置為1加鎖 if (cellsBusy == 0 && casCellsBusy()) { //標記Cell是否創建成功並放入到cells數組被hash的位置上 boolean created = false; try { Cell[] rs; int m, j; //再次檢查cells數組不為null,且長度不為空,且hash到的位置的Cell為null if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //將新的cell設置到該位置 rs[j] = r; created = true; } } finally { //去掉鎖 cellsBusy = 0; } //生成成功,跳出循環 if (created) break; //如果created為false,說明上面指定的cells數組的位置cells[m%cells.length]已經有其它線程設置了cell了,繼續執行循環。 continue; } } //如果執行的當前行,代表cellsBusy=1,有線程正在更改cells數組,代表產生了沖突,將collide設置為false collide = false; /** *內部小分支二:如果add方法中條件4的通過cas設置cells[m%cells.length]位置的Cell對象中的value值設置為v+x失敗,說明已經發生競爭,將wasUncontended設置為true,跳出內部的if判斷,最後重新計算一個新的probe,然後重新執行循環; */ } else if (!wasUncontended) //設置未競爭標誌位true,繼續執行,後面會算一個新的probe值,然後重新執行循環。 wasUncontended = true; /** *內部小分支三:新的爭用線程參與爭用的情況:處理剛進入當前方法時threadLocalRandomProbe=0的情況,也就是當前線程第一次參與cell爭用的cas失敗,這裏會嘗試將x值加到cells[m%cells.length]的value ,如果成功直接退出 */ else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; /** *內部小分支四:分支3處理新的線程爭用執行失敗了,這時如果cells數組的長度已經到了最大值(大於等於cup數量),或者是當前cells已經做了擴容,則將collide設置為false,後面重新計算prob的值 else if (n >= NCPU || cells != as) collide = false; /** *內部小分支五:如果發生了沖突collide=false,則設置其為true;會在最後重新計算hash值後,進入下一次for循環 */ else if (!collide) //設置沖突標誌,表示發生了沖突,需要再次生成hash,重試。 如果下次重試任然走到了改分支此時collide=true,!collide條件不成立,則走後一個分支 collide = true; /** *內部小分支六:擴容cells數組,新參與cell爭用的線程兩次均失敗,且符合庫容條件,會執行該分支 */ else if (cellsBusy == 0 && casCellsBusy()) { try { //檢查cells是否已經被擴容 if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //為當前線程重新計算hash值 h = advanceProbe(h); //這個大的分支處理add方法中的條件1與條件2成立的情況,如果cell表還未初始化或者長度為0,先嘗試獲取cellsBusy鎖。 }else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table //初始化cells數組,初始容量為2,並將x值通過hash&1,放到0個或第1個位置上 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //解鎖 cellsBusy = 0; } //如果init為true說明初始化成功,跳出循環 if (init) break; } /** *如果以上操作都失敗了,則嘗試將值累加到base上; */ else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }關於hash的生成
hash是LongAdder定位當前線程應該將值累加到cells數組哪個位置上的,所以hash的算法是非常重要的,下面就來看看它的實現。 java的Thread類裏面有一個成員變量 @sun.misc.Contended("tlr") int threadLocalRandomProbe; threadLocalRandomProbe這個變量的值就是LongAdder用來hash定位Cells數組位置的,平時線程的這個變量一般用不到,它的值一直都是0。 在LongAdder的父類Striped64裏通過getProbe方法獲取當前線程threadLocalRandomProbe的值: static final int getProbe() { //PROBE是threadLocalRandomProbe變量在Thread類裏面的偏移量,所以下面語句獲取的就是threadLocalRandomProbe的值; return UNSAFE.getInt(Thread.currentThread(), PROBE); }threadLocalRandomProbe的初始化
線程對LongAdder的累加操作,在沒有進入longAccumulate方法前,threadLocalRandomProbe一直都是0,當發生爭用後才會進入longAccumulate方法中,進入該方法第一件事就是判斷threadLocalRandomProbe是否為0,如果為0,則將其設置為0x9e3779b9 int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); //設置未競爭標記為true wasUncontended = true; } 重點在這行ThreadLocalRandom.current(); public static ThreadLocalRandom current() { if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0) localInit(); return instance; } 在current方法中判斷如果probe的值為0,則執行locaInit()方法,將當前線程的probe設置為非0的值,該方法實現如下: static final void localInit() { //private static final AtomicInteger probeGenerator = new AtomicInteger(); //private static final int PROBE_INCREMENT = 0x9e3779b9; int p = probeGenerator.addAndGet(PROBE_INCREMENT); //prob不能為0 int probe = (p == 0) ? 1 : p; // skip 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); //獲取當前線程 Thread t = Thread.currentThread(); UNSAFE.putLong(t, SEED, seed); //將probe的值更新為probeGenerator的值 UNSAFE.putInt(t, PROBE, probe); } probeGenerator 是static 類型的AtomicInteger類,每執行一次localInit()方法,都會將probeGenerator 累加一次0x9e3779b9這個值;,0x9e3779b9這個數字的得來是 2^32 除以一個常數,這個常數就是傳說中的黃金比例 1.6180339887;然後將當前線程的threadLocalRandomProbe設置為probeGenerator 的值,如果probeGenerator 為0,這取1;threadLocalRandomProbe重新生成
就是將prob的值左右移位 、異或操作三次 static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } probe從=1開始反復執行10次,結果如下: 1 270369 67634689 -1647531835 307599695 -1896278063 745495504 632435482 435756210 2005365029 -1378868364並發之Striped64(l累加器)