CountDownLatch、ReentrantLock原始碼解析
1.AQS
因業務中在用多執行緒並行執行程式碼塊中會用到CountDownLatch來控制多執行緒之間任務是否完成的通知,最近突然想去看一下CountDownLatch在await及喚醒是如何實現的,便開始了閱讀原始碼及查閱資料,然後打開了一個新大門。發現它是基於AbstractQueuedSynchronizer(下文簡稱AQS)框架實現的。
所以我們先了解AQS是幹什麼的。它提供的功能可以概括為兩點:獲取資源,如果獲取失敗加入等待佇列並且休眠該執行緒;釋放資源,同時檢查是否符合喚醒等待佇列中執行緒的條件,如符合就喚醒執行緒繼續執行。
ReentrantLock,CountDownLatch,ReentrantReadWriteLock等都是基於這個類做的。
它提供了幾個方法讓子類進行復寫以實現各自的功能:
tryAcquire(int arg)//嘗試獲取獨佔資源
tryRelease(int arg)//嘗試釋放獨佔資源
isHeldExclusively()//判斷當前執行緒是否獲取獨佔資源
tryAcquireShared(int arg)//嘗試獲取共享資源
tryReleaseShared(int arg)//嘗試釋放共享資源
這5個方法可以分為兩類獨佔類介面(前3個)和共享類介面(後兩個),因為一個子類中一般只需要(也應該如此,ReentrantReadWriteLock同時需要獨佔和共享,但也是分成兩個類來實現的)實現其中一類方法簇,所以作者並沒有把他們寫成抽象方法,這樣對AQS的子類更友好。
這裡對AQS獨佔和共享概念解釋一下,這是對AQS的資源(也就是state欄位)的描述。即在滿足可以獲取資源的條件後,在佇列中的等待的執行緒是喚醒一個(一個執行緒獨佔)還是說等待的執行緒都可以喚醒(共享)。
對於AQS的state欄位,也就是執行緒搶奪的資源,不同的子類有不同的定義,標題中提到的CountDownLatch和ReentrantLock正好是對state有不同的概念,看到對這兩個類的分析,大家就自然清楚了。AQS內部實現了對資源的獲取,釋放邏輯,讓子類實現的主要是嘗試獲取和釋放資源的場景邏輯。
下面對AQS內部邏輯進行分析,不過其子類根本不用關注這些邏輯,這些方法主要實現了:獲取資源,獲取失敗後加入佇列並且讓執行緒阻塞,釋放資源滿足執行緒監視的條件後,從等待佇列中剔除並喚醒相應的執行緒。,所以在看CountDownLatch,ReentrantLock程式碼時可以不細看這個方法的實現,先了解每個方法的作用,搞清楚子類的邏輯後,可以在慢慢研究AQS內部控制。
1.1 獲取共享資源
//AQS原始碼
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
獲取資源的程式碼,首先呼叫tryAcquireShared 這個方法來判斷是否可以獲取到鎖(這個方法交友子類實現其判斷是否可以獲取的邏輯),如果可以獲取任何程式碼都不執行,如果不可以獲取就進入if執行doAcquireShared(arg)
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//1
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//2
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//3
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因為不能獲取到資源,所有執行緒要加入到等待佇列中並且執行緒進入阻塞狀態。這裡要說一下AQS的等待佇列是由一個雙向連結串列實現的,節點Node有前後節點,當前執行緒,等待狀態值這幾個欄位組成。所以1處就是在連結串列中加入一個共享模式的節點。這個方法我就不寫了,裡面用了CAS+自旋的無鎖的方式 確保在多執行緒下可以正確插入。
在2處判斷當前節點是不是當前佇列中第一個節點(head頭節點就是一個空節點,head指向的下一個節點,才是等待佇列中的第一個節點執行緒),如果是第一個,在次檢測一下是否可以獲取到鎖,如果可以獲取鎖了,該執行緒就直接從佇列裡剔除,繼續執行了(可能有人問之前不是嘗試獲取過一次嗎,這裡幹嘛還在嘗試,我認為是為了減少執行緒無價值的狀態變更吧),如果依舊不能獲取資源,進行3處,shouldParkAfterFailedAcquire這個方法是判斷該執行緒該不該進行休眠
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//2
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
可以看到通過LockSupport.park把執行緒置成阻塞狀態。
1.2 釋放共享資源
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {//1
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可以看到1處是一個死迴圈,如果當前節點釋放後,會一直便利直到佇列為空或者進入不可喚醒狀態
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
釋放共享節點AQS從佇列中剔除並且uppark該執行緒。
2.CountDownLatch實現
CountDownLatch的核心就是實現AQS的Sync內部類
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {//1
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {//2
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
可以看到1,2處實現了tryAcquireShared和tryReleaseShared方法,結合上面AQS原始碼分析應該很容易明白了。
public CountDownLatch(int count) {//CDL類構造方法
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
可以看到await方法呼叫的AQS的方法,該方法 if (tryAcquireShared(arg) < 0) 執行嘗試呼叫的方法,這個由Sync實現,其實現就判斷當前state是否等於0,等於說明coutnDown呼叫初始化的次數,不用阻塞。countDown也就會對state-1在Sync使用CAS進行了處理。
3.ReentrantLock
可重入鎖提供了公平與非公平鎖兩種模式,他們的唯一區別就是在搶佔鎖的順序,所以在內部實現的公平與非公平也是在lock()方法有所區別,公平模式下,有一個FIFO的佇列進行排序。
這裡以非公平鎖為列
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
這裡state為0表示當前沒有執行緒持有該鎖,>0表示有執行緒持有該鎖,因為是可重入,所以當前執行緒lock一次,state就會+1,如下程式碼1處所示
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;//1
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可重入鎖使用的是AQS的獨佔模式,在細節上有些不同,AQS也是有兩套方法,整個邏輯是一致的,不同點
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在喚醒等待程序這塊,只會喚醒一次AQS佇列的第一個節點。在共享方法裡是一個迴圈。