1. 程式人生 > >Java執行緒--CyclicBarrier迴圈屏障

Java執行緒--CyclicBarrier迴圈屏障

CyclicBarrier迴圈屏障

目錄

CyclicBarrier迴圈屏障

CyclicBarrier原理 

CyclicBarrier示例 

 CyclicBarrier和CountDownLatch區別


CyclicBarrier原理 

CyclicBarrier

下面是部分原始碼,以及原始碼中呼叫相關類的部分原始碼剖析: 



public class CyclicBarrier {
    
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private Generation generation = new Generation();
    private int count;

    public CyclicBarrier(int parties) {
	this.parties = parties;
        this.count = parties;
    }

    private void nextGeneration() {
        trip.signalAll();
	count = parties;
        generation = new Generation();
    }
    private void breakBarrier() {
	generation.broken = true;
        count = parties;
        trip.signalAll();
        
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos) {
       
	lock.lock();
        
	try {
            final Generation g = generation;

            int index = --count;
            if (index == 0) {		// 屏障開啟,可通行
                boolean ranAction = false;
                try {
                    ranAction = true;
                    nextGeneration();   // 喚醒所有卡在barrier屏障前的執行緒,並恢復count使其可迴圈利用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            
            for (;;) {		        // index!=0 說明人還沒到齊,仍然無法開啟屏障
                try {
                    if (!timed)
                        trip.await();	// 當前執行緒壓入條件佇列
                } catch (InterruptedException ie) {
                }

                if (g != generation)
                    return index;
            }
        } finally {
            lock.unlock();
        }
    }

    

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
        }
    }

    
}

public class ConditionObject implements Condition, java.io.Serializable {

	public ConditionObject() { }
		   
	private void doSignalAll(Node first) {
		lastWaiter = firstWaiter = null;
		
		do {	//從第一個節點開始,迴圈處理,喚醒所有barrier屏障前的等待執行緒
			Node next = first.nextWaiter;
			first.nextWaiter = null;
			transferForSignal(first);
			first = next;
		} while (first != null);
	}
	
	
	public final void signalAll() {
		doSignalAll(first);
	}        
}

public abstract class AbstractQueuedSynchronizer{

	final boolean transferForSignal(Node node) {
        LockSupport.unpark(node.thread);	// 喚醒執行緒
        return true;
    }
}

原始碼分析如下:

CyclicBarrier類的await()方法:是我們程式中經常呼叫的,其內部呼叫了Cyclic類的dowait()方法

CyclicBarrier類的dowait()方法:

獲得鎖:

臨界區:業務邏輯,判斷當前屏障能否放行

               index==0,能放行,則呼叫CyclicBarrier類的nextCondition()方法

               index!=0, 不能放行,當前執行緒壓入條件佇列,進行等待

釋放鎖:

CyclicBarrier類的nextCondition()方法:呼叫ConditionObject類的signalAll()方法,恢復迴圈計數功能count = parties;

ConditionObject類的signalAll()方法:迴圈處理,條件佇列中的執行緒轉入同步佇列準備執行。即喚醒所有到達屏障前的等待執行緒


原始碼分析總結:

白話文闡述:眾人都到達,才能推開屏障大門,得以通行。假定大門4斤,每人只能推動1斤,必須4人都到齊後,才能推開大門。4人通過大門後,大門自行關閉又形成屏障,需要下一組還得是4個人都到門前一起才能推開大門,如此迴圈反覆....直至所有人都通過大門。

程式上來說:屏障計數量初始化為N。每一個執行緒到達屏障前(即當前執行緒執行到await()方法處),該執行緒被阻塞,屏障計數量N-1,直至屏障降為零(即到達屏障前的執行緒個數之和達到屏障計數量N),屏障才會開啟,喚醒這N個到達屏障前的多執行緒,都可通過屏障,繼續得以執行。


適應的業務場景:

兩個或者多個執行緒的一組執行緒,必須在預定的執行點進行等待,直到這組執行緒中的所有執行緒都到達執行點為止。舉例:景區觀光小車,人不坐滿不發車。在車子想開走觀光之前(執行點),一車人(一組執行緒)必須坐滿,才啟動車輛。

CyclicBarrier示例 

import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

/**
* 任務
*/
class PlayTask implements Runnable
{
    private CyclicBarrier cb;

    public PlayTask(CyclicBarrier cb){
	this.cb = cb;
    }

    public void run(){
	try{
	    long sleepTime = (long)(Math.random() * 2500) ;
            String tName = Thread.currentThread().getName();
	    System.out.println(tName+":在做事,需要耗時["+sleepTime+"]");
	    Thread.sleep(sleepTime);
	    cb.await();
	    System.out.println("湊夠"+cb.getParties()+",["+tName+"]已上車,出發!");
	}
	catch(InterruptedException | BrokenBarrierException e){
	}
	finally{
	}
    }
}

/**
* 測試
*/
public class CyclicBarrierTest 
{
    public static void main(String[] args) 
    {
	int count = 3 ; //計數量

	CyclicBarrier cb = new CyclicBarrier(count,new Runnable() {

	    public void run(){
		System.out.println("\n\r有"+count+"個執行緒到達...");
	    }
	});

	System.out.println("景區,請排隊上車觀光,每"+count+"人一組上車...\n\r");

	PlayTask eTask = new PlayTask(cb);	//任務:湊夠3人去觀光		

        Thread[] threads = new Thread[count*3]; //多執行緒,每count個一組

	for (int i = 0 ; i < count*3 ; ++i) { 
	    threads[i] = new Thread(eTask,"T"+i);
	    threads[i].start();
	} 
    }
}

程式結果如下:

景區,請排隊上車觀光,每3人一組上車...

T5:在做事,需要耗時[1717]
T3:在做事,需要耗時[2117]
T8:在做事,需要耗時[1261]
T1:在做事,需要耗時[286]
T0:在做事,需要耗時[173]
T4:在做事,需要耗時[221]
T6:在做事,需要耗時[2319]
T7:在做事,需要耗時[1538]
T2:在做事,需要耗時[1259]

有3個執行緒到達...
湊夠3,[T1]已上車,出發!
湊夠3,[T4]已上車,出發!
湊夠3,[T0]已上車,出發!

有3個執行緒到達...
湊夠3,[T7]已上車,出發!
湊夠3,[T8]已上車,出發!
湊夠3,[T2]已上車,出發!

有3個執行緒到達...
湊夠3,[T6]已上車,出發!
湊夠3,[T3]已上車,出發!
湊夠3,[T5]已上車,出發!

 CyclicBarrier和CountDownLatch區別

1。

CountDownLatch:主執行緒等在這裡,直到N個子執行緒到達後,主執行緒才得以繼續往下執行;

CyclicBarrier:則是N個子執行緒構成一組,組內成員有快有慢,快的成員到達屏障後,無法通過,必須組內的所有成員都忙好到達了屏障,屏障才會開啟放行這一組內的N個子執行緒,使得這N個子執行緒得以通過屏障繼續執行,與主執行緒沒關係,是一組內的N個子執行緒等待彼此都到達屏障。如果非要說和CountDownLatch類似的地方就是,建構函式public CyclicBarrier(int parties, Runnable barrierAction)這樣的,這裡的Runnable就是N個子執行緒都到達了之後,才能得以執行。

2。

CountDownLatch計數為0後,無法重置,一次性的;

CyclicBarrier計數達到後,自動重置,可迴圈利用的。