1. 程式人生 > 其它 >CyclicBarrier 的使用與原始碼解析

CyclicBarrier 的使用與原始碼解析

使用

CyclicBarrier 也可以實現類似 CountDownLatch 的功能,而且比 CountDownLatch 更強大,因為 CyclicBarrier 可以重複被使用。

程式碼示例:

@Test
public void test() throws InterruptedException {
    int parties = 3;

    // 定義一個執行緒池
    // CyclicBarrier 中執行緒執行完成會阻塞等待其它執行緒到達屏障點,所以可用執行緒至少需要 parties 個
    ExecutorService executor = Executors.newFixedThreadPool(6);

    CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> {
        System.out.println("任務都到達了屏障點,主執行緒繼續執行任務");
    });

    IntStream.range(0, parties).forEach(i -> {
        executor.submit(() -> {
            System.out.println("條件" + (i + 1) + "正在執行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("條件" + (i + 1) + "執行完成,到達屏障點");

            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    });

    // parties 個執行緒到達屏障點後,接著執行構造方法傳遞的 barrierAction 任務
    // 然後會呼叫 nextGeneration() 方法,喚醒所有到達屏障點的執行緒,再重置 count 的值為 parties
    // 所以可以接著執行下面的條件4 ~ 條件6的執行緒
    // 但是沒有控制執行緒執行的順序,所以並不是條件1~條件3這三個執行緒先執行,條件4 ~ 條件6這三個執行緒後執行的

    IntStream.range(parties, parties + parties).forEach(i -> {
        executor.submit(() -> {
            System.out.println("條件" + (i + 1) + "正在執行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("條件" + (i + 1) + "執行完成,到達屏障點");

            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    });

    Thread.sleep(10000);
    executor.shutdown();

    /**
         * 可能的執行結果:
         *
         * 條件1正在執行
         * 條件2正在執行
         * 條件3正在執行
         * 條件4正在執行
         * 條件6正在執行
         * 條件5正在執行
         * 條件6執行完成,到達屏障點
         * 條件5執行完成,到達屏障點
         * 條件3執行完成,到達屏障點
         * 條件4執行完成,到達屏障點
         * 任務都到達了屏障點,主執行緒繼續執行任務
         * 條件1執行完成,到達屏障點
         * 條件2執行完成,到達屏障點
         * 任務都到達了屏障點,主執行緒繼續執行任務
         */
}

reset() 方法程式碼示例:

@Test
public void test2() throws InterruptedException {
    int parties = 3;

    // 定義一個執行緒池
    // CyclicBarrier 中執行緒執行完成會阻塞等待其它執行緒到達屏障點,所以可用執行緒至少需要 parties 個
    ExecutorService executor = Executors.newFixedThreadPool(6);

    CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> {
        System.out.println("任務都到達了屏障點,主執行緒繼續執行任務");
    });

    IntStream.range(0, parties).forEach(i -> {
        executor.submit(() -> {
            System.out.println("條件" + (i + 1) + "正在執行");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("條件" + (i + 1) + "執行完成,到達屏障點");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    });

    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // 呼叫 reset 方法會將當前代終止,如果主執行緒呼叫 reset 方法時,有執行緒阻塞在 dowait 方法中
    // 那麼已經被阻塞的執行緒被喚醒時,會丟擲 BrokenBarrierException 異常,未阻塞的執行緒不會跑出異常

    // 測試 reset() 方法
    System.out.println("執行 reset 方法");
    cyclicBarrier.reset();

    IntStream.range(parties, parties + parties).forEach(i -> {
        executor.submit(() -> {
            System.out.println("條件" + (i + 1) + "正在執行");
            try {
                Thread.sleep(i * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("條件" + (i + 1) + "執行完成,到達屏障點");

            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    });

    Thread.sleep(10000);
    executor.shutdown();

    /**
         * 可能的執行結果 1:
         *
         * 條件1正在執行
         * 條件3正在執行
         * 條件2正在執行
         * 執行 reset 方法
         * 條件3執行完成,到達屏障點
         * 條件1執行完成,到達屏障點
         * java.util.concurrent.BrokenBarrierException
         * 	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
         * 	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
         * 	at com.github.wuchao.webdemo.core.concurrent.CyclicBarrierDemo.lambda$null$6(CyclicBarrierDemo.java:116)
         * 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         * 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         * 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         * 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         * 	at java.lang.Thread.run(Thread.java:748)
         * 條件5正在執行
         * 條件6正在執行
         * 條件4正在執行
         * 條件2執行完成,到達屏障點
         * 條件4執行完成,到達屏障點
         * 任務都到達了屏障點,主執行緒繼續執行任務
         * 條件5執行完成,到達屏障點
         * 條件6執行完成,到達屏障點
         *
         *
         * 可能的執行結果2:
         *
         * 條件1正在執行
         * 條件3正在執行
         * 條件2正在執行
         * 執行 reset 方法
         * 條件4正在執行
         * 條件5正在執行
         * 條件2執行完成,到達屏障點
         * 條件3執行完成,到達屏障點
         * 條件1執行完成,到達屏障點
         * 任務都到達了屏障點,主執行緒繼續執行任務
         * 條件6正在執行
         * 條件4執行完成,到達屏障點
         * 條件5執行完成,到達屏障點
         * 條件6執行完成,到達屏障點
         * 任務都到達了屏障點,主執行緒繼續執行任務
         */
}

原始碼

屬性

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

/** The number of parties */
// 任務執行緒數(這些執行緒全部執行完成後(即到達屏障點),才能繼續執行主執行緒任務)
private final int parties;

/* The command to run when tripped */
// 每當 parties 個任務執行緒都完成後(即 parties 個執行緒都到達了屏障點),會執行一次 barrierCommand 任務
// 建構函式如果不傳 barrierCommand,則不執行
private final Runnable barrierCommand;

/** The current generation */
// 當前代
private Generation generation = new Generation();

/**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
// parties 執行緒中,當前正在執行的任務執行緒數(即當前還有多少執行緒未到達屏障點)
// 每執行完一個任務後,count 減一
private int count;

Generation

private static class Generation {
    boolean broken = false;
}

// 建立下一代
private void nextGeneration() {
    // 喚醒上一代所有阻塞在 trip 條件的執行緒
    trip.signalAll();
    // 重置 count 為 parties
    count = parties;
    // 重置當前代
    generation = new Generation();
}

建構函式

// parties 表示有 parties 個執行緒任務全部執行完後(即到達屏障點),主執行緒才能繼續執行
// barrierAction 表示所有 parties 執行緒都到達屏障點後,需要執行的一個任務
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

breakBarrier

// 終止屏障當前代
private void breakBarrier() {
    // 設定當前代的終止標記為 true
    generation.broken = true;
    // 重置 count
    count = parties;
    // 喚醒所有阻塞在 trip 條件的執行緒
    trip.signalAll();
}

await

// 阻塞當前執行緒(不帶超時時間)
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

// 阻塞當前執行緒(帶超時時間)
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
BrokenBarrierException,
TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

// 真正阻塞執行緒的邏輯
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 先獲取鎖
    lock.lock();
    try {
        // 獲取當前代
        final Generation g = generation;

        // 當前代已被終止
        if (g.broken)
            // 則丟擲 BrokenBarrierException 異常
            throw new BrokenBarrierException();

        // 執行緒被中斷
        if (Thread.interrupted()) {
            // 則終止屏障
            breakBarrier();
            // 並丟擲 InterruptedException 異常
            throw new InterruptedException();
        }

        // count 自減 1
        int index = --count;
        // index 等於 0 表示 parties 個執行緒都已完成了(都到達了屏障點)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 獲取構造方法傳的 barrierCommand
                final Runnable command = barrierCommand;
                // 如果 barrierCommand 不等於 null
                if (command != null)
                    // 則執行 barrierCommand 的 run() 方法(最後一個到達屏障點的執行緒執行的此方法)
                    command.run();
                ranAction = true;
                // 開始下一代
                nextGeneration();
                return 0;
            } finally {
                // 如果執行 barrierCommand 出現異常
                if (!ranAction)
                    // 則也終止屏障
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 自旋等待,除非所有執行緒都到達了屏障點、屏障被終止、執行緒被中斷或者等待超時
        for (;;) {
            try {
                // 沒有設定超時時間
                if (!timed)
                    // 則阻塞等待
                    trip.await();
                else if (nanos > 0L)
                    // 設定了超時時間,超時等待
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 判斷當前代是否已被終止
            // 如果呼叫了 breakBarrier() 方法或 reset() 方法(reset 方法裡也是呼叫了 breakBarrier 方法)會喚醒當前阻塞的執行緒,然後走到這一步,此時 g.broken 是 true
            if (g.broken)
                // 丟擲 BrokenBarrierException 異常
                throw new BrokenBarrierException();

            // g != generation,表示已經建立了下一代
            // 到達屏障點,會喚醒所有阻塞的執行緒,並呼叫 nextGeneration() 方法,然後走到這一步,此時 g != generation
            if (g != generation)
                return index;

            // 等待已超時
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

程式碼中有好幾處都丟擲了異常,總結一下:

  • CyclicBarrier 被終止了,在進入 dowait 方法前和後都會判斷是否終止,終止會丟擲 BrokenBarrierException 異常。終止可以通過呼叫 breakBarrier 方法、 reset 方法(底層也是呼叫了 breakBarrier 方法 )或者 dowait 方法中會執行緒等待超時呼叫 breakBarrier 方法。
  • 執行緒被中斷。在進入 dowait 方法會判斷執行緒是否被中斷,中斷會丟擲 BrokenBarrierException 異常。
  • 執行緒都到達屏障點時,會執行 barrierCommand 任務,如果 barrierCommand 丟擲了異常,會捕獲異常並呼叫 breakBarrier 方法。

isBroken

// 判斷屏障當前代是否已被終止
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

reset

public void reset() {
    final ReentrantLock lock = this.lock;
    // 先獲取鎖
    lock.lock();
    try {
        // 終止屏障當前代
        breakBarrier();   // break the current generation
        // 開始下一代
        nextGeneration(); // start a new generation
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}