1. 程式人生 > >concurrent(四)同步屏障 CyclicBarrier & 源碼分析

concurrent(四)同步屏障 CyclicBarrier & 源碼分析

ret https 初始 else ring class http display thread

參考文檔:
Java多線程系列--“JUC鎖”10之 CyclicBarrier原理和示例:https://www.cnblogs.com/skywang12345/p/3533995.html
簡介
CyclicBarrier是一個同步輔助類,允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環的 barrier。基於ReentrantLock實現
舉個栗子

技術分享圖片
/**
 * 簡單模擬一下對戰平臺中玩家需要完全準備好了,才能進入遊戲的場景。
 * 
 * @author BFD_526
 * 
 */
public
class CyclicBarrierTest { public static void main(String[] args) { test(); } // 同步屏障 static void test() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(
new Player("玩家" + i, barrier)); } service.shutdown(); } // 同步屏障重置 static void test1() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { service.execute(
new Player("玩家" + i, barrier)); } for (int i = 5; i < 10; i++) { service.execute(new Player("玩家" + i, barrier)); } service.shutdown(); } // 在同步屏障結束後,啟動優先線程 static void test2() { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier ba = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有玩家已就位"); } }); for (int i = 0; i < 5; i++) { service.execute(new Player("玩家" + i, ba)); } } } class Player implements Runnable { private final String name; private final CyclicBarrier barrier; public Player(String name, CyclicBarrier barrier) { this.name = name; this.barrier = barrier; } public void run() { try { TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已準備,等待其他玩家準備..."); barrier.await(); TimeUnit.SECONDS.sleep(1 + (new Random().nextInt(3))); System.out.println(name + "已加入遊戲"); } catch (InterruptedException e) { System.out.println(name + "離開遊戲"); } catch (BrokenBarrierException e) { System.out.println(name + "離開遊戲"); } } }
View Code

源碼分析

技術分享圖片

函數列表

CyclicBarrier(int parties):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作
CyclicBarrier(int parties, Runnable barrierAction):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行
int await():在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待
int await(long timeout, TimeUnit unit):在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間
int getNumberWaiting():返回當前在屏障處等待的參與者數目
int getParties():返回要求啟動此 barrier 的參與者數目
boolean isBroken():查詢此屏障是否處於損壞狀態
void reset():將屏障重置為其初始狀態

await()

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 獲取“獨占鎖(lock)”
    lock.lock();
    try {
        // 保存“當前的generation”
        final Generation g = generation;
        // 若“當前generation已損壞”,則拋出異常。
        if (g.broken)
            throw new BrokenBarrierException();
        // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
       // 將“count計數器”-1
       int index = --count;
       // 如果index=0,則意味著“有parties個線程到達barrier”
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不為null,則執行該動作
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒所有等待線程,並更新generation
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }
        // 當前線程一直阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,
        // 當前線程才繼續執行。
        for (;;) {
            try {
                // 如果不是“超時等待”,則調用awati()進行等待;否則,調用awaitNanos()進行等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待過程中,線程被中斷,則執行下面的函數
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            // 如果“當前generation已經損壞”,則拋出異常
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果“generation已經換代”,則返回index
            if (g != generation)
                return index;
            // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放“獨占鎖(lock)”
        lock.unlock();
    }
}

generation是CyclicBarrier的一個成員變量,它的定義如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的線程屬於同一代,即同一個Generation;CyclicBarrier中通過generation對象,記錄屬於哪一代
當有parties個線程到達barrier,generation就會被更新換代
換代:

//換代
private
void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

concurrent(四)同步屏障 CyclicBarrier & 源碼分析