Java執行緒--CyclicBarrier迴圈屏障
CyclicBarrier迴圈屏障
目錄
CyclicBarrier和CountDownLatch區別
CyclicBarrier原理
下面是部分原始碼,以及原始碼中呼叫相關類的部分原始碼剖析:
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private Generation generation = new Generation(); private int count; public CyclicBarrier(int parties) { this.parties = parties; this.count = parties; } private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) { lock.lock(); try { final Generation g = generation; int index = --count; if (index == 0) { // 屏障開啟,可通行 boolean ranAction = false; try { ranAction = true; nextGeneration(); // 喚醒所有卡在barrier屏障前的執行緒,並恢復count使其可迴圈利用 return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { // index!=0 說明人還沒到齊,仍然無法開啟屏障 try { if (!timed) trip.await(); // 當前執行緒壓入條件佇列 } catch (InterruptedException ie) { } if (g != generation) return index; } } finally { lock.unlock(); } } public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { } } } public class ConditionObject implements Condition, java.io.Serializable { public ConditionObject() { } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { //從第一個節點開始,迴圈處理,喚醒所有barrier屏障前的等待執行緒 Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } public final void signalAll() { doSignalAll(first); } } public abstract class AbstractQueuedSynchronizer{ final boolean transferForSignal(Node node) { LockSupport.unpark(node.thread); // 喚醒執行緒 return true; } }
原始碼分析如下:
CyclicBarrier類的await()方法:是我們程式中經常呼叫的,其內部呼叫了Cyclic類的dowait()方法
CyclicBarrier類的dowait()方法:
獲得鎖:
臨界區:業務邏輯,判斷當前屏障能否放行
index==0,能放行,則呼叫CyclicBarrier類的nextCondition()方法
index!=0, 不能放行,當前執行緒壓入條件佇列,進行等待
釋放鎖:
CyclicBarrier類的nextCondition()方法:呼叫ConditionObject類的signalAll()方法,恢復迴圈計數功能count = parties;
ConditionObject類的signalAll()方法:迴圈處理,條件佇列中的執行緒轉入同步佇列準備執行。即喚醒所有到達屏障前的等待執行緒
原始碼分析總結:
白話文闡述:眾人都到達,才能推開屏障大門,得以通行。假定大門4斤,每人只能推動1斤,必須4人都到齊後,才能推開大門。4人通過大門後,大門自行關閉又形成屏障,需要下一組還得是4個人都到門前一起才能推開大門,如此迴圈反覆....直至所有人都通過大門。
程式上來說:屏障計數量初始化為N。每一個執行緒到達屏障前(即當前執行緒執行到await()方法處),該執行緒被阻塞,屏障計數量N-1,直至屏障降為零(即到達屏障前的執行緒個數之和達到屏障計數量N),屏障才會開啟,喚醒這N個到達屏障前的多執行緒,都可通過屏障,繼續得以執行。
適應的業務場景:
兩個或者多個執行緒的一組執行緒,必須在預定的執行點進行等待,直到這組執行緒中的所有執行緒都到達執行點為止。舉例:景區觀光小車,人不坐滿不發車。在車子想開走觀光之前(執行點),一車人(一組執行緒)必須坐滿,才啟動車輛。
CyclicBarrier示例
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
/**
* 任務
*/
class PlayTask implements Runnable
{
private CyclicBarrier cb;
public PlayTask(CyclicBarrier cb){
this.cb = cb;
}
public void run(){
try{
long sleepTime = (long)(Math.random() * 2500) ;
String tName = Thread.currentThread().getName();
System.out.println(tName+":在做事,需要耗時["+sleepTime+"]");
Thread.sleep(sleepTime);
cb.await();
System.out.println("湊夠"+cb.getParties()+",["+tName+"]已上車,出發!");
}
catch(InterruptedException | BrokenBarrierException e){
}
finally{
}
}
}
/**
* 測試
*/
public class CyclicBarrierTest
{
public static void main(String[] args)
{
int count = 3 ; //計數量
CyclicBarrier cb = new CyclicBarrier(count,new Runnable() {
public void run(){
System.out.println("\n\r有"+count+"個執行緒到達...");
}
});
System.out.println("景區,請排隊上車觀光,每"+count+"人一組上車...\n\r");
PlayTask eTask = new PlayTask(cb); //任務:湊夠3人去觀光
Thread[] threads = new Thread[count*3]; //多執行緒,每count個一組
for (int i = 0 ; i < count*3 ; ++i) {
threads[i] = new Thread(eTask,"T"+i);
threads[i].start();
}
}
}
程式結果如下:
景區,請排隊上車觀光,每3人一組上車...
T5:在做事,需要耗時[1717]
T3:在做事,需要耗時[2117]
T8:在做事,需要耗時[1261]
T1:在做事,需要耗時[286]
T0:在做事,需要耗時[173]
T4:在做事,需要耗時[221]
T6:在做事,需要耗時[2319]
T7:在做事,需要耗時[1538]
T2:在做事,需要耗時[1259]
有3個執行緒到達...
湊夠3,[T1]已上車,出發!
湊夠3,[T4]已上車,出發!
湊夠3,[T0]已上車,出發!
有3個執行緒到達...
湊夠3,[T7]已上車,出發!
湊夠3,[T8]已上車,出發!
湊夠3,[T2]已上車,出發!
有3個執行緒到達...
湊夠3,[T6]已上車,出發!
湊夠3,[T3]已上車,出發!
湊夠3,[T5]已上車,出發!
CyclicBarrier和CountDownLatch區別
1。
CountDownLatch:主執行緒等在這裡,直到N個子執行緒到達後,主執行緒才得以繼續往下執行;
CyclicBarrier:則是N個子執行緒構成一組,組內成員有快有慢,快的成員到達屏障後,無法通過,必須組內的所有成員都忙好到達了屏障,屏障才會開啟放行這一組內的N個子執行緒,使得這N個子執行緒得以通過屏障繼續執行,與主執行緒沒關係,是一組內的N個子執行緒等待彼此都到達屏障。如果非要說和CountDownLatch類似的地方就是,建構函式public CyclicBarrier(int parties, Runnable barrierAction)這樣的,這裡的Runnable就是N個子執行緒都到達了之後,才能得以執行。
2。
CountDownLatch計數為0後,無法重置,一次性的;
CyclicBarrier計數達到後,自動重置,可迴圈利用的。