1. 程式人生 > >[滄海拾遺]java併發之CountDownLatch、Semaphore和CyclicBarrier

[滄海拾遺]java併發之CountDownLatch、Semaphore和CyclicBarrier

JAVA併發包中有三個類用於同步一批執行緒的行為,分別是CountDownLatch、Semaphore和CyclicBarrier。

CountDownLatch

CountDownLatch是一個計數器閉鎖,主要的功能就是通過await()方法來阻塞住當前執行緒,然後等待計數器減少到0了,再喚起這些執行緒繼續執行。 這個類裡主要有兩個方法,一個是向下減計數器的方法:countdown(),其實現的核心程式碼如下:

public boolean tryReleaseShared(int releases) {  
	// Decrement count; signal when transition to zero  
	for (;;) {   
	int c = getState();     
	if (c == 0)    
		return false;     
	int nextc = c-1;    
	if (compareAndSetState(c, nextc))    
		return nextc == 0;    
	}    
}    

很簡單,如果取得當前的狀態為0,說明這個鎖已經結束,直接返回false;如果沒有結束,然後去設定計數器減1,如果compareAndSetState不成功,則繼續迴圈執行。 而其中的一直等待計數器歸零的方法是await()。 
通過CountDownLatch可以做幾件事情:

1. 主執行緒控制同時啟動一組執行緒

final CountDownLatch count = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
    new Thread("Thread" + i) {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " wait");
            try {
                count.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " start");
        }
    }.start();
}
//等等三秒,否則有可能3個執行緒並沒有全部進行await狀態
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
count.countDown();

2. 主執行緒等待各子執行緒全部執行完畢後再往下執行:

final CountDownLatch count = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
    new Thread("Thread" + i) {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " start");
            count.countDown();
        }
    }.start();
}
try {
    count.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("All end!!!");    

Semaphore

Semaphore與CountDownLatch相似,不同的地方在於Semaphore的值被獲取到後是可以釋放的,並不像CountDownLatch那樣一直減到底。它也被更多地用來限制流量,類似閥門的 功能。如果限定某些資源最多有N個執行緒可以訪問,那麼超過N個主不允許再有執行緒來訪問,同時當現有執行緒結束後,就會釋放,然後允許新的執行緒進來。有點類似於鎖的lock與 unlock過程。相對來說他也有兩個主要的方法:

  1. 用於獲取許可權的acquire(),其底層實現與CountDownLatch.countdown()類似;
  2. 用於釋放許可權的release(),其底層實現與acquire()是一個互逆的過程。

用Semaphore來實現限流程式碼詳見:semaphore例子

CyclicBarrier

CyclicBarrier是用來一個關卡來阻擋住所有執行緒,等所有執行緒全部執行到關卡處時,再統一執行下一步操作,它裡面最重要的方法是await()方法,其實現如下:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    //取鎖,以防止在後面做減1計數時執行緒不安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
       //如果當前執行緒執行到了,則將計數器減1,計數器為0則說明所有執行緒均執行到這裡,可以呼叫下一步操作
       int index = --count;
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               //獲取到定義好的下一步操作,並執行
	   final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
		throw ie;
	    } else {
		// We're about to finish waiting even if we had not
		// been interrupted, so this interrupt is deemed to
		// "belong" to subsequent execution.
		Thread.currentThread().interrupt();
	    }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}    

即每個執行緒執行完後呼叫await(),然後在await()裡,執行緒先將計數器減1,如果計數器為0,則執行定義好的操作,然後再繼續執行原執行緒的內容。 
這個類比之前兩個類的一個好處是有點類似於切面程式設計,可以讓我們在同類執行緒的某個切面切入一塊邏輯,並且可以同步所有的執行緒的執行速度。
例子程式碼如下:

final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {

    @Override
    public void run() {
        System.out.println("All Threads Here");

    }
});
for (int i = 0; i < 4; i++) {
    new Thread("Thread" + i) {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " wait");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " crossed");
        }
    }.start();
}  

最終的輸出結果為:

Thread0 wait
Thread1 wait 
Thread2 wait 
Thread3 wait 
All Threads Here 
Thread0 crossed 
Thread1 crossed 
Thread2 crossed 
Thread3 crossed