CyclicBarrier 的使用與原始碼解析
阿新 • • 發佈:2021-12-05
使用
CyclicBarrier 也可以實現類似 CountDownLatch 的功能,而且比 CountDownLatch 更強大,因為 CyclicBarrier 可以重複被使用。
程式碼示例:
@Test public void test() throws InterruptedException { int parties = 3; // 定義一個執行緒池 // CyclicBarrier 中執行緒執行完成會阻塞等待其它執行緒到達屏障點,所以可用執行緒至少需要 parties 個 ExecutorService executor = Executors.newFixedThreadPool(6); CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> { System.out.println("任務都到達了屏障點,主執行緒繼續執行任務"); }); IntStream.range(0, parties).forEach(i -> { executor.submit(() -> { System.out.println("條件" + (i + 1) + "正在執行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("條件" + (i + 1) + "執行完成,到達屏障點"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); // parties 個執行緒到達屏障點後,接著執行構造方法傳遞的 barrierAction 任務 // 然後會呼叫 nextGeneration() 方法,喚醒所有到達屏障點的執行緒,再重置 count 的值為 parties // 所以可以接著執行下面的條件4 ~ 條件6的執行緒 // 但是沒有控制執行緒執行的順序,所以並不是條件1~條件3這三個執行緒先執行,條件4 ~ 條件6這三個執行緒後執行的 IntStream.range(parties, parties + parties).forEach(i -> { executor.submit(() -> { System.out.println("條件" + (i + 1) + "正在執行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("條件" + (i + 1) + "執行完成,到達屏障點"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); Thread.sleep(10000); executor.shutdown(); /** * 可能的執行結果: * * 條件1正在執行 * 條件2正在執行 * 條件3正在執行 * 條件4正在執行 * 條件6正在執行 * 條件5正在執行 * 條件6執行完成,到達屏障點 * 條件5執行完成,到達屏障點 * 條件3執行完成,到達屏障點 * 條件4執行完成,到達屏障點 * 任務都到達了屏障點,主執行緒繼續執行任務 * 條件1執行完成,到達屏障點 * 條件2執行完成,到達屏障點 * 任務都到達了屏障點,主執行緒繼續執行任務 */ }
reset() 方法程式碼示例:
@Test public void test2() throws InterruptedException { int parties = 3; // 定義一個執行緒池 // CyclicBarrier 中執行緒執行完成會阻塞等待其它執行緒到達屏障點,所以可用執行緒至少需要 parties 個 ExecutorService executor = Executors.newFixedThreadPool(6); CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> { System.out.println("任務都到達了屏障點,主執行緒繼續執行任務"); }); IntStream.range(0, parties).forEach(i -> { executor.submit(() -> { System.out.println("條件" + (i + 1) + "正在執行"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("條件" + (i + 1) + "執行完成,到達屏障點"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 呼叫 reset 方法會將當前代終止,如果主執行緒呼叫 reset 方法時,有執行緒阻塞在 dowait 方法中 // 那麼已經被阻塞的執行緒被喚醒時,會丟擲 BrokenBarrierException 異常,未阻塞的執行緒不會跑出異常 // 測試 reset() 方法 System.out.println("執行 reset 方法"); cyclicBarrier.reset(); IntStream.range(parties, parties + parties).forEach(i -> { executor.submit(() -> { System.out.println("條件" + (i + 1) + "正在執行"); try { Thread.sleep(i * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("條件" + (i + 1) + "執行完成,到達屏障點"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); }); Thread.sleep(10000); executor.shutdown(); /** * 可能的執行結果 1: * * 條件1正在執行 * 條件3正在執行 * 條件2正在執行 * 執行 reset 方法 * 條件3執行完成,到達屏障點 * 條件1執行完成,到達屏障點 * java.util.concurrent.BrokenBarrierException * at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250) * at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362) * at com.github.wuchao.webdemo.core.concurrent.CyclicBarrierDemo.lambda$null$6(CyclicBarrierDemo.java:116) * at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) * at java.util.concurrent.FutureTask.run(FutureTask.java:266) * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) * at java.lang.Thread.run(Thread.java:748) * 條件5正在執行 * 條件6正在執行 * 條件4正在執行 * 條件2執行完成,到達屏障點 * 條件4執行完成,到達屏障點 * 任務都到達了屏障點,主執行緒繼續執行任務 * 條件5執行完成,到達屏障點 * 條件6執行完成,到達屏障點 * * * 可能的執行結果2: * * 條件1正在執行 * 條件3正在執行 * 條件2正在執行 * 執行 reset 方法 * 條件4正在執行 * 條件5正在執行 * 條件2執行完成,到達屏障點 * 條件3執行完成,到達屏障點 * 條件1執行完成,到達屏障點 * 任務都到達了屏障點,主執行緒繼續執行任務 * 條件6正在執行 * 條件4執行完成,到達屏障點 * 條件5執行完成,到達屏障點 * 條件6執行完成,到達屏障點 * 任務都到達了屏障點,主執行緒繼續執行任務 */ }
原始碼
屬性
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ // 任務執行緒數(這些執行緒全部執行完成後(即到達屏障點),才能繼續執行主執行緒任務) private final int parties; /* The command to run when tripped */ // 每當 parties 個任務執行緒都完成後(即 parties 個執行緒都到達了屏障點),會執行一次 barrierCommand 任務 // 建構函式如果不傳 barrierCommand,則不執行 private final Runnable barrierCommand; /** The current generation */ // 當前代 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ // parties 執行緒中,當前正在執行的任務執行緒數(即當前還有多少執行緒未到達屏障點) // 每執行完一個任務後,count 減一 private int count;
Generation
private static class Generation {
boolean broken = false;
}
// 建立下一代
private void nextGeneration() {
// 喚醒上一代所有阻塞在 trip 條件的執行緒
trip.signalAll();
// 重置 count 為 parties
count = parties;
// 重置當前代
generation = new Generation();
}
建構函式
// parties 表示有 parties 個執行緒任務全部執行完後(即到達屏障點),主執行緒才能繼續執行
// barrierAction 表示所有 parties 執行緒都到達屏障點後,需要執行的一個任務
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
breakBarrier
// 終止屏障當前代
private void breakBarrier() {
// 設定當前代的終止標記為 true
generation.broken = true;
// 重置 count
count = parties;
// 喚醒所有阻塞在 trip 條件的執行緒
trip.signalAll();
}
await
// 阻塞當前執行緒(不帶超時時間)
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 阻塞當前執行緒(帶超時時間)
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
// 真正阻塞執行緒的邏輯
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 先獲取鎖
lock.lock();
try {
// 獲取當前代
final Generation g = generation;
// 當前代已被終止
if (g.broken)
// 則丟擲 BrokenBarrierException 異常
throw new BrokenBarrierException();
// 執行緒被中斷
if (Thread.interrupted()) {
// 則終止屏障
breakBarrier();
// 並丟擲 InterruptedException 異常
throw new InterruptedException();
}
// count 自減 1
int index = --count;
// index 等於 0 表示 parties 個執行緒都已完成了(都到達了屏障點)
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 獲取構造方法傳的 barrierCommand
final Runnable command = barrierCommand;
// 如果 barrierCommand 不等於 null
if (command != null)
// 則執行 barrierCommand 的 run() 方法(最後一個到達屏障點的執行緒執行的此方法)
command.run();
ranAction = true;
// 開始下一代
nextGeneration();
return 0;
} finally {
// 如果執行 barrierCommand 出現異常
if (!ranAction)
// 則也終止屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 自旋等待,除非所有執行緒都到達了屏障點、屏障被終止、執行緒被中斷或者等待超時
for (;;) {
try {
// 沒有設定超時時間
if (!timed)
// 則阻塞等待
trip.await();
else if (nanos > 0L)
// 設定了超時時間,超時等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 判斷當前代是否已被終止
// 如果呼叫了 breakBarrier() 方法或 reset() 方法(reset 方法裡也是呼叫了 breakBarrier 方法)會喚醒當前阻塞的執行緒,然後走到這一步,此時 g.broken 是 true
if (g.broken)
// 丟擲 BrokenBarrierException 異常
throw new BrokenBarrierException();
// g != generation,表示已經建立了下一代
// 到達屏障點,會喚醒所有阻塞的執行緒,並呼叫 nextGeneration() 方法,然後走到這一步,此時 g != generation
if (g != generation)
return index;
// 等待已超時
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 釋放鎖
lock.unlock();
}
}
程式碼中有好幾處都丟擲了異常,總結一下:
- CyclicBarrier 被終止了,在進入 dowait 方法前和後都會判斷是否終止,終止會丟擲 BrokenBarrierException 異常。終止可以通過呼叫 breakBarrier 方法、 reset 方法(底層也是呼叫了 breakBarrier 方法 )或者 dowait 方法中會執行緒等待超時呼叫 breakBarrier 方法。
- 執行緒被中斷。在進入 dowait 方法會判斷執行緒是否被中斷,中斷會丟擲 BrokenBarrierException 異常。
- 執行緒都到達屏障點時,會執行 barrierCommand 任務,如果 barrierCommand 丟擲了異常,會捕獲異常並呼叫 breakBarrier 方法。
isBroken
// 判斷屏障當前代是否已被終止
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
reset
public void reset() {
final ReentrantLock lock = this.lock;
// 先獲取鎖
lock.lock();
try {
// 終止屏障當前代
breakBarrier(); // break the current generation
// 開始下一代
nextGeneration(); // start a new generation
} finally {
// 釋放鎖
lock.unlock();
}
}