concurrent(四)同步屏障 CyclicBarrier & 源碼分析
參考文檔:
Java多線程系列--“JUC鎖”10之 CyclicBarrier原理和示例:https://www.cnblogs.com/skywang12345/p/3533995.html
簡介
CyclicBarrier是一個同步輔助類,允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環的 barrier。基於ReentrantLock實現
舉個栗子
/** * 簡單模擬一下對戰平臺中玩家需要完全準備好了,才能進入遊戲的場景。 * * @author BFD_526 * */ publicView Codeclass CyclicBarrierTest { public static void main(String[] args) { test(); } // 同步屏障 static void test() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, barrier)); } service.shutdown(); } // 同步屏障重置 static void test1() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, barrier)); } for (int i = 5; i < 10; i++) { service.execute(new Player("玩家" + i, barrier)); } service.shutdown(); } // 在同步屏障結束後,啟動優先線程 static void test2() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier ba = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有玩家已就位"); } }); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, ba)); } } } class Player implements Runnable { private final String name; private final CyclicBarrier barrier; public Player(String name, CyclicBarrier barrier) { this.name = name; this.barrier = barrier; } public void run() { try { TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已準備,等待其他玩家準備..."); barrier.await(); TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已加入遊戲"); } catch (InterruptedException e) { System.out.println(name + "離開遊戲"); } catch (BrokenBarrierException e) { System.out.println(name + "離開遊戲"); } } }
源碼分析
函數列表
CyclicBarrier(int parties):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作
CyclicBarrier(int parties, Runnable barrierAction):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行
int await():在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待
int await(long timeout, TimeUnit unit):在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間
int getNumberWaiting():返回當前在屏障處等待的參與者數目
int getParties():返回要求啟動此 barrier 的參與者數目
boolean isBroken():查詢此屏障是否處於損壞狀態
void reset():將屏障重置為其初始狀態
await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 獲取“獨占鎖(lock)” lock.lock(); try { // 保存“當前的generation” final Generation g = generation; // 若“當前generation已損壞”,則拋出異常。 if (g.broken) throw new BrokenBarrierException(); // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 將“count計數器”-1 int index = --count; // 如果index=0,則意味著“有parties個線程到達barrier” if (index == 0) { // tripped boolean ranAction = false; try { // 如果barrierCommand不為null,則執行該動作 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 喚醒所有等待線程,並更新generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 當前線程一直阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生, // 當前線程才繼續執行。 for (;;) { try { // 如果不是“超時等待”,則調用awati()進行等待;否則,調用awaitNanos()進行等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果等待過程中,線程被中斷,則執行下面的函數 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // 如果“當前generation已經損壞”,則拋出異常 if (g.broken) throw new BrokenBarrierException(); // 如果“generation已經換代”,則返回index if (g != generation) return index; // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放“獨占鎖(lock)” lock.unlock(); } }
generation是CyclicBarrier的一個成員變量,它的定義如下:
private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
在CyclicBarrier中,同一批的線程屬於同一代,即同一個Generation;CyclicBarrier中通過generation對象,記錄屬於哪一代
當有parties個線程到達barrier,generation就會被更新換代
換代:
//換代
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
concurrent(四)同步屏障 CyclicBarrier & 源碼分析