Java併發學習(十一)-LongAdder和LongAccumulator探究
Java8在atomic包新增了5個類,分別是Striped64
,LongAdder
,LongAccumulator
,DoubleAdder
,DoubleAccumulator
。其中,Sriped64作為父類,其他分別是long和double的具體實現。
下面首先從父類Striped64
這個類開始講,其幾個類都是遵從它的結構進行實現的。
What is Striped64
Striped64,就向一個AtomicLong,裡面維持一個volatile的base,還有一個cell陣列,cell陣列主要是儲存執行緒需要增加或減少的值,它能夠將競爭的執行緒分散到自己內部的私有cell數組裡面,所以當併發量很大的時候,執行緒會被部分分發去訪問內部的cell陣列。這裡先看程式碼結構:
偽共享
在學習Striped64程式碼時候,遇到了一個新的知識點,就是偽共享(False Sharing):
@sun.misc.Contended static final class Cell
Cell的函式頭定義是這樣的,加了:@sun.misc.Contended
這個有什麼用呢?
在cpu執行指令時,通常會把指令載入到緩衝行(cache line)中,cpu的快取系統中是以快取行(cache line)為單位儲存的,快取行是2的整數冪個連續位元組,一般為32-256個位元組。最常見的快取行大小是64個位元組,cache line是cache和memory之間資料傳輸的最小單元。但是由於系統可能會把兩個不同的cpu處理的指令放到一起。
也就是,著一個cache line,既有cpu1所需執行的指令,又有cpu2所需執行的指令。
這樣一來,當出現競爭時,如果cpu1獲得了所有權,快取子系統將會使核心2中對應的快取行失效。當cpu2獲得了所有權然後執行更新操作,核心1就要使自己對應的快取行失效。這會來來回回的經過L3快取,大大影響了效能。如果互相競爭的核心位於不同的插槽,就要額外橫跨插槽連線,問題可能更加嚴重。
這個問題在Java7中,是通過填充long欄位來實現cache line的獨立性,,也就是本cache line裡面就只有我的變數:
public class VolatileLongPadding {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long v = 0L;
volatile long q0, q1, q2, q3, q4, q5, q6;
}
所以,加入@sun.misc.Contended
就是為了解決這個問題,讓當前執行緒執行操作變數處於一個獨立的cache line裡面。
Striped64程式碼結構
@SuppressWarnings("serial")
abstract class Striped64 extends Number {
/**
* 用@sun.misc.Contended來杜絕為共享。用來儲存衝突時需要增加的格子。cell
* CAS方式。
*/
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) {
value = x;
}
//CAS操作
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** Number of CPUS, to place bound on table size
* cpu的個數,繫結的table。 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
//cells陣列,大小為2的倍數
transient volatile Cell[] cells;
//基礎的值。不衝突下直接在base上增加,通過CAS更改。
transient volatile long base;
//判斷cells是否有執行緒在使用的變數,通過CAS去鎖定。
transient volatile int cellsBusy;
...
上述程式碼主要變數:
- cells:用於儲存衝突的執行緒
- base:基礎的數值,當沒有衝突或衝突很少時,就會在base上操作,而不用加入cell,也就是AtomicLong的原理
- cellsBusy:判斷cells是否被一個執行緒使用了,如果一個執行緒使用,就不自旋一次換個probe來進行。
Striped64裡面有兩個主要的方法:longAccumulate
和doubleAccumulate
,兩個方法方法實現幾乎一模一樣,這裡主要以longAccumulate
為例分析:
先說說主要思路,如果能夠通過CAS修改base成功,那麼直接退出(併發裡量不大),否則去cells裡面佔一個非空的坑(併發量大),並把要操作的值賦值儲存在一個Cell裡面。
所以這樣,只能去尋找沒有值的cell,如果有值,就說明以前有執行緒也是由於競爭base失敗從而來cells這裡報個到,到時候當去總數時才能加自己。
接下來看具體實現程式碼:
/**
* 裡面可以重新改變table大小,或者建立新的cells。
* @param x 增加的long值
* @param fn 函數語言程式設計,代表一個一個待執行操作的函式
* @param wasUncontended
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
//如果當前執行緒沒有初始化,就初始化下當前執行緒。
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
//cell有值。
if ((a = as[(n - 1) & h]) == null) {
//進入這個方法,就說明這個位置沒執行緒,所以你可以進來。進來後再看能不能獲取到cell鎖。
if (cellsBusy == 0) { // Try to attach new Cell
//新建一個cell,並且嘗試加進去
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//擴容操作。
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//這是cell初始化的過程
//直接修改base不成功,所以來修改cells做文章。
//cell為null,但是cellsBusy=0,但是有,加入一個cell中。
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; //最開始cells的大小為2
rs[h & 1] = new Cell(x); //給要增加的x,做一個cell的坑。
cells = rs;
init = true;
}
} finally {
//釋放鎖
cellsBusy = 0;
}
if (init)
break;
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
//cell為null並且cellsBusy為1,也就是說,現在有人用cells,我就去嘗試更新base吧,接用CAS這個base來實現
break; // Fall back on using base
}
}
在doubleAccumulate中,基本程式碼與longAccumulate一致,但是,Cell裡面定義的value是long型別的,所以程式碼中進行了如下轉化,把double儲存在long中:
Cell r = new Cell(Double.doubleToRawLongBits(x))
接下來看兩組具體實現的子類。
LongAdder和LongAccumulator
按照Doug Lea的描述,LongAdder效能優於AtomicLong,從上面分析可以知道,當併發量不高時,其實二者效能差不多,但是一旦併發量超過一定限度,毫無疑問CAS會失敗,對於AtomicLong則沒有其他解決方法,而LongAdder則可以通過cells陣列來進行部分的“分流”操作。
進入正題,說說LongAdder,首先看看它裡面方法:
- public void add(long x):增加x
- public void increment() :自增
- public void decrement() :自減
- public long sum() :求和
- public void reset():重置cell陣列
- public long sumThenReset():求和並重置
這裡主要看看add方法,其他都好理解:
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
//如果cells不為null,或者CAS base變數失敗,說明衝突了,
//置uncontended為true
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//當as為null,或as的長度小於等於1,或a為null,,
//判斷符合呼叫父類方法進行相加操作
longAccumulate(x, null, uncontended);
}
}
思路還是很簡單的,而對於LongAccumulator,則是支援各種自定義運算的long運算。因為傳入了一個方法:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
程式設計師可以自己定義二元的LongBinaryOperator並進行運算。LongAdder是相加,則LongAccumulator則是進行自己定義的操作。
DoubleAdder和DoubleAccumulator
對於DoubleAdder
和DoubleAccumulator
,則是將其轉化為long型別後進行運算的:
//long轉為double
public static native double longBitsToDouble(long bits);
//double轉為long
public static native long doubleToRawLongBits(double value);
具體實現思路與long型別實現一致,不再贅述。