Java8總結之併發增強
java8中對併發進行了一些增強優化。簡單總結一下
原子值
從 Java5 開始,java.util.concurrent.atonic包提供了用於支援無鎖可變變數的類。
/**
* 測試java8的併發增強
* @author lianghaining
*
*/
public class TestThread {
public static AtomicLong num = new AtomicLong();
@Test
public void testCAS() {
Long id = num.incrementAndGet();
}
}
incrementAndGet 方法會自動將AtomicLong 的值加 1,並返回增加後的值,並保證該操作不能被打斷。同時在多執行緒同時併發訪問同一個例項,也能夠計算並返回正確的值。
在 java5 中提供很多設定、增加、減少值的原子操作。但如果你想要進行更復雜的更新操作,就必須使用compareAndSet方法。看下面的程式碼
public static AtomicLong num = new AtomicLong();
@Test
public void testGetMaxNum() throws InterruptedException {
Thread t1 = new Thread(new LoopVolatile());
t1.start();
Thread t2 = new Thread(new LoopVolatile2());
t2.start();
while (t1.isAlive() || t2.isAlive()) {
}
System.out.println("final val is: " + num.get());
}
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.set(num.get()+1L);
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.set(num.get()+1L);
val++;
}
}
}
返回結果:
final val is: 8596441
final val is: 8379896
看返回結果可知如果是執行緒安全的應為:20000000,但不是,而且每一次執行都不一樣。這是因為 num.set(num.get()+1L)
並不是原子性的。可以看AtomicLong 中set方法。
private volatile long value;
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(long newValue) {
value = newValue;
}
可以看出set只保證了可見性,並不能保證原子性。有興趣可以看另一個人的部落格為什麼volatile不能保證原子性而Atomic可以你應該在一個迴圈裡使用comparenAndSet來計算新值
如下:
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
//num.set(num.get()+1L);
long update ;
long oldValue ;
do{
oldValue = num.get();
update = oldValue + 1;
}while(!num.compareAndSet(oldValue, update));
//num.updateAndGet(x->(x+1));
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
//num.set(num.get()+1L);
long update ;
long oldValue ;
do{
oldValue = num.get();
update = oldValue + 1;
}while(!num.compareAndSet(oldValue, update));
//num.updateAndGet(x->(x+1));
val++;
}
}
}
返回結果:final val is: 20000000
如果另一個執行緒也在更新num
,很可能它已經先更新成功了。那麼隨後compareAndSet會返回false,並不會設定新值。此時程式再次嘗試迴圈,讀取更新後的值並試圖改變它。最終,它成功地將已有值替換為新值。這遠比有鎖要快得多。
Now。在Java8中,你不必再編寫迴圈了,只需要提供一個用來更新值的lambda表示式,更新操作自動完成。如下
private static class LoopVolatile implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.updateAndGet(x->(x+1));
//或num.accumulateAndGet(1,(x,y)-> x+y);
val++;
}
}
}
private static class LoopVolatile2 implements Runnable {
public void run() {
long val = 0;
while (val < 10000000L) {
num.updateAndGet(x->(x+1));
//或num.accumulateAndGet(1,(x,y)-> x+y);
val++;
}
}
}
除了它之外,Java 8還提供了返回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。
當你有大量執行緒訪問同一個原子值時,由於AtomicXXX的樂觀鎖更新需要太多次重試,因此會導致效能嚴重下降。為此,Java8 提供了LongAdder 和 LongAccumulator 來解決該問題。這兩個原碼我以後單獨說一下,如果感興趣可以看一下
從LONGADDER看更高效的無鎖實現 皓哥的部落格有介紹過。
這裡寫說結論:LongAdder 由多個變數組成,這些變數累加的值即為當前值。多個執行緒可以更新不同的被加數,當執行緒數量增加時,會自動增加新的被加數。由於通常情況下都是直到所有工作完成後才需要總和值,所以這種方法效率很高。
如果你的業務存在高度競爭,那麼應該選擇 LongAdder 來代替AtomicLong.
LongAccumulator 將這個思想帶到了任意的累加操作中。如下
LongAccumulator adder = new LongAccumulator(Long::sum, 0);
與
LongAdder是一樣的
ConcurrentHashMap改進
在多執行緒中ConcurrentHashMap是常用的一個型別,在1.8相對於1.7有很大的不同。無論是雜湊衝突下大數量查詢時雙向連結串列的效率低下,還是在多執行緒下,分段鎖實現的複雜。1.8中都做了不同的實現。
1. 1.7實現
資料結構
jdk1.7中採用Segment + HashEntry的方式進行實現,結構如下:
ConcurrentHashMap初始化時,計算出Segment陣列的大小ssize和每個Segment中HashEntry陣列的大小cap,並初始化Segment陣列的第一個元素;其中ssize大小為2的冪次方,預設為16,cap大小也是2的冪次方,最小值為2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程如下:
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
其中Segment在實現上繼承了ReentrantLock,這樣就自帶了鎖的功能。
put實現
當執行put方法插入資料時,根據key的hash值,在Segment陣列中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行賦值,接著執行Segment物件的put方法通過加鎖機制插入資料,實現如下:
場景:執行緒A和執行緒B同時執行相同Segment物件的put方法
1、執行緒A執行tryLock()方法成功獲取鎖,則把HashEntry物件插入到相應的位置;
2、執行緒B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起執行緒B;
3、當執行緒A執行完插入操作時,會通過unlock()方法釋放鎖,接著喚醒執行緒B繼續執行;
size實現
因為ConcurrentHashMap是可以併發插入資料的,所以在準確計算元素時存在一定的難度,一般的思路是統計每個Segment物件中的元素個數,然後進行累加,但是這種方式計算出來的結果並不一樣的準確的,因為在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有資料的插入或則刪除,在1.7的實現中,採用瞭如下方式:
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
先採用不加鎖的方式,連續計算元素的個數,最多計算3次:
1、如果前後兩次計算結果相同,則說明計算出來的元素個數是準確的;
2、如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;
2.1.8實現
資料結構
1.8中放棄了Segment臃腫的設計,取而代之的是採用Node + CAS + Synchronized來保證併發安全進行實現,結構如下:
只有在執行第一次put方法時才會呼叫initTable()初始化Node陣列,實現如下:
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put實現
當執行put方法插入資料時,根據key的hash值,在Node陣列中找到相應的位置,實現如下:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
1、如果相應位置的Node還未初始化,則通過CAS插入相應的資料;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
2、如果相應位置的Node不為空,且當前該節點不處於移動狀態,則對該節點加synchronized鎖,如果該節點的hash不小於0,則遍歷連結串列更新節點或插入新節點;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
3、如果該節點是TreeBin型別的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
4、如果binCount不為0,說明put操作對資料產生了影響,如果當前連結串列的個數達到8個,則通過treeifyBin方法轉化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對元素個數產生影響,則直接返回舊值;
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
5、如果插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount;
size實現
1.8中使用一個volatile型別的變數baseCount記錄元素的個數,當插入新資料或則刪除資料時,會通過addCount()方法更新baseCount,實現如下:
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
1、初始化時counterCells為空,在併發量很高時,如果存在兩個執行緒同時執行CAS修改baseCount值,則失敗的執行緒會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;
2、如果CounterCell陣列counterCells為空,呼叫fullAddCount()方法進行初始化,並插入對應的記錄數,通過CAS設定cellsBusy欄位,只有設定成功的執行緒才能初始化CounterCell陣列,實現如下:
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
3、如果通過CAS設定cellsBusy欄位失敗的話,則繼續嘗試通過CAS修改baseCount欄位,如果修改baseCount欄位成功的話,就退出迴圈,否則繼續迴圈插入CounterCell物件;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
所以在1.8中的size實現比1.7簡單多,因為元素個數儲存baseCount中,部分元素的變化個數儲存在CounterCell陣列中,實現如下:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
通過累加baseCount和CounterCell陣列中的數量,即可得到元素的總個數;
通過測試1.8的效能在get和size時有明顯的效能優勢,put的效能與1.7相當
CompletableFuture
CompletableFuture類實現了CompletionStage和Future介面。Future是Java 5新增的類,用來描述一個非同步計算的結果,但是獲取一個結果時方法較少,要麼通過輪詢isDone,確認完成後,呼叫get()獲取值,要麼呼叫get()設定一個超時時間。但是這個get()方法會阻塞住呼叫執行緒,這種阻塞的方式顯然和我們的非同步程式設計的初衷相違背。
為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴充套件功能形成了CompletableFuture。
CompletionStage是一個介面,從命名上看得知是一個完成的階段,它裡面的方法也標明是在某個執行階段得到了結果之後要做的事情。
- 進行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
首先說明一下已Async結尾的方法都是可以非同步執行的,如果指定了執行緒池,會在指定的執行緒池中執行,如果沒有指定,預設會在ForkJoinPool.commonPool()中執行,下文中將會有好多類似的,都不詳細解釋了。關鍵的入參只有一個Function,它是函式式介面,所以使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,返回值是經過轉化後結果。
例如:
@Test
public void thenApply() {
String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
System.out.println(result);
}
結果為:
hello world
- 進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無返回值。
例如:
@Test
public void thenAccept(){
CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}
結果為:
hello world
- 對上一步的計算結果不關心,執行下一個操作。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenRun它的入參是一個Runnable的例項,表示當得到上一步的結果時的操作。
例如:
@Test
public void thenRun(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenRun(() -> System.out.println("hello world"));
while (true){}
}
結果為:
hello world
- 結合兩個CompletionStage的結果,進行轉化後返回
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
它需要原來的處理返回值,並且other代表的CompletionStage也要返回值之後,利用這兩個返回值,進行轉換後返回指定型別的值。
例如:
@Test
public void thenCombine() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> s1 + " " + s2).join();
System.out.println(result);
}
結果為:
hello world
- 結合兩個CompletionStage的結果,進行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
它需要原來的處理返回值,並且other代表的CompletionStage也要返回值之後,利用這兩個返回值,進行消耗。
例如:
@Test
public void thenAcceptBoth() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> System.out.println(s1 + " " + s2));
while (true){}
}
結果為:
hello world
- 在兩個CompletionStage都執行完執行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage執行完畢,之後在進行操作(Runnable)。
例如:
@Test
public void runAfterBoth(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true){}
}
結果為
hello world
- 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
我們現實開發場景中,總會碰到有兩種渠道完成同一個事情,所以就可以呼叫這個方法,找一個最快的結果進行處理。
例如:
@Test
public void applyToEither() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> s).join();
System.out.println(result);
}
結果為:
hello world
- 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
例如:
@Test
public void acceptEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), System.out::println);
while (true){}
}
結果為:
hello world
- 兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例如:
@Test
public void runAfterEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true) {
}
}
結果為:
hello world
- 當執行時出現了異常,可以通過exceptionally進行補償。
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
例如:
@Test
public void exceptionally() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
結果為:
java.lang.RuntimeException: 測試一下異常情況
hello world
- 當執行完成時,對結果的記錄。這裡的完成時有兩種情況,一種是正常執行,返回值。另外一種是遇到異常丟擲造成程式的中斷。這裡為什麼要說成記錄,因為這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。所以不會對結果產生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
例如:
@Test
public void whenComplete() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println(s);
System.out.println(t.getMessage());
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
結果為:
null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world
這裡也可以看出,如果使用了exceptionally,就會對最終的結果產生影響,它沒有口子返回如果沒有異常時的正確的值,這也就引出下面我們要介紹的handle。
- 執行完成時,對結果的處理。這裡的完成時有兩種情況,一種是正常執行,返回值。另外一種是遇到異常丟擲造成程式的中斷。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
例如:
出現異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出現異常
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結果為:
hello world
未出現異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結果為:
s1
上面就是CompletionStage介面中方法的使用例項,CompletableFuture同樣也同樣實現了Future,所以也同樣可以使用get進行阻塞獲取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優雅一點。