Java併發(十三):同步屏障CyclicBarrier
阿新 • • 發佈:2018-11-19
CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。
一、應用舉例
public class CyclicBarrierTest { private static CyclicBarrier cyclicBarrier; static class CyclicBarrierThread extends Thread { public void run() { System.out.println(Thread.currentThread().getName()+ "到了"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Overridepublic void run() { System.out.println("人到齊了,開會吧...."); } }); for (int i = 0; i < 5; i++) { new CyclicBarrierThread().start(); } } }
二、類結構
public class CyclicBarrier { private static class Generation { // 內部類,當有parties個執行緒到達barrier,就會更新換代boolean broken = false; // 是否損壞 } private final ReentrantLock lock = new ReentrantLock(); // 重入鎖 private final Condition trip = lock.newCondition(); private final int parties; // 等待執行緒總數量 private final Runnable barrierCommand; // 達到等待執行緒數量後執行的執行緒 private Generation generation = new Generation(); // 當有parties個執行緒到達barrier,就會更新換代 private int count; // 記錄當前執行緒數量 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } }
三、原理解析
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); // 代失效,喚醒所有執行緒 throw new InterruptedException(); } int index = --count; // 計數 if (index == 0) { // 達到要求數量 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); // 達到等待執行緒數量後執行barrierCommand ranAction = true; nextGeneration(); // 喚醒本代所有執行緒,生成新一代,重置count return 0; } finally { if (!ranAction) breakBarrier(); } } // 執行緒數量未達到要求數量,將執行緒掛起等待 for (;;) { try { if (!timed) trip.await(); // 將執行緒加入condition佇列掛起 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 特殊情況處理 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } // 代失效,喚醒所有執行緒 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } // 喚醒本代所有執行緒,生成新一代,重置count private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }