Java高併發程式設計筆記11之JDK8對併發的新支援
1. LongAdder
和AtomicLong類似的使用方式,但是效能比AtomicLong更好。
LongAdder與AtomicLong都是使用了原子操作來提高效能。但是LongAdder在AtomicLong的基礎上進行了熱點分離,熱點分離類似於有鎖操作中的減小鎖粒度,將一個鎖分離成若干個鎖來提高效能。在無鎖中,也可以用類似的方式來增加CAS的成功率,從而提高效能。
LongAdder原理圖:
AtomicLong的實現方式是內部有個value 變數,當多執行緒併發自增,自減時,均通過CAS 指令從機器指令級別操作保證併發的原子性。唯一會制約AtomicLong高效的原因是高併發,高併發意味著CAS的失敗機率更高, 重試次數更多,越多執行緒重試,CAS失敗機率又越高,變成惡性迴圈,AtomicLong效率降低。
而LongAdder將把一個value拆分成若干cell,把所有cell加起來,就是value。所以對LongAdder進行加減操作,只需要對不同的cell來操作,不同的執行緒對不同的cell進行CAS操作,CAS的成功率當然高了(試想一下3+2+1=6,一個執行緒3+1,另一個執行緒2+1,最後是8,LongAdder沒有乘法除法的API)。
可是在併發數不是很高的情況,拆分成若干的cell,還需要維護cell和求和,效率不如AtomicLong的實現。LongAdder用了巧妙的辦法來解決了這個問題。
初始情況,LongAdder與AtomicLong是相同的,只有在CAS失敗時,才會將value拆分成cell,每失敗一次,都會增加cell的數量,這樣在低併發時,同樣高效,在高併發時,這種“自適應”的處理方式,達到一定cell數量後,CAS將不會失敗,效率大大提高。
LongAdder是一種以空間換時間的策略。
2. CompletableFuture
實現CompletionStage介面(40餘個方法),大多數方法多數應用在函數語言程式設計中。並且支援流式呼叫
CompletableFuture是Java 8中對Future的增強版
簡單實現:
Future最令人詬病的就是要等待,要自己去檢查任務是否完成了,在Future中,任務完成的時間是不可控的。而CompletableFuture的import java.util.concurrent.CompletableFuture; public class AskThread implements Runnable { CompletableFuture<Integer> re = null; public AskThread(CompletableFuture<Integer> re) { this.re = re; } @Override public void run() { int myRe = 0; try { myRe = re.get() * re.get(); } catch (Exception e) { } System.out.println(myRe); } public static void main(String[] args) throws InterruptedException { final CompletableFuture<Integer> future = new CompletableFuture<Integer>(); new Thread(new AskThread(future)).start(); // 模擬長時間的計算過程 Thread.sleep(1000); // 告知完成結果 future.complete(60); } }
最大改進在於,任務完成的時間也開放了出來。
future.complete(60);
用來設定完成時間。CompletableFuture的非同步執行:
public static Integer calc(Integer para) {
try {
// 模擬一個長時間的執行
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return para * para;
}
public static void main(String[] args) throws InterruptedException,
ExecutionException {
final CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> calc(50));
System.out.println(future.get());
}
CompletableFuture的流式呼叫:
public static Integer calc(Integer para) {
try {
// 模擬一個長時間的執行
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return para * para;
}
public static void main(String[] args) throws InterruptedException,
ExecutionException {
CompletableFuture<Void> fu = CompletableFuture
.supplyAsync(() -> calc(50))
.thenApply((i) -> Integer.toString(i))
.thenApply((str) -> "\"" + str + "\"")
.thenAccept(System.out::println);
fu.get();
}
組合多個CompletableFuture:
public static Integer calc(Integer para) {
return para / 2;
}
public static void main(String[] args) throws InterruptedException,
ExecutionException {
CompletableFuture<Void> fu = CompletableFuture
.supplyAsync(() -> calc(50))
.thenCompose(
(i) -> CompletableFuture.supplyAsync(() -> calc(i)))
.thenApply((str) -> "\"" + str + "\"")
.thenAccept(System.out::println);
fu.get();
}
這幾個例子更多是側重Java8的一些新特性,這裡就簡單舉下例子來說明特性,就不深究了。
CompletableFuture跟效能上關係不大,更多的是為了支援函數語言程式設計,在功能上的增強。當然開放了完成時間的設定是一大亮點。
3. StampedLock
在上一篇中剛剛提到了鎖分離,而鎖分離的重要的實現就是ReadWriteLock。而StampedLock則是ReadWriteLock的一個改進。StampedLock與ReadWriteLock的區別在於,StampedLock認為讀不應阻塞寫,StampedLock認為當讀寫互斥的時候,讀應該是重讀,而不是不讓寫執行緒寫。這樣的設計解決了讀多寫少時,使用ReadWriteLock會產生寫執行緒飢餓現象。
所以StampedLock是一種偏向於寫執行緒的改進。
StampedLock示例:
import java.util.concurrent.locks.StampedLock;
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
上述程式碼模擬了寫執行緒和讀執行緒, StampedLock根據stamp來檢視是否互斥,寫一次stamp變增加某個值
tryOptimisticRead()
就是剛剛所說的讀寫不互斥的情況。每次讀執行緒要讀時,會先判斷
if (!sl.validate(stamp))
validate中會先檢視是否有寫執行緒在寫,然後再判斷輸入的值和當前的 stamp是否相同,即判斷是否讀執行緒將讀到最新的資料。如果有寫執行緒在寫,或者 stamp數值不同,則返回失敗。如果判斷失敗,當然可以重複的嘗試去讀,在示例程式碼中,並沒有讓其重複嘗試讀,而採用的是將樂觀鎖退化成普通的讀鎖去讀,這種情況就是一種悲觀的讀法。
stamp = sl.readLock();
StampedLock的實現思想:
CLH自旋鎖:當鎖申請失敗時,不會立即將讀執行緒掛起,在鎖當中會維護一個等待執行緒佇列,所有申請鎖,但是沒有成功的執行緒都記錄在這個佇列中。每一個節點(一個節點代表一個執行緒),儲存一個標記位(locked),用於判斷當前執行緒是否已經釋放鎖。當一個執行緒試圖獲得鎖時,取得當前等待佇列的尾部節點作為其前序節點。並使用類似如下程式碼判斷前序節點是否已經成功釋放鎖
while (pred.locked) {
}
這個迴圈就是不斷等前面那個結點釋放鎖,這樣的自旋使得當前執行緒不會被作業系統掛起,從而提高了效能。當然不會進行無休止的自旋,會在若干次自旋後掛起執行緒。