一行一行原始碼分析清楚AbstractQueuedSynchronizer(三)
一、前言
這是AQS系列的最後一篇了,第一篇我們通過ReetrantLock公平鎖分析了AQS的核心,第二篇講了非公平鎖和Condition,還寫一些執行緒中斷的知識。在這一篇我們將講解AQS共享模式的使用。有了前面的知識,相信這篇你會感覺很輕鬆。
本文先用 CountDownLatch 將共享模式說清楚,然後順著把其他 AQS 相關的類 CyclicBarrier、Semaphore 的原始碼一起過一下。
二、CountDownLatch
CountDownLatch 這個類是比較典型的 AQS 的共享模式的使用,這是一個高頻使用的類。latch 的中文意思是門栓、柵欄,具體怎麼解釋我就不廢話了,大家隨意,看兩個例子就知道在哪裡用、怎麼用了。
我們看下 Doug Lea 在 java doc 中給出的例子,這個例子非常實用,我經常會寫到這個程式碼。
假設我們有 N ( N > 0 ) 個任務,那麼我們會用 N 來初始化一個 CountDownLatch,然後將這個 latch 的引用傳遞到各個執行緒中,在每個執行緒完成了任務後,呼叫 latch.countDown() 代表完成了一個任務。呼叫 latch.await() 的方法的執行緒會阻塞,直到所有的任務完成。
class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = Executors.newFixedThreadPool(8); // 建立 N 個任務,提交給執行緒池來執行 for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); // 等待所有的任務完成,這個方法才會返回 doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); // 這個執行緒的任務完成了,呼叫 countDown 方法 doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
所以說 CountDownLatch 非常實用,我們常常會將一個比較大的任務進行拆分,然後開啟多個執行緒來執行,等所有執行緒都執行完了以後,再往下執行其他操作。這裡例子中,只有 main 執行緒呼叫了 await 方法。
我們再來看另一個例子,這個例子很典型,用了兩個 CountDownLatch:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); // 這邊插入一些程式碼,確保上面的每個執行緒先啟動起來,才執行下面的程式碼。 doSomethingElse(); // don't let run yet // 因為這裡 N == 1,所以,只要呼叫一次,那麼所有的 await 方法都可以通過 startSignal.countDown(); // let all threads proceed doSomethingElse(); // 等待所有任務結束 doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { // 為了讓所有執行緒同時開始任務,我們讓所有執行緒先阻塞在這裡 // 等大家都準備好了,再開啟這個門栓 startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
這個例子中,doneSignal 同第一個例子的使用,我們說說這裡的 startSignal。N 個新開啟的執行緒都呼叫了startSignal.await() 進行阻塞等待,它們阻塞在柵欄上,只有當條件滿足的時候(startSignal.countDown()),它們才能同時通過這個柵欄,目的是讓所有的執行緒站在一個起跑線上。
如果始終只有一個執行緒呼叫 await 方法等待任務完成,那麼 CountDownLatch 就會簡單很多,所以之後的原始碼分析讀者一定要在腦海中構建出這麼一個場景:有 m 個執行緒是做任務的,有 n 個執行緒在某個柵欄上等待這 m 個執行緒做完任務,直到所有 m 個任務完成後,n 個執行緒同時通過柵欄。
我們用以下程式來分析原始碼,t1 和 t2 負責呼叫 countDown() 方法,t3 和 t4 呼叫 await 方法阻塞:
import java.util.concurrent.CountDownLatch;
/**
* @author chenbjf
* @version 1.0
* @date 2021/10/28 13:42
*/
public class CountDownLatchExample {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
});
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
System.out.println("thread3 從await中醒過來");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
});
Thread thread4 = new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
System.out.println("thread4 從await中醒過來");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
});
thread1.start();
thread2.start();
thread3.start();
thread4.start();
}
}
上述程式經過10秒左右會返回結果如下(順序可能不是下面的樣子):
thread3 從await中醒過來
thread4 從await中醒過來
接下來先看一下CountDownLatch的建構函式,如下:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 老套路了,內部封裝一個 Sync 類繼承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//AQS中的state屬性設定為入參的count值。
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
程式碼都是套路,先分析套路:AQS 裡面的 state 是一個整數值,這邊用一個 int count 引數其實初始化就是設定了這個值,所有呼叫了 await 方法的等待執行緒會掛起,然後有其他一些執行緒會做 state = state - 1 操作,當 state 減到 0 的同時,那個將 state 減為 0 的執行緒會負責喚醒 所有呼叫了 await 方法的執行緒。都是套路啊,只是 Doug Lea 的套路很深,程式碼很巧妙,不然我們也沒有要分析原始碼的必要。
對於 CountDownLatch,我們僅僅需要關心兩個方法,一個是 countDown() 方法,另一個是 await() 方法。countDown() 方法每次呼叫都會將 state 減 1,直到 state 的值為 0;而 await 是一個阻塞方法,當 state 減為 0 的時候,await 方法才會返回。await 可以被多個執行緒呼叫,讀者這個時候腦子裡要有個圖:所有呼叫了 await 方法的執行緒阻塞在 AQS 的阻塞佇列中,等待條件滿足(state == 0),將執行緒從佇列中一個個喚醒過來。
接下來,我們按照流程一步一步走:先 await 等待,然後被喚醒,await 方法返回。首先,我們來看 await() 方法,它代表執行緒阻塞,等待 state 的值減為 0。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 這也是老套路了,我在第二篇的中斷那一節說過了
if (Thread.interrupted())
throw new InterruptedException();
// t3 和 t4 呼叫 await 的時候,state 都大於 0(state 此時為 2)。
// 也就是說,這個 if 返回 true,然後往裡看
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//通過名字可以知道這個方法是獲取共享鎖,並且此方法是可中斷的(中斷的時候丟擲 InterruptedException 退出這個方法)。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//先將執行緒新增到阻塞佇列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 同上,只要 state 不等於 0,那麼這個方法返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//接著走到這裡,現在park在這裡
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我們來仔細分析這個方法,執行緒 t3 經過第 1 步 addWaiter 入隊以後,我們應該可以得到這個:
由於 tryAcquireShared 這個方法會返回 -1,所以 if (r >= 0) 這個分支不會進去。到 shouldParkAfterFailedAcquire 的時候,t3 將 head 的 waitStatus 值設定為 -1,如下:
然後進入到 parkAndCheckInterrupt 的時候,t3 掛起。我們再分析 t4 入隊,t4 會將前驅節點 t3 所在節點的 waitStatus 設定為 -1,t4 入隊後,應該是這樣的:
然後,t4 也掛起。接下來,t3 和 t4 就等待喚醒了。接下來,我們來看喚醒的流程。為了讓下面的示意圖更豐富些,我們假設用 10 初始化 CountDownLatch。
我們再一步步看具體的流程。首先,我們看 countDown() 方法:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 只有當 state 減為 0 的時候,tryReleaseShared 才返回 true
// 否則只是簡單的 state = state - 1 那麼 countDown() 方法就結束了
// 將 state 減到 0 的那個操作才是最複雜的,繼續往下吧
if (tryReleaseShared(arg)) {
// 喚醒 await 的執行緒
doReleaseShared();
return true;
}
return false;
}
// 這個方法很簡單,用自旋的方法實現 state 減 1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
//countDown 方法就是每次呼叫都將 state 值減 1,如果 state 減到 0 了,那麼就呼叫下面的方法進行喚醒阻塞佇列中的執行緒:
// 呼叫這個方法的時候,state == 0
// 這個方法先不要看所有的程式碼,按照思路往下到我寫註釋的地方,我們先跑通一個流程,其他的之後還會仔細分析
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// t3 入隊的時候,已經將頭節點的 waitStatus 設定為 Node.SIGNAL(-1) 了
if (ws == Node.SIGNAL) {
// 將 head 的 waitStatue 設定為 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是這裡,喚醒 head 的後繼節點,也就是阻塞佇列中的第一個節點
// 在這裡,也就是喚醒 t3
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
一旦 t3 被喚醒後,我們繼續回到 await 的這段程式碼,parkAndCheckInterrupt 返回,我們先不考慮中斷的情況:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 2. 這裡是下一步
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 1. 喚醒後這個方法返回
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//接下來,t3 會進到 setHeadAndPropagate(node, r) 這個方法,先把 head 給佔了,然後喚醒佇列中其他的執行緒:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 下面說的是,喚醒當前 node 之後的節點,即 t3 已經醒了,馬上喚醒 t4
// 類似的,如果 t4 後面還有 t5,那麼 t4 醒了以後,馬上將 t5 給喚醒了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 又是這個方法,只是現在的 head 已經不是原來的空節點了,是 t3 的節點了
doReleaseShared();
}
}
又回到這個方法了,那麼接下來,我們好好分析 doReleaseShared 這個方法,我們根據流程,頭節點 head 此時是 t3 節點了:
// 呼叫這個方法的時候,state == 0
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1. h == null: 說明阻塞佇列為空
// 2. h == tail: 說明頭結點可能是剛剛初始化的頭節點,
// 或者是普通執行緒節點,但是此節點既然是頭節點了,那麼代表已經被喚醒了,阻塞佇列沒有其他節點了
// 所以這兩種情況不需要進行喚醒後繼節點
if (h != null && h != tail) {
int ws = h.waitStatus;
// t4 將頭節點(此時是 t3)的 waitStatus 設定為 Node.SIGNAL(-1) 了
if (ws == Node.SIGNAL) {
// 這裡 CAS 失敗的場景請看下面的解讀
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是這裡,喚醒 head 的後繼節點,也就是阻塞佇列中的第一個節點
// 在這裡,也就是喚醒 t4
unparkSuccessor(h);
}
else if (ws == 0 &&
// 這個 CAS 失敗的場景是:執行到這裡的時候,剛好有一個節點入隊,入隊會將這個 ws 設定為 -1
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果到這裡的時候,前面喚醒的執行緒已經佔領了 head,那麼再迴圈
// 否則,就是 head 沒變,那麼退出迴圈,
// 退出迴圈是不是意味著阻塞佇列中的其他節點就不喚醒了?當然不是,喚醒的執行緒之後還是會呼叫這個方法的
if (h == head) // loop if head changed
break;
}
}
我們分析下最後一個 if 語句,然後才能解釋第一個 CAS 為什麼可能會失敗:
- h == head:說明頭節點還沒有被剛剛用 unparkSuccessor 喚醒的執行緒(這裡可以理解為 t4)佔有,此時 break 退出迴圈。
- h != head:頭節點被剛剛喚醒的執行緒(這裡可以理解為 t4)佔有,那麼這裡重新進入下一輪迴圈,喚醒下一個節點(這裡是 t4 )。我們知道,等到 t4 被喚醒後,其實是會主動喚醒 t5、t6、t7...,那為什麼這裡要進行下一個迴圈來喚醒 t5 呢?我覺得是出於吞吐量的考慮。
滿足上面的 2 的場景,那麼我們就能知道為什麼上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 會失敗了?因為當前進行 for 迴圈的執行緒到這裡的時候,可能剛剛喚醒的執行緒 t4 也剛剛好到這裡了,那麼就有可能 CAS 失敗了。for 迴圈第一輪的時候會喚醒 t4,t4 醒後會將自己設定為頭節點,如果在 t4 設定頭節點後,for 迴圈才跑到 if (h == head),那麼此時會返回 false,for 迴圈會進入下一輪。t4 喚醒後也會進入到這個方法裡面,那麼 for 迴圈第二輪和 t4 就有可能在這個 CAS 相遇,那麼就只會有一個成功了。
三、CyclicBarrier
字面意思是“可重複使用的柵欄”或“週期性的柵欄”,總之不是用了一次就沒用了的,CyclicBarrier 相比 CountDownLatch 來說,要簡單很多,其原始碼沒有什麼高深的地方,它是 ReentrantLock 和 Condition 的組合使用。看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重複使用(Cyclic)。
首先,CyclicBarrier 的原始碼實現和 CountDownLatch 大相徑庭,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現。
因為 CyclicBarrier 的原始碼相對來說簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那麼這裡的原始碼是毫無壓力的,就是幾個特殊概念罷了。
先用一張圖來描繪下 CyclicBarrier 裡面的一些概念,和它的基本使用流程:
看圖我們也知道了,CyclicBarrier 的原始碼最重要的就是 await() 方法了。
public class CyclicBarrier {
// 我們說了,CyclicBarrier 是可以重複使用的,我們把每次從開始使用到穿過柵欄當做"一代",或者"一個週期"
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
// CyclicBarrier 是基於 Condition 的
// Condition 是“條件”的意思,CyclicBarrier 的等待執行緒通過 barrier 的“條件”是大家都到了柵欄上
private final Condition trip = lock.newCondition();
// 參與的執行緒數
private final int parties;
// 如果設定了這個,代表越過柵欄之前,要執行相應的操作
private final Runnable barrierCommand;
// 當前所處的“代”
private Generation generation = new Generation();
// 還沒有到柵欄的執行緒數,這個值初始為 parties,然後遞減
// 還沒有到柵欄的執行緒數 = parties - 已經到柵欄的數量
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
首先,先看怎麼開啟新的一代:
// 開啟新的一代,當最後一個執行緒到達柵欄上的時候,呼叫這個方法來喚醒其他執行緒,同時初始化“下一代”
private void nextGeneration() {
// 首先,需要喚醒所有的在柵欄上等待的執行緒
trip.signalAll();
// 更新 count 的值
count = parties;
// 重新生成“新一代”
generation = new Generation();
}
開啟新的一代,類似於重新例項化一個 CyclicBarrier 例項,看看怎麼打破一個柵欄:
private void breakBarrier() {
// 設定狀態 broken 為 true
generation.broken = true;
// 重置 count 為初始值 parties
count = parties;
// 喚醒所有已經在等待的執行緒
trip.signalAll();
}
這兩個方法之後用得到,現在開始分析最重要的等待通過柵欄方法 await 方法:
/ 不帶超時機制
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 帶超時機制,如果超時丟擲 TimeoutException 異常
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
繼續往裡看:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 先要獲取到鎖,然後在 finally 中要記得釋放鎖
// 如果記得 Condition 部分的話,我們知道 condition 的 await() 會釋放鎖,被 signal() 喚醒的時候需要重新獲取鎖
lock.lock();
try {
final Generation g = generation;
// 檢查柵欄是否被打破,如果被打破,丟擲 BrokenBarrierException 異常
if (g.broken)
throw new BrokenBarrierException();
// 檢查中斷狀態,如果中斷了,丟擲 InterruptedException 異常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// index 是這個 await 方法的返回值
// 注意到這裡,這個是從 count 遞減後得到的值
int index = --count;
// 如果等於 0,說明所有的執行緒都到柵欄上了,準備通過
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果在初始化的時候,指定了通過柵欄前需要執行的操作,在這裡會得到執行
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 如果 ranAction 為 true,說明執行 command.run() 的時候,沒有發生異常退出的情況
ranAction = true;
// 喚醒等待的執行緒,然後開啟新的一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 進到這裡,說明執行指定操作的時候,發生了異常,那麼需要打破柵欄
// 之前我們說了,打破柵欄意味著喚醒所有等待的執行緒,設定 broken 為 true,重置 count 為 parties
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 如果是最後一個執行緒呼叫 await,那麼上面就返回了
// 下面的操作是給那些不是最後一個到達柵欄的執行緒執行的
for (;;) {
try {
// 如果帶有超時機制,呼叫帶超時的 Condition 的 await 方法等待,直到最後一個執行緒呼叫 await
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果到這裡,說明等待的執行緒在 await(是 Condition 的 await)的時候被中斷
if (g == generation && ! g.broken) {
// 打破柵欄
breakBarrier();
// 打破柵欄後,重新丟擲這個 InterruptedException 異常給外層呼叫的方法
throw ie;
} else {
// 到這裡,說明 g != generation, 說明新的一代已經產生,即最後一個執行緒 await 執行完成,
// 那麼此時沒有必要再丟擲 InterruptedException 異常,記錄下來這個中斷資訊即可
// 或者是柵欄已經被打破了,那麼也不應該丟擲 InterruptedException 異常,
// 而是之後丟擲 BrokenBarrierException 異常
Thread.currentThread().interrupt();
}
}
// 喚醒後,檢查柵欄是否是“破的”
if (g.broken)
throw new BrokenBarrierException();
// 這個 for 迴圈除了異常,就是要從這裡退出了
// 我們要清楚,最後一個執行緒在執行完指定任務(如果有的話),會呼叫 nextGeneration 來開啟一個新的代
// 然後釋放掉鎖,其他執行緒從 Condition 的 await 方法中得到鎖並返回,然後到這裡的時候,其實就會滿足 g != generation 的
// 那什麼時候不滿足呢?barrierCommand 執行過程中丟擲了異常,那麼會執行打破柵欄操作,
// 設定 broken 為true,然後喚醒這些執行緒。這些執行緒會從上面的 if (g.broken) 這個分支拋 BrokenBarrierException 異常返回
// 當然,還有最後一種可能,那就是 await 超時,此種情況不會從上面的 if 分支異常返回,也不會從這裡返回,會執行後面的程式碼
if (g != generation)
return index;
// 如果醒來發現超時了,打破柵欄,丟擲異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
好了,我想我應該講清楚了吧,我好像幾乎沒有漏掉任何一行程式碼吧?
下面開始收尾工作。
首先,我們看看怎麼得到有多少個執行緒到了柵欄上,處於等待狀態:
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
判斷一個柵欄是否被打破了,這個很簡單,直接看 broken 的值即可:
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
前面我們在說 await 的時候也幾乎說清楚了,什麼時候柵欄會被打破,總結如下:
- 中斷,我們說了,如果某個等待的執行緒發生了中斷,那麼會打破柵欄,同時丟擲 InterruptedException 異常;
- 超時,打破柵欄,同時丟擲 TimeoutException 異常;
- 指定執行的操作丟擲了異常,這個我們前面也說過。
最後,我們來看看怎麼重置一個柵欄:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
我們設想一下,如果初始化時,指定了執行緒 parties = 4,前面有 3 個執行緒呼叫了 await 等待,在第 4 個執行緒呼叫 await 之前,我們呼叫 reset 方法,那麼會發生什麼?
首先,打破柵欄,那意味著所有等待的執行緒(3個等待的執行緒)會喚醒,await 方法會通過丟擲 BrokenBarrierException 異常返回。然後開啟新的一代,重置了 count 和 generation,相當於一切歸零了。
怎麼樣,CyclicBarrier 原始碼很簡單吧。
四、Semaphore
有了 CountDownLatch 的基礎後,分析 Semaphore 會簡單很多。Semaphore 是什麼呢?它類似一個資源池(讀者可以類比執行緒池),每個執行緒需要呼叫 acquire() 方法獲取資源,然後才能執行,執行完後,需要 release 資源,讓給其他的執行緒用。
大概大家也可以猜到,Semaphore 其實也是 AQS 中共享鎖的使用,因為每個執行緒共享一個池嘛。
套路解讀:建立 Semaphore 例項的時候,需要一個引數 permits,這個基本上可以確定是設定給 AQS 的 state 的,然後每個執行緒呼叫 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他執行緒 release。
建構函式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
這裡和 ReentrantLock 類似,用了公平策略和非公平策略。看 acquire 方法:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
這幾個方法也是老套路了,大家基本都懂了吧,這邊多了兩個可以傳參的 acquire 方法,不過大家也都懂的吧,如果我們需要一次獲取超過一個的資源,會用得著這個的。
我們接下來看不丟擲 InterruptedException 異常的 acquireUninterruptibly() 方法吧:
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
前面說了,Semaphore 分公平策略和非公平策略,我們對比一下兩個 tryAcquireShared 方法:
// 公平策略:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 區別就在於是不是會先判斷是否有執行緒在排隊,然後才進行 CAS 減操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 非公平策略:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
也是老套路了,所以從原始碼分析角度的話,我們其實不太需要關心是不是公平策略還是非公平策略,它們的區別往往就那麼一兩行。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
由於 tryAcquireShared(arg) 返回小於 0 的時候,說明 state 已經小於 0 了(沒資源了),此時 acquire 不能立馬拿到資源,需要進入到阻塞佇列等待,雖然貼了很多程式碼,不在乎多這點了:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個方法我就不介紹了,執行緒掛起後等待有資源被 release 出來。接下來,我們就要看 release 的方法了:
// 任務介紹,釋放一個資源
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 溢位,當然,我們一般也不會用這麼大的數
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared 方法總是會返回 true,然後是 doReleaseShared,這個也是我們熟悉的方法了,我就貼下程式碼,不分析了,這個方法用於喚醒所有的等待執行緒:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
五、總結
寫到這裡,終於把 AbstractQueuedSynchronizer 基本上說完了,對於 Java 併發,Doug Lea 真的是神一樣的存在。日後我們還會接觸到很多 Doug Lea 的程式碼,希望我們大家都可以朝著大神的方向不斷打磨自己的技術,少一些高大上的架構,多一些實實在在的優秀程式碼吧。