CountDownLatch原始碼探究 (JDK 1.8)
CountDownLatch
能夠實現讓執行緒等待某個計數器倒數到零的功能,之前對它的瞭解也僅僅是簡單的使用,對於其內部如何實現執行緒等待卻不是很瞭解,最好的辦法就是通過看原始碼來了解底層的實現細節。CountDownLatch
的原始碼並不是很複雜,因為其核心的功能是依賴AbstractQueuedSynchronizer
(下文簡稱AQS
)來實現的。CountDownLatch
常用的方法很少,但是因為涉及到AQS
,邏輯有些繞,要理清中間的邏輯稍微要費一些時間。
1.內部類Sync
CountDownLatch
的核心功能是通過內部類Sync
實現的,這個類繼承了AQS
:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //構造器,根據傳入的整數初始化狀態欄位state Sync(int count) { setState(count); } int getCount() { return getState(); } //tryAcquireShared唯一的作用是檢視狀態欄位是不是等於0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero //自旋,在兩種條件下會退出自旋:a)state欄位已經為0;b)執行緒成功地將state欄位減1 for (;;) { int c = getState(); //如果state已經為0,就返回false if (c == 0) return false; int nextc = c-1; //從下面的語句可以看到,只有當state=0才會返回true if (compareAndSetState(c, nextc)) return nextc == 0; } } }
2.構造器
CountDownLatch
只有一個構造器,在構造器中會初始化sync
欄位,結合Sync
類的定義可知,構造器的唯一工作是將state
欄位初始化為傳入的引數:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
3.節點狀態waitStatus
等待的執行緒會構造成節點放在等待佇列中,節點的狀態waitStatus
有如下幾種:
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
注意,在CountDownLatch
中並沒有用到CONDITION
狀態,因此後文將會直接忽略該狀態,當waitStatus > 0
時,指的就是CANCELLED
狀態。
4.核心方法
await()
當計數器沒不等於0
時,await()
方法會讓當前執行緒掛起,該方法呼叫了AQS
類的acquireSharedInterruptibly
方法,如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//顯然,tryAcquireShared方法只有在state=0時才返回1,表示計數器已歸零,此時方法直接返回,被阻塞的執行緒就可以繼續執行
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
通常,呼叫await()
的執行緒在執行到acquireSharedInterruptibly
方法時,計數器並不為0
,那麼當前執行緒就需要執行doAcquireSharedInterruptibly
方法中的阻塞邏輯了。由於該方法內部呼叫了三個主要方法:addWaiter
、shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
,在解析的過程中難免會穿插對這些方法的介紹,從而引入跳躍性。為了避免跳躍性引發的閱讀和理解上的困難,這裡準備先介紹addWaiter
方法。
addWaiter
private Node addWaiter(Node mode) {
//將當前執行緒構造成一個Node節點
Node node = new Node(Thread.currentThread(), mode);
//獲取尾節點
Node pred = tail;
//尾節點不為空,說明佇列已完成初始化
if (pred != null) {
//將node節點放到對尾,這裡的做法是先將node的prev指標指向尾節點,然後通過原子操作將新新增的node更新成尾節點,成功的話addWaiter方法結束
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//原子操作成功的話,更新原尾節點的next指標
pred.next = node;
return node;
}
}
//執行到這裡有兩種情況:1)尾節點為空,即佇列還沒初始化;2)佇列已初始化,但是上文將node節點設定成尾節點失敗,此時node節點還沒有真正新增進佇列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果佇列還沒初始化,則先初始化,做法是將一個空節點作為頭結點,然後讓尾節點也指向這個空節點
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//這裡會一直自旋,直到成功地將node節點更新成尾節點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter
方法的主要作用就是將當前執行緒新增到等待佇列的隊尾,如果佇列還沒初始化,則先初始化,enq
方法使用自旋避免入隊失敗。
doAcquireSharedInterruptibly
接下來正式開始介紹doAcquireSharedInterruptibly
方法,原始碼如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//將當前執行緒新增到等待佇列,注意引數是Node.SHARED,下文會用到
final Node node = addWaiter(Node.SHARED);
//該欄位在state=0時才會被設定為false
boolean failed = true;
try {
//又是自旋,該自旋的終止條件有兩種:1)state=0,計數器正常結束,執行return語句返回;2)執行緒響應中斷異常,跳出自旋
for (;;) {
//獲取node的前驅節點
final Node p = node.predecessor();
//如果前驅節點是頭結點,則執行if程式碼塊的邏輯
if (p == head) {
//獲取state欄位的狀態,如果state=0則返回1,否則返回-1
int r = tryAcquireShared(arg);
//r>=0,說明計數器結束了,需要喚醒阻塞的執行緒
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
//計數器正常結束時,會將failed設定為false,避免執行finally中的語句
failed = false;
return;
}
}
//執行到這裡說明state!=0,真正的阻塞邏輯在parkAndCheckInterrupt方法裡
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果執行緒被中斷,那麼failed=true,執行cancelAcquire方法
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly
先通過addWaiter
方法將當前執行緒新增到等待佇列尾部,然後開始自旋。如果state
欄位不為0
,那麼會執行到末尾的條件語句:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
先來看看shouldParkAfterFailedAcquire
幹了些什麼:
//注意pred是node的前驅節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果已經是SIGNAL狀態,則之間返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//ws>0只能是cancelled狀態,此時通過修改指標將這些cancelled的節點從佇列刪除
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驅節點的狀態既不是SIGNAL,也不是CANCELLED,那麼只可能是0或者PROPAGATE,就把前驅節點的狀態更新為 Node.SIGNAL。注意:1)CONDITION狀態在CountDownLatch中並沒有用到;2)節點新建的時候狀態都是0,是在這裡才被修改成了SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
之前對節點的SIGNAL
狀態是怎麼來的一直有點迷糊,看了上面的程式碼才發現是在最後一個else
分支中設定的。從shouldParkAfterFailedAcquire
原始碼瞭解到,該方法只有在前驅節點狀態是SIGNAL
時才返回true
,此時才有機會執行parkAndCheckInterrupt
方法。parkAndCheckInterrupt
是真正讓執行緒掛起的地方,來看看其原始碼:
private final boolean parkAndCheckInterrupt() {
//執行緒最終會阻塞在這裡,執行緒恢復之後也將從這裡繼續執行
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt
方法藉助LockSupport
實現執行緒阻塞,被阻塞的執行緒在被喚醒後會返回當前執行緒的中斷狀態(注意Thread.interrupted()
會清除執行緒的中斷狀態)。好了,到這裡整個邏輯就比較清楚了,如果執行緒是正常被喚醒(即state=0
),那麼parkAndCheckInterrupt
返回false
,doAcquireSharedInterruptibly
方法會接著自旋一次,這裡再次將自旋程式碼貼出:
for (;;) {
//獲取node的前驅節點
final Node p = node.predecessor();
//如果前驅節點是頭結點,則執行if程式碼塊的邏輯
if (p == head) {
//獲取state欄位的狀態,如果state=0則返回1,否則返回-1
int r = tryAcquireShared(arg);
//r>=0,說明計數器結束了,需要喚醒阻塞的執行緒
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//執行到這裡說明state!=0,真正的阻塞邏輯在parkAndCheckInterrupt方法裡
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
那麼setHeadAndPropagate
方法做了些什麼事呢,看看它的原始碼(刪掉了原始碼中的註釋):
//回憶一下,顯然propagate=1,node是當前插入到對尾的新節點
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//把node設定為頭結點
setHead(node);
//此時propagate > 0的條件已經滿足,直接執行if程式碼塊的邏輯
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果沒有下一個節點,或者下一個節點的isShared返回true,就釋放。還記得嗎,在構造新節點的時候addWaiter的引數是Node.SHARED,這裡就是判斷這個欄位
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
接下來看一下doReleaseShared
是如何實現的:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
//獲取頭結點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果頭結點的狀態是SIGNAL,那麼會將其狀態修改為0,該步驟一直自旋直到成功為止
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//成功修改頭結點的狀態後,會執行下面這個方法
unparkSuccessor(h);
}
//如果頭結點狀態已經改成0了,就再次將其狀態更新為Node.PROPAGATE,目的是???
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
頭結點的狀態成功更新為0
後,會執行unparkSuccessor
方法的邏輯,該方法原始碼如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//獲取後繼節點
Node s = node.next;
//如果沒有後繼節點,或者後繼節點是CANCELLED狀態,則執行下面的程式碼塊
if (s == null || s.waitStatus > 0) {
s = null;
//從佇列末尾向開頭遍歷,找到靠近頭結點的第一個不為CANCELLED狀態的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到這樣的非CANCELLED節點,就將其喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor
的主要工作是將頭結點後面第一個非CANCELLED
狀態的節點所對應的執行緒喚醒。
cancelAcquire
到目前為止,並沒有發現CANCELLED
狀態是在哪裡設定,因為還有一個方法沒有分析。doAcquireSharedInterruptibly
中的finally
語句塊會處理執行緒被中斷的情況,執行的是cancelAcquire
方法的邏輯,其原始碼如下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//執行緒中斷後,將其對應的節點中儲存的執行緒清空
node.thread = null;
// Skip cancelled predecessors
//從佇列中刪除狀態為CANCELLED的節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//CANCELLED狀態在這裡設定
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果當前是尾節點,其第一個非CANCELLED狀態的前驅節點設定為新的尾節點,pred後面的節點將會被GC回收。注意,下面的兩個原子操作,不管是否成功,都沒有重試
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
//當前執行緒對應的節點不是尾節點,其有後繼節點並且後繼節點不是CANCELLED狀態,通過修改指標將當前執行緒節點從佇列刪除
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//根據前面的if條件,在以下幾種情況時會執行到這裡,喚醒node節點的後繼節點
//1)pred=head,即當前被中斷的執行緒前面的所有執行緒都是CANCELLED狀態
//2)pred!=head,但是pred節點的狀態不等於SIGNAL,且將pred節點的狀態修改為SIGNAL失敗
//3)pred節點記錄的執行緒是null,目前已知頭結點的thread欄位確實為null,除此之外還有其他情況嗎???
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
分析到這裡,才剛把await()
的邏輯分析完,但是僅僅分析程式碼仍然是不夠的,因為本人分析到這裡的時候,腦袋仍然是蒙的,主要原因是缺少一個全域性的認識。程式碼放在這裡都能看懂,但是程式碼為什麼這樣寫?當計數器結束(即state=0
)時,佇列中的等待執行緒是一起全部換新,還是一個一個依次喚醒?執行緒被喚醒後重新執行doAcquireSharedInterruptibly
中的自旋時,和第一次執行到底有哪些地方不一樣呢?因此,有必要對以上的邏輯進行整體梳理。
看完這部分原始碼之後,發現核心的邏輯都包含在doAcquireSharedInterruptibly
中,現在是時候回過頭來整理一下該方法的邏輯了。
假設有現在有一個執行緒t1
執行了await
方法,由於等待佇列還沒初始化,因此先構造一個空的頭節點,並且把t1
構造成節點加到佇列中,如下圖:
接著,在shouldParkAfterFailedAcquire
方法中修改頭結點的狀態:
現在又有新的t2
執行緒執行了await
,此時佇列的結構將更新為下圖:
即每新增一個節點到等待隊尾,就將其前驅節點的狀態更新為Node.SIGNAL
(即-1
),然後所有的執行緒都阻塞在parkAndCheckInterrupt
方法裡。現在,計數器已經結束,最後一個執行countDown
方法的執行緒順帶執行了doReleaseShared
方法,將頭結點的waitStatus
更新成了0
,如下圖:
繼續向下執行到unparkSuccessor
方法,喚醒執行緒t1
,t1
從parkAndCheckInterrupt
方法中醒來,繼續自旋。t1
的前置節點就是頭結點head
,且state=0
,t1
開始執行setHeadAndPropagate
,將自己設定為頭結點,並在setHead
方法中將thread
和prev
欄位都設定為空,如下圖:
執行緒t1
接著執行doReleaseShared
方法,把頭節點(此時t1
就是頭結點)狀態更新為0
,並喚醒t2
,開始執行await
之後的邏輯,如下圖:
喚醒t2
後,t1
退出await
方法,此時佇列如下:
t2
開始執行後,同樣把自己設定為頭結點,如下:
在執行setHeadAndPropagate
方法時,t2
沒有後繼節點了,仍然會執行doReleaseShared
方法,但是在doReleaseShared
方法中,t2
即使頭結點也是尾節點,那就什麼也不做,直接結束並退出await
方法,此時佇列裡就只剩下一個頭結點了。
countDown
現在,終於可以開始看看countDown
方法的邏輯了:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//之前分析過,該方法會將state的值減1,並且只有在減1後state=0才會返回true,表示計數器結束了
if (tryReleaseShared(arg)) {
//喚醒後繼節點中第一個不為CANCELLED狀態的節點
doReleaseShared();
return true;
}
return false;
}
當一個執行緒將state
修改成0
時,順便還要執行doReleaseShared
方法,這個方法會將頭結點的後繼節點喚醒。
有一個小細節需要注意,doReleaseShared
方法在原始碼中有兩個地方呼叫,一個入口就是剛講的countDown
方法,另一個就是從await
方法進入,在setHeadAndPropagate
中呼叫,但是二者是有先後順序的是,是countDown
方法喚醒最前面的執行緒之後,再由該執行緒依次喚醒後面的執行緒