1. 程式人生 > >可迴圈使用的屏障CyclicBarrier(原始碼分析)

可迴圈使用的屏障CyclicBarrier(原始碼分析)

    前文有分析了發令槍的原理,這裡先總結一下CountDownLatch和CyclicBarrier的區別。

    1.發令槍是一次性的,無法重置,迴圈屏障可重複使用(reset)

    2.發令槍是在所有任務都執行結束統一退出的時候使用,迴圈屏障是還沒開始任務前統一步調的時候使用。

舉個例子,在計算銀行的交易總金額是我們需要使用多個執行緒去計算每個時間段的總金額,如果我們想要得知該銀行的一天的總金額,那就得使用發令槍保證所有時間段內的總金額被計算完畢後執行總金額的計算。如果我們希望計算的執行緒在同一時刻平行計算的話,就得使用屏障。下面有一個簡單例子,我沒有使用迴圈的功能:

  ExecutorService executor= Executors.newFixedThreadPool(5);
        final CyclicBarrier cyclicBarrier=new CyclicBarrier(5,new Runnable(){
            public void run() {
                System.out.println("所有運動員準備就緒");

            }
        });
        for (int i = 1; i < 6; i++) {
            executor.execute(new Runner(i,cyclicBarrier));
        }
        cyclicBarrier.reset();

輸出結果是:

1號運動員準備就緒
當前時間:2018-05-04 14:11:15
2號運動員準備就緒
當前時間:2018-05-04 14:11:16
3號運動員準備就緒
當前時間:2018-05-04 14:11:17
4號運動員準備就緒
當前時間:2018-05-04 14:11:18
5號運動員準備就緒
當前時間:2018-05-04 14:11:19
所有運動員準備就緒
5號運動員起跑時間:2018-05-04 14:11:19
5號運動員跑起來了
4號運動員起跑時間:2018-05-04 14:11:19
4號運動員跑起來了
3號運動員起跑時間:2018-05-04 14:11:19
3號運動員跑起來了
1號運動員起跑時間:2018-05-04 14:11:19
1號運動員跑起來了
2號運動員起跑時間:2018-05-04 14:11:19
2號運動員跑起來了

Runner的程式碼如下:

 static class Runner extends  Thread{
        int Number;
        CyclicBarrier cyclicBarrier;
        public Runner(int Number,CyclicBarrier cyclicBarrier){
            this.Number = Number;
            this.cyclicBarrier = cyclicBarrier;
        }
        public void run() {
            try {
                Thread.sleep(Number*1000); //為了使結果更清晰
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Number+"號運動員準備就緒");
            System.out.println("當前時間:"+sdf.format(new Date()));
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            doRun(Number);
        }
        private void doRun(int number) {
            System.out.println(Number+"號運動員起跑時間:"+sdf.format(new Date()));
            System.out.println(Number+"號運動員跑起來了");
        }
    }

從程式碼裡面可以清楚的觀察到,前4個運動員都停在了 cyclicBarrier.await();而一旦有5個運動員都準備就緒之後就可以開始跑步競賽了。因此掌握了它的構造和使await返回的原因就能基本瞭解它的實現原理。

初始化構造的時候我們傳入了一個執行緒物件,parties為5.CyclicBarrier內聚了一個Generation和持有了一個RentrantLock以及一個Condition,那麼它的實現原理猜都能猜到,就是通過判斷條件將未滿足條件時的執行緒加入到Condition的等待佇列中去,然後等到條件滿足後使用signalAll把所有等待佇列裡的執行緒喚醒。

 private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();

    private final Condition trip = lock.newCondition();

    private final int parties;

    private final Runnable barrierCommand;

    private Generation generation = new Generation();

先不管Generation時用來幹嘛的,我們到await方法去一探究竟:

return dowait(false, 0L);  

看來實際實行的dowait方法:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//加鎖
        try {
            final Generation g = generation;  //有一個**物件,先忽略

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();//響應中斷
                throw new InterruptedException();
            }

            int index = --count;   //count在構造的時候被傳入的parties賦值
            if (index == 0) {  // 滿足條件
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//這裡就是實現我們傳入的執行緒在滿足條件後第一個執行的條件
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)  //這裡若是仍然true,意味著上面的程式碼未執行成功
                        breakBarrier();
                }
            }

            
            for (;;) {//不滿足條件
                try {
                    if (!timed)   
                        trip.await();//因為await無引數,所以我們的程式會執行到這-----core
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//nanos=0,如果不為0的時候執行緒會等待一段時間都自行返回
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

看來猜想完全正確,接下來貼出nextGeneration的程式碼:

  private void nextGeneration() {

        trip.signalAll();  //core

        count = parties;
        generation = new Generation();  //可迴圈就是說的這個,每次喚醒完成之後重置狀態
    }

 breakBarrier()時用來響應中斷的,若是某處await執行的時候碰到了外部中斷,這時候會在該方法喚醒所有在等待佇列的執行緒。最後講一下reset函式,
 public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

實際上是理解成人工地未釋放當前狀態,然後重置回之前地狀態。

從另一方面看發令槍和可迴圈屏障都可以實現同一個功能,比如說統一步調一起走,對於發令槍來說可以線上程任務開始時使用await阻塞,然後在主執行緒countdown釋放所有執行緒。對於可迴圈屏障來說只要await放好就ok了。