1. 程式人生 > >JDK 原始碼解析 —— CyclicBarrier

JDK 原始碼解析 —— CyclicBarrier

一. 簡介 CyclicBarrier 是一個讓一系列執行緒集合互相等待直到一個公共屏障點(barrier point)的同步輔助工具。這個屏障被稱為迴圈屏障,是因為它可以在等待執行緒釋放後被重用。 CyclicBarrier 支援一個可選的 Runnable 命令,在最後一個執行緒到達後執行一次 Runnable 命令。 二. 簡單使用示例

CyclicBarrier(3) 等到 3 個執行緒都到了,這個物件還可以重用,而 CountDownLatch 則不能重用,從 Cyclic 名字就可以看出這個類物件可以迴圈使用

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  CyclicBarrier cb = new CyclicBarrier(3);//建立CyclicBarrier物件並設定3個公共屏障點
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("執行緒" + Thread.currentThread().getName() +
                                "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");
                        cb.await();//到此如果沒有達到公共屏障點,則該執行緒處於等待狀態,如果達到公共屏障點則所有處於等待的執行緒都繼續往下執行

                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("執行緒" + Thread.currentThread().getName() +
                                "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");
                        cb.await();
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("執行緒" + Thread.currentThread().getName() +
                                "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");
                        cb.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

三. CyclicBarrier 作用圖示 讓所有執行緒都執行到同一個點(屏障點)後,再繼續執行
四. 程式碼解析
  1. 重要變數
// 每次對柵欄的使用可以表現為一個 generation 例項。當條件 trip 改變或者重置 generation 也會
    // 隨之改變。可以有多個 generation 和使用柵欄的執行緒關聯,但是隻有一個可以獲得鎖。
    private static class Generation {
        boolean broken = false;
    }

    /** 守護柵欄入口的鎖 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 等待條件,直到所有執行緒到達柵欄 */
    private final Condition trip = lock.newCondition();
    /** 要屏障的執行緒數 */
    private final int parties;
    /* 當執行緒都到達柵欄,執行的 Runnable */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    //還要等待多少個執行緒到達。執行緒到達屏障點就減去 1。
    //每次新建 generation 的時候或者屏障 broken,count重新設定為 parties 引數值
    private int count;


  1. await() 方法:等待到所有參與的執行緒都到達屏障點。如果當前執行緒不是最後一個到達的,當前執行緒停止執行,進入睡眠,直到以下幾種情況發生
  • 最後的執行緒到達
  • 其他執行緒中斷當前執行緒
  • 其他執行緒中斷中斷等待執行緒中的一條
  • 在等待所有執行緒到達屏障前有執行緒超時
  • 其他執行緒在此屏障中呼叫 reset(將屏障設定為初始狀態)
如果當前執行緒:
  • 設定了中斷狀態
  • 在等待時中斷
那麼,就會丟擲 InterruptedException,並且當前執行緒中斷狀態被清除。 如果在任何執行緒等待過程中屏障被重置(即呼叫 reset() 方法),那麼所有的執行緒都會丟擲 BrokenBarrierException,並且這個屏障置於 broken 狀態。 如果當前執行緒是最後一個到達屏障的執行緒,並且屏障的構造器傳入了 Runnable 引數,那麼在其他執行緒執行前,先執行 Runnable。如果在屏障執行中發生了異常,那麼異常會在當前執行緒中被傳播,屏障將被置於 broken 狀態。 返回值:返回當前執行緒到達的下標 
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();// 加了鎖,以下操作為執行緒安全操作
    try {
        final Generation g = generation;

        if (g.broken)  // 如果屏障狀態 broken,則丟擲屏障 broken 異常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       int index = --count;
       if (index == 0) {  // tripped 說明是最後一個到達的執行緒
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null) // 如果有 Runnable,先執行
                   command.run();
               ranAction = true;
               nextGeneration();// 喚醒 Condition 佇列的所有執行緒,既然是 Cyclic 的,所以也會重置狀態以便重用屏障,這是和 CountDownLatch 的區別
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {// 如果不是最後一個到達的執行緒,就進入迴圈等待
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

五. 總結 CyclicBarrier 是利用了 Condition 介面,定義了一個叫做 trip 的 Condition,當所有執行緒到達後執行緒才能從 Condition 佇列中移到 AQS 的等待佇列繼續執行。關於 Condition,可以參考博主的另一篇博文:http://blog.csdn.net/wenniuwuren/article/details/51447767 六. 參考資料 JDK 7 原始碼