[滄海拾遺]java併發之CountDownLatch、Semaphore和CyclicBarrier
JAVA併發包中有三個類用於同步一批執行緒的行為,分別是CountDownLatch、Semaphore和CyclicBarrier。
CountDownLatch
CountDownLatch是一個計數器閉鎖,主要的功能就是通過await()方法來阻塞住當前執行緒,然後等待計數器減少到0了,再喚起這些執行緒繼續執行。 這個類裡主要有兩個方法,一個是向下減計數器的方法:countdown(),其實現的核心程式碼如下:
public boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
很簡單,如果取得當前的狀態為0,說明這個鎖已經結束,直接返回false;如果沒有結束,然後去設定計數器減1,如果compareAndSetState不成功,則繼續迴圈執行。 而其中的一直等待計數器歸零的方法是await()。
通過CountDownLatch可以做幾件事情:
1. 主執行緒控制同時啟動一組執行緒
final CountDownLatch count = new CountDownLatch(1); for (int i = 0; i < 3; i++) { new Thread("Thread" + i) { public void run() { System.out.println(Thread.currentThread().getName() + " wait"); try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " start"); } }.start(); } //等等三秒,否則有可能3個執行緒並沒有全部進行await狀態 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } count.countDown();
2. 主執行緒等待各子執行緒全部執行完畢後再往下執行:
final CountDownLatch count = new CountDownLatch(3); for (int i = 0; i < 3; i++) { new Thread("Thread" + i) { public void run() { System.out.println(Thread.currentThread().getName() + " start"); count.countDown(); } }.start(); } try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("All end!!!");
Semaphore
Semaphore與CountDownLatch相似,不同的地方在於Semaphore的值被獲取到後是可以釋放的,並不像CountDownLatch那樣一直減到底。它也被更多地用來限制流量,類似閥門的 功能。如果限定某些資源最多有N個執行緒可以訪問,那麼超過N個主不允許再有執行緒來訪問,同時當現有執行緒結束後,就會釋放,然後允許新的執行緒進來。有點類似於鎖的lock與 unlock過程。相對來說他也有兩個主要的方法:
- 用於獲取許可權的acquire(),其底層實現與CountDownLatch.countdown()類似;
- 用於釋放許可權的release(),其底層實現與acquire()是一個互逆的過程。
用Semaphore來實現限流程式碼詳見:semaphore例子
CyclicBarrier
CyclicBarrier是用來一個關卡來阻擋住所有執行緒,等所有執行緒全部執行到關卡處時,再統一執行下一步操作,它裡面最重要的方法是await()方法,其實現如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//取鎖,以防止在後面做減1計數時執行緒不安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//如果當前執行緒執行到了,則將計數器減1,計數器為0則說明所有執行緒均執行到這裡,可以呼叫下一步操作
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
//獲取到定義好的下一步操作,並執行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
即每個執行緒執行完後呼叫await(),然後在await()裡,執行緒先將計數器減1,如果計數器為0,則執行定義好的操作,然後再繼續執行原執行緒的內容。
這個類比之前兩個類的一個好處是有點類似於切面程式設計,可以讓我們在同類執行緒的某個切面切入一塊邏輯,並且可以同步所有的執行緒的執行速度。
例子程式碼如下:
final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("All Threads Here");
}
});
for (int i = 0; i < 4; i++) {
new Thread("Thread" + i) {
public void run() {
System.out.println(Thread.currentThread().getName() + " wait");
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " crossed");
}
}.start();
}
最終的輸出結果為:
Thread0 wait
Thread1 wait
Thread2 wait
Thread3 wait
All Threads Here
Thread0 crossed
Thread1 crossed
Thread2 crossed
Thread3 crossed