多執行緒之迴圈柵欄CyclicBarrier及原理
阿新 • • 發佈:2019-02-18
一、迴圈柵欄CyclicBarrier
CyclicBarrier它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。比如將計數器設定為10,那麼湊齊第一批10個執行緒後,計數器就會歸0,然後接著湊齊下一批10個執行緒,這就是迴圈柵欄內在的含義。
CountDownLatch和CyclicBarrier的區別:
1、 CountDownLatch的作用是允許1或N個執行緒等待其他執行緒完成執行;而CyclicBarrier則是允許N個執行緒相互等待。2、 CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用。
構造方法:
CyclicBarrier(int parties)CyclicBarrier(int parties, Runnable barrierAction)
其中parties表示計數總數,也就是參與的執行緒總數。barrierAction表示當計數器一次計數完成後,系統會執行的動作。
主要方法:
int await():在所有參與執行緒都已經在此 barrier 上呼叫 await 方法之前,將一直等待。int await(long timeout, TimeUnit unit):在所有參與執行緒都已經在此屏障上呼叫 await 方法之前將一直等待,或者超出了指定的等待時間。
int getNumberWaiting():返回當前在屏障處等待的參與者數目。
int getParties():返回要求啟動此 barrier 的參與執行緒數目。
boolean isBroken():查詢此屏障是否處於損壞狀態。
void reset():將屏障重置為其初始狀態。
例子:
司令官讓士兵集合完成任務的場景
執行結果:public class CyclicBarrierDemo { public static class Soldier implements Runnable{ private String soldierName; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic,String soldierName){ this.cyclic = cyclic; this.soldierName = soldierName; } @Override public void run() { try{ //等待其他士兵到齊 cyclic.await(); doWork(); //等待所有士兵完成工作 cyclic.await(); }catch (InterruptedException e){ e.printStackTrace(); }catch (BrokenBarrierException e){ e.printStackTrace(); } } void doWork(){ try { Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(soldierName+" :任務完成"); } } public static class BarrierRun implements Runnable{ boolean flag; int N; public BarrierRun(boolean flag,int N){ this.flag = flag; this.N = N; } @Override public void run() { if(flag){ System.out.println("司令:[士兵"+N+"個,任務完成!]"); }else{ System.out.println("司令:[士兵"+N+"個,集合完畢!]"); flag = true; } } } public static void main(String[] args) { final int N = 10; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N,new BarrierRun(flag,N)); //設定屏障點,主要是為了執行這個方法 System.out.println("集合隊伍!"); for(int i = 0;i < N;i++){ System.out.println("士兵"+i+" 報道!"); allSoldier[i] =new Thread(new Soldier(cyclic,"士兵"+i)); allSoldier[i].start(); } } }
可以看到這段程式碼10個士兵執行緒,先後到達了2次屏障點,每次到達公共屏障點之後執行了BarrierRun的run方法,先後輸出了集合完畢和任務完成。重點在於每個士兵都相互等待,直到所有的士兵一起加入,才進行下一步。
二、CyclicBarrier原始碼分析
與之前介紹的同步類不同,CyclicBarrier是用ReentrantLock跟Condition來實現的。
首先看他的成員變數:
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
//參與執行緒的數量
private final int parties;
//所有參與執行緒到達屏障點後執行的動作
private final Runnable barrierCommand;
//當前的generation,同一批執行緒屬於同一個generation,每跨越一次屏障點就換一個新的generation
private Generation generation = new Generation();
//還在等待的執行緒個數(注意:這裡的等待不是condition佇列中的等待執行緒,而是還沒執行await()方法的執行緒,得明白每個等待的含義)
private int count;
構造方法:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
我們主要分析他的await()方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
await()方法呼叫了dowait()方法:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //拿到lock鎖
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//如果執行緒被中斷,就通過breakBarrier()破壞掉當前的generation,並喚醒所有的在condition佇列中等待的執行緒
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//還在等待的執行緒數-1
int index = --count;
if (index == 0) { //所有執行緒均呼叫了await(),即到達公共屏障點
boolean ranAction = false; //用來判斷到達屏障點之後執行的動作是否正常執行
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//注意這裡是run()而不是start(),也就是說是同步
ranAction = true;
nextGeneration();//一次屏障跨越,”更新換代”,即喚醒所有的condition佇列中的執行緒,並把count置為parties,然後換一個新的generation
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) { //自旋
try {
if (!timed)
trip.await();//將當前執行緒加入到condition佇列,釋放lock鎖資源,使得下一個執行緒能獲得鎖資源進入lock塊
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//等待過程執行緒被中斷
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 如果“當前generation已經損壞”,則丟擲異常。
if (g.broken)
throw new BrokenBarrierException();
// 如果“generation已經更新換代”,則返回index。
if (g != generation)
return index;
//”定時等待”且時間已到,則終止cyclicBarrier,喚醒condition佇列中所有的執行緒
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//釋放lock鎖
lock.unlock();
}
}
再看上面程式碼提到的breakBarrier()和nextGeneration()方法:
private void breakBarrier() {
generation.broken = true; //設定當前generation被破壞
count = parties; //重置count
trip.signalAll(); //喚醒condition佇列中的所有等待執行緒
}
private void nextGeneration() {
trip.signalAll(); //喚醒condition佇列中的所有等待執行緒
count = parties; //重置count
generation = new Generation(); //”更新換代”
}
總結:
CyclicBarrier的關鍵在於利用ReentrantLock和Condition,每個呼叫await()方法的執行緒都被加入到condition佇列中進行等待,所有參與執行緒都呼叫了await()之後,執行設定的後續動作barrierCommand,再喚醒condition佇列中的所有等待執行緒,重置count,並"更新換代"。