1. 程式人生 > >Java8總結之併發增強

Java8總結之併發增強

java8中對併發進行了一些增強優化。簡單總結一下

原子值

從 Java5 開始,java.util.concurrent.atonic包提供了用於支援無鎖可變變數的類。

/**
 * 測試java8的併發增強
 * @author lianghaining
 *
 */
public class TestThread {

    public static AtomicLong num = new AtomicLong();

    @Test
    public void testCAS() {

        Long id = num.incrementAndGet();

    }

}

incrementAndGet 方法會自動將AtomicLong 的值加 1,並返回增加後的值,並保證該操作不能被打斷。同時在多執行緒同時併發訪問同一個例項,也能夠計算並返回正確的值。
在 java5 中提供很多設定、增加、減少值的原子操作。但如果你想要進行更復雜的更新操作,就必須使用compareAndSet方法。看下面的程式碼

    public static AtomicLong num = new AtomicLong();
    @Test
    public void testGetMaxNum() throws InterruptedException {
        Thread t1 = new
Thread(new LoopVolatile()); t1.start(); Thread t2 = new Thread(new LoopVolatile2()); t2.start(); while (t1.isAlive() || t2.isAlive()) { } System.out.println("final val is: " + num.get()); } private static class LoopVolatile implements Runnable
{
public void run() { long val = 0; while (val < 10000000L) { num.set(num.get()+1L); val++; } } } private static class LoopVolatile2 implements Runnable { public void run() { long val = 0; while (val < 10000000L) { num.set(num.get()+1L); val++; } } } 返回結果: final val is: 8596441 final val is: 8379896

看返回結果可知如果是執行緒安全的應為:20000000,但不是,而且每一次執行都不一樣。這是因為 num.set(num.get()+1L)並不是原子性的。可以看AtomicLong 中set方法。

    private volatile long value; 
    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

可以看出set只保證了可見性,並不能保證原子性。有興趣可以看另一個人的部落格為什麼volatile不能保證原子性而Atomic可以你應該在一個迴圈裡使用comparenAndSet來計算新值
如下:

     private static class LoopVolatile implements Runnable {
            public void run() {
                long val = 0;
                while (val < 10000000L) {
                    //num.set(num.get()+1L);
                    long update ;
                    long oldValue ;
                    do{
                        oldValue = num.get();
                        update = oldValue + 1;
                    }while(!num.compareAndSet(oldValue, update));
                    //num.updateAndGet(x->(x+1));
                    val++;
                }
            }
        }

        private static class LoopVolatile2 implements Runnable {
            public void run() {
                long val = 0;
                while (val < 10000000L) {
                    //num.set(num.get()+1L);
                    long update ;
                    long oldValue ;
                    do{
                        oldValue = num.get();
                        update = oldValue + 1;
                    }while(!num.compareAndSet(oldValue, update));
                    //num.updateAndGet(x->(x+1));
                    val++;
                }
            }
        }
        返回結果:final val is: 20000000

如果另一個執行緒也在更新num,很可能它已經先更新成功了。那麼隨後compareAndSet會返回false,並不會設定新值。此時程式再次嘗試迴圈,讀取更新後的值並試圖改變它。最終,它成功地將已有值替換為新值。這遠比有鎖要快得多。
Now。在Java8中,你不必再編寫迴圈了,只需要提供一個用來更新值的lambda表示式,更新操作自動完成。如下

 private static class LoopVolatile implements Runnable {
            public void run() {
                long val = 0;
                while (val < 10000000L) {
                    num.updateAndGet(x->(x+1));
                    //或num.accumulateAndGet(1,(x,y)-> x+y); 
                    val++;
                }
            }
        }

        private static class LoopVolatile2 implements Runnable {
            public void run() {
                long val = 0;
                while (val < 10000000L) {
                    num.updateAndGet(x->(x+1));
                    //或num.accumulateAndGet(1,(x,y)-> x+y); 
                    val++;
                }
            }
        }

除了它之外,Java 8還提供了返回原始值的 getAndUpdate 方法和 getAndAccumulate 方法。

當你有大量執行緒訪問同一個原子值時,由於AtomicXXX的樂觀鎖更新需要太多次重試,因此會導致效能嚴重下降。為此,Java8 提供了LongAdder 和 LongAccumulator 來解決該問題。這兩個原碼我以後單獨說一下,如果感興趣可以看一下
從LONGADDER看更高效的無鎖實現 皓哥的部落格有介紹過。
這裡寫說結論:LongAdder 由多個變數組成,這些變數累加的值即為當前值。多個執行緒可以更新不同的被加數,當執行緒數量增加時,會自動增加新的被加數。由於通常情況下都是直到所有工作完成後才需要總和值,所以這種方法效率很高。
如果你的業務存在高度競爭,那麼應該選擇 LongAdder 來代替AtomicLong.
LongAccumulator 將這個思想帶到了任意的累加操作中。如下

LongAccumulator adder = new LongAccumulator(Long::sum, 0);
與
LongAdder是一樣的

ConcurrentHashMap改進

在多執行緒中ConcurrentHashMap是常用的一個型別,在1.8相對於1.7有很大的不同。無論是雜湊衝突下大數量查詢時雙向連結串列的效率低下,還是在多執行緒下,分段鎖實現的複雜。1.8中都做了不同的實現。

1. 1.7實現

資料結構

jdk1.7中採用Segment + HashEntry的方式進行實現,結構如下:
enter image description here

ConcurrentHashMap初始化時,計算出Segment陣列的大小ssize和每個Segment中HashEntry陣列的大小cap,並初始化Segment陣列的第一個元素;其中ssize大小為2的冪次方,預設為16,cap大小也是2的冪次方,最小值為2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程如下:

if (c * ssize < initialCapacity)
    ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
    cap <<= 1;

其中Segment在實現上繼承了ReentrantLock,這樣就自帶了鎖的功能。
put實現
當執行put方法插入資料時,根據key的hash值,在Segment陣列中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行賦值,接著執行Segment物件的put方法通過加鎖機制插入資料,實現如下:

場景:執行緒A和執行緒B同時執行相同Segment物件的put方法

1、執行緒A執行tryLock()方法成功獲取鎖,則把HashEntry物件插入到相應的位置;
2、執行緒B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起執行緒B;
3、當執行緒A執行完插入操作時,會通過unlock()方法釋放鎖,接著喚醒執行緒B繼續執行;

size實現
因為ConcurrentHashMap是可以併發插入資料的,所以在準確計算元素時存在一定的難度,一般的思路是統計每個Segment物件中的元素個數,然後進行累加,但是這種方式計算出來的結果並不一樣的準確的,因為在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有資料的插入或則刪除,在1.7的實現中,採用瞭如下方式:

try {
    for (;;) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0)
                    overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}

先採用不加鎖的方式,連續計算元素的個數,最多計算3次:
1、如果前後兩次計算結果相同,則說明計算出來的元素個數是準確的;
2、如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;

2.1.8實現

資料結構
1.8中放棄了Segment臃腫的設計,取而代之的是採用Node + CAS + Synchronized來保證併發安全進行實現,結構如下:
enter image description here
只有在執行第一次put方法時才會呼叫initTable()初始化Node陣列,實現如下:

  /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

put實現
當執行put方法插入資料時,根據key的hash值,在Node陣列中找到相應的位置,實現如下:

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

1、如果相應位置的Node還未初始化,則通過CAS插入相應的資料;

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}

2、如果相應位置的Node不為空,且當前該節點不處於移動狀態,則對該節點加synchronized鎖,如果該節點的hash不小於0,則遍歷連結串列更新節點或插入新節點;

if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value, null);
            break;
        }
    }
}

3、如果該節點是TreeBin型別的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;

else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
        oldVal = p.val;
        if (!onlyIfAbsent)
            p.val = value;
    }
}

4、如果binCount不為0,說明put操作對資料產生了影響,如果當前連結串列的個數達到8個,則通過treeifyBin方法轉化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對元素個數產生影響,則直接返回舊值;

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

5、如果插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount;
size實現
1.8中使用一個volatile型別的變數baseCount記錄元素的個數,當插入新資料或則刪除資料時,會通過addCount()方法更新baseCount,實現如下:

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}

1、初始化時counterCells為空,在併發量很高時,如果存在兩個執行緒同時執行CAS修改baseCount值,則失敗的執行緒會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;

2、如果CounterCell陣列counterCells為空,呼叫fullAddCount()方法進行初始化,並插入對應的記錄數,通過CAS設定cellsBusy欄位,只有設定成功的執行緒才能初始化CounterCell陣列,實現如下:

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

3、如果通過CAS設定cellsBusy欄位失敗的話,則繼續嘗試通過CAS修改baseCount欄位,如果修改baseCount欄位成功的話,就退出迴圈,否則繼續迴圈插入CounterCell物件;

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break;

所以在1.8中的size實現比1.7簡單多,因為元素個數儲存baseCount中,部分元素的變化個數儲存在CounterCell陣列中,實現如下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

通過累加baseCount和CounterCell陣列中的數量,即可得到元素的總個數;
通過測試1.8的效能在get和size時有明顯的效能優勢,put的效能與1.7相當

CompletableFuture

CompletableFuture類實現了CompletionStage和Future介面。Future是Java 5新增的類,用來描述一個非同步計算的結果,但是獲取一個結果時方法較少,要麼通過輪詢isDone,確認完成後,呼叫get()獲取值,要麼呼叫get()設定一個超時時間。但是這個get()方法會阻塞住呼叫執行緒,這種阻塞的方式顯然和我們的非同步程式設計的初衷相違背。
為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴充套件功能形成了CompletableFuture。

CompletionStage是一個介面,從命名上看得知是一個完成的階段,它裡面的方法也標明是在某個執行階段得到了結果之後要做的事情。

  1. 進行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

首先說明一下已Async結尾的方法都是可以非同步執行的,如果指定了執行緒池,會在指定的執行緒池中執行,如果沒有指定,預設會在ForkJoinPool.commonPool()中執行,下文中將會有好多類似的,都不詳細解釋了。關鍵的入參只有一個Function,它是函式式介面,所以使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,返回值是經過轉化後結果。
例如:

    @Test
 public void thenApply() {
   String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
   System.out.println(result);
   }

結果為:

hello world
  1. 進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無返回值。
例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}

結果為:

hello world
  1. 對上一步的計算結果不關心,執行下一個操作。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入參是一個Runnable的例項,表示當得到上一步的結果時的操作。
例如:

   @Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world"));
        while (true){}
    }

結果為:

hello world 
  1. 結合兩個CompletionStage的結果,進行轉化後返回
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

它需要原來的處理返回值,並且other代表的CompletionStage也要返回值之後,利用這兩個返回值,進行轉換後返回指定型別的值。
例如:

 @Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }

結果為:

hello world
  1. 結合兩個CompletionStage的結果,進行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

它需要原來的處理返回值,並且other代表的CompletionStage也要返回值之後,利用這兩個返回值,進行消耗。
例如:

   @Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2));
        while (true){}
    }

結果為:

hello world
  1. 在兩個CompletionStage都執行完執行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage執行完畢,之後在進行操作(Runnable)。
例如:

   @Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true){}
    }

結果為

hello world
  1. 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

我們現實開發場景中,總會碰到有兩種渠道完成同一個事情,所以就可以呼叫這個方法,找一個最快的結果進行處理。
例如:

 @Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result);
    }

結果為:

hello world
  1. 兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

例如:

  @Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }

結果為:

hello world
  1. 兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

例如:

    @Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }

結果為:

hello world
  1. 當執行時出現了異常,可以通過exceptionally進行補償。
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例如:

   @Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結果為:

java.lang.RuntimeException: 測試一下異常情況
hello world
  1. 當執行完成時,對結果的記錄。這裡的完成時有兩種情況,一種是正常執行,返回值。另外一種是遇到異常丟擲造成程式的中斷。這裡為什麼要說成記錄,因為這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。所以不會對結果產生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

例如:

    @Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結果為:

null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world

這裡也可以看出,如果使用了exceptionally,就會對最終的結果產生影響,它沒有口子返回如果沒有異常時的正確的值,這也就引出下面我們要介紹的handle。

  1. 執行完成時,對結果的處理。這裡的完成時有兩種情況,一種是正常執行,返回值。另外一種是遇到異常丟擲造成程式的中斷。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

例如:
出現異常時

    @Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現異常
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結果為:

hello world

未出現異常時

   @Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結果為:

s1

上面就是CompletionStage介面中方法的使用例項,CompletableFuture同樣也同樣實現了Future,所以也同樣可以使用get進行阻塞獲取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優雅一點。