CyclicBarrier原始碼探究 (JDK 1.8)
阿新 • • 發佈:2020-03-13
`CyclicBarrier`也叫回環柵欄,能夠實現讓一組執行緒執行到柵欄處並阻塞,等到所有執行緒都到達柵欄時再一起執行的功能。“迴環”意味著`CyclicBarrier`可以多次重複使用,相比於`CountDownLatch`只能使用一次,`CyclicBarrier`可以節省許多資源,並且還可以在構造器中傳入任務,當柵欄條件滿足時執行這個任務。`CyclicBarrier`是使用了`ReentrantLock`,主要方法在執行時都會加鎖,因此併發效能不是很高。
## 1.相關欄位
```
//重入鎖,CyclicBarrier內部通過重入鎖實現執行緒安全
private final ReentrantLock lock = new ReentrantLock();
//執行緒阻塞時的等待條件
private final Condition trip = lock.newCondition();
//需要等待的執行緒數
private final int parties;
//柵欄開啟之後首先執行的任務
private final Runnable barrierCommand;
//記錄當前的分代標記
private Generation generation = new Generation();
//當前還需要等待多少個執行緒執行到柵欄位置
private int count;
```
需要注意的是`generation`欄位,用於標記柵欄當前處在哪一代。當滿足一定的條件時(例如呼叫了`reset`方法,或者柵欄開啟等),柵欄狀態會切換到下一代,實際就是`new`一個新的`Generation`物件,這是`CyclicBarrier`的內部類,程式碼非常簡單,如下:
```
private static class Generation {
boolean broken = false; //標記柵欄是否被破壞
}
```
實際使用的過程中,會利用`generation`欄位判斷當前是否在同一個分代,而使用`broker`欄位判斷柵欄是否被破壞。
## 2.建構函式
`CyclicBarrier`有兩個過載的建構函式,建構函式只是對上述的相關欄位進行初始化,如下:
```
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
```
## 3.核心方法
- `await`
`await`是開發時最常用到的方法了,同`CountDownLatch`一樣,`CyclicBarrier`也提供了兩個`await`方法,一個不帶引數,一個帶有超時引數,其內部只是簡單呼叫了一下`dowait`方法:
```
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
```
接下來看看至關重要的`dowait`方法:
```
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//加重入鎖
lock.lock();
try {
//首先獲取年齡代資訊
final Generation g = generation;
//如果柵欄狀態被破壞,丟擲異常,例如先啟動的執行緒呼叫了breakBarrier方法,後啟動的執行緒就能夠看到g.broker=true
if (g.broken)
throw new BrokenBarrierException();
//檢測執行緒的中斷狀態,如果執行緒設定了中斷狀態,則通過breakBarrier設定柵欄為已破壞狀態,並喚醒其他執行緒
//如果這裡能夠檢測到中斷狀態,那隻可能是在await方法外部設定的
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//每呼叫一次await,就將需要等待的執行緒數減1
int index = --count;
//index=0表示這是最後一個到達的執行緒,由該執行緒執行下面的邏輯
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//如果在構造器中傳入了第二個任務引數,就在放開柵欄前先執行這個任務
if (command != null)
command.run();
ranAction = true;
//正常結束,需要喚醒阻塞的執行緒,並換代
nextGeneration();
return 0;
} finally {
//try程式碼塊如果正常執行,ranAction就一定等於true,而try程式碼塊唯一可能發生異常的地方就是command.run(),
//因此這裡為了保證在任務執行失敗時,將柵欄標記為已破壞,喚醒阻塞執行緒
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
////柵欄沒被破壞,執行緒沒有被中斷,且不是最後一個到達柵欄的執行緒,就會執行下面的自旋,排隊等待
for (;;) {
try {
//沒有設定超時標記,就加入等待佇列
if (!timed)
trip.await();
//設定了超時標記,但目前還沒有超時,則繼續等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果執行緒等待的過程中被中斷,會執行到這裡
//g == generation表示當前還在同一個年齡分代中,!g.broker表示當前柵欄狀態沒有被破壞
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//上面的條件不滿足,說明:1)g!=generation,說明執行緒執行到這裡時已經換代了
//2)沒有換代,但是柵欄被破壞了
//無論哪種情況,都只是簡單地設定一下當前執行緒的中斷狀態
Thread.currentThread().interrupt();
}
}
//柵欄被破壞,丟擲異常
//注意,在breakBarrier方法中會喚醒所有等待條件的執行緒,這些執行緒會執行到這裡,判斷柵欄已經被破壞,都會丟擲異常
if (g.broken)
throw new BrokenBarrierException();
//距離上一次設定g變數的值已經過去很長時間了,在執行過程中generation可能已經發生改變,
//當前執行緒還是前幾代的,不需要再迴圈阻塞了,直接返回上一代剩餘需要等待的執行緒數
//注意:程式碼中breakBarrier方法和nextGeneration方法都會喚醒阻塞的執行緒,但是breakBarrier在上一個判斷就被攔截了,
//因此走到這裡的有三種情況:
//a)最後一個執行緒正常執行,柵欄開啟導致其他執行緒被喚醒;不屬於當前代的執行緒直接返回,
//屬於當前代的則可能因為沒到柵欄開放條件要繼續迴圈阻塞
//b)柵欄被重置(呼叫了reset方法),此時g!=negeration,全都直接返回
//c)執行緒等待超時了,不屬於當前代的返回就可以了,屬於當前代的則要設定generation.broken = true
if (g != generation)
return index;
//如果執行緒等待超時,標記柵欄為破壞狀態並丟擲異常,如果還沒超時,則自旋後又重新阻塞
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//別忘了解鎖
lock.unlock();
}
}
```
`dowait`的方法邏輯是:每一個呼叫`await`方法的執行緒都會將計數`count`減`1`,最後一個執行緒將`count`減為`0`時,順帶還要執行`barrierCommand`指定的任務,並將`generation`切換到下一代,當然,最重要的還是要喚醒之前在柵欄處阻塞的執行緒。由於`trip`對應的`Condition`物件沒有任何地方會修改,因此`trip.signalAll()`會喚醒所有在該條件上等待的執行緒,如果執行緒在等待的過程中,其他執行緒將`generation`更新到下一代,就會出現被喚醒的執行緒中有部分還屬於之前那一代的情況。
接下來將會對`dowait`用到的一些方法進行簡單介紹。
- `breakBarrier`
`dowait`方法有四個地方呼叫了`breakBarrier`,從名字可以看出,該方法會將`generation.broken`設定為`true`,除此之外,還會還原`count`的值,並且喚醒所有被阻塞的執行緒:
```
private void breakBarrier() {
generation.broken = true;
count = parties;
//喚醒所有的阻塞執行緒
trip.signalAll();
}
```
縱觀`CyclicBarrier`原始碼,`generation.broken`統一在`breakBarrier`方法中被設定為`true`,而一旦將`generation.broken`設定為`true`之後,程式碼中檢查到這個狀態之後都會丟擲異常,柵欄就沒辦法再使用了(可以手動呼叫`reset`進行重置),而原始碼中會在以下幾種情況呼叫`breakBarrier`方法:
1) 當前執行緒被中斷
2)通過構造器傳入的任務執行失敗
3) 條件等待時被中斷
4) 執行緒等待超時
5) 顯式呼叫`reset`方法
- `nextGeneration`
```
private void nextGeneration() {
// 喚醒所有的阻塞執行緒
trip.signalAll();
// 開啟下一代
count = parties;
generation = new Generation();
}
```
- `reset`
`reset`方法主要是結束這一代,並切換到下一代
```
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
```
介紹到這裡,整個`CyclicBarrier`已經差不多介紹完了,但是內部的流程遠遠沒有這麼簡單,因為很大一部分邏輯封裝在`AbstractQueuedSynchronizer`中,這個類定義了阻塞的執行緒如何加入等待佇列,又如何被喚醒,因此如果想要深入瞭解執行緒等待的邏輯,還需要仔細研究`AbstractQueuedSynchronizer`才行。本文不會對這部分內容進行介紹,後面有時間的話將會專門對其進行