圖解java.util.concurrent原始碼(三) Reentrantlock && Semaphore
引言
Reentrantlock和Semaphore分別是AQS在獨佔模式和共享模式下經典的實現,在理解AQS的情況下看這兩個類的程式碼會感到非常簡單,如果還沒理解AQS的話,建議先讀我這個系列的第一篇文章
複習AQS
回憶一下AQS,AQS中維護了一個state同步狀態,它的子類只需要實現以下幾個方法,並在方法中修改判斷state的值即可:
獨佔模式的同步器(比如Reentrantlock)需要實現:
- tryAcquire
- tryRelease
共享模式的同步器(比如Semaphore)則需要實現:
- tryAcquireShared
- tryReleaseShared
如果想要進一步使用AQS的ConditionObject進行執行緒間同步的話,則子類還應該實現下面的方法:
- isHeldExclusively
下面就中點分析這幾個方法。
Reentrantlock
開啟ReentrantLock最常用的三個方法看看(分別是lock,unlock和newCondition),果然全部委託給了叫做sync的內部類物件:
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
而Sync內部類其實就是AQS的實現:
abstract static class Sync extends AbstractQueuedSynchronizer
而Reentrantlock中還有兩個Sync的子類內部類:
static final class NonfairSync extends Sync
static final class FairSync extends Sync
在ReentrantLock中真正使用的是這兩個子類,分別對應非公平鎖與公平鎖。公平鎖能夠保證執行緒按照先進先出(FIFO)的方式獲得鎖,但是一般認為公平鎖的效能不如非公平鎖。
下面我們帶著兩個問題繼續閱讀:
- ReentrantLock是如何實現可重入(即同一個執行緒可以持有該鎖的情況下多次lock)的?
- 公平鎖的FIFO是怎麼實現的呢?
非公平鎖
非公平鎖NonfairSync的lock方法實現:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
發現它會先嚐試一下立即獲得鎖,如果失敗的話則退化為正常AQS獲鎖流程(即父類AQS中的acquire方法),這裡注意到acquire方法接收的引數是1。
tryAcquire方法的實現:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
發現它直接呼叫的是父類的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//state == 0表示該鎖處於空閒狀態
if (c == 0) {
//獲得鎖成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { //執行緒重入的情況
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果發現鎖處於空閒狀態(state == 0
),則嘗試獲得鎖,否則的話,先判斷一下重入的情況,如果是重入的情況(current == getExclusiveOwnerThread()
),則將同步狀態state加1(int nextc = c + acquires;
),這裡的acquires
的值只可能是1,因為我們之前看到lock
方法中始終呼叫的是acquire(1)
。
再看一下tryRelease方法,tryRelease方法在父類Sync中,也就是說公平鎖與非公鎖共用的是同一個tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//減1
//如果釋放的執行緒不是持有鎖的執行緒,則丟擲異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //鎖已經釋放完全的狀態
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
大體上做的事情就是將同步狀態state減1,如果發現減到了零的話,則通過setExclusiveOwnerThread
將AQS的exclusiveOwnerThread
變數置空,如果已經減到零了,執行緒再次呼叫unlock方法的話,則會因為Thread.currentThread() != getExclusiveOwnerThread()
的判斷條件丟擲IllegalMonitorStateException
異常。
看到這裡我們可以回答上面提出的第一個問題了:
ReentrantLock是如何實現可重入的呢?
答:通過維護同步狀態state的含義為“執行緒重入的次數”,每次執行緒重入將其加1,釋放鎖將其減1,直到減成0,將其徹底釋放。
順手去看看為了支援執行緒間同步(newCondition
)而實現的isHeldExclusively
方法(位於Sync類中):
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
發現非常簡單,就是判斷一下持有鎖的執行緒是否是當前執行緒,我在第一篇將AQS的文章中說過,ConditionObject的signal方法會首先呼叫一下isHeldExclusively
方法確認呼叫方法的執行緒是否持有鎖。
公平鎖
FairSync的lock方法的實現:
final void lock() {
//非公平鎖與公平鎖的不同之處一
acquire(1);
}
發現非常簡單粗暴,直接呼叫AQS父類的acquire
方法,AQS中維護的CLH佇列就是FIFO的,所以這裡直接呼叫acquire即可。而之前的非公平鎖的“非公平”又體現再哪裡呢?重看一下NonfairSync的lock方法,發現其實就體現在:執行緒會先嚐試一次“插隊”,直接設定state獲得鎖,然後才會呼叫acquire方法走FIFO的CLH佇列,在這個過程中有可能造成CLH佇列中等待的執行緒被後來的執行緒給“插隊”了,就是這個"插隊"的行為導致了“不公平”。
上述的修改依舊沒能完全制止執行緒插隊的機會,AQS的acquire方法中也會先嚐試先用tryAcquire
方法插隊,然後才進入CLH佇列,所以FairSync對tryAcquire
方法也進行了細微的修改(相比NonfairSync):
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 初始化狀態
//hasQueuedPredecessors()是公平鎖與非公平鎖的區別二
//這個方法來自於AQS
//hasQueuedPredecessors判斷當前執行緒是否是CLH佇列的隊頭
//如果在CLH佇列中沒有前繼且CAS成功才能成功獲得鎖
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {// 重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可以看出這段程式碼和NonfairSync的tryAcquire
基本相同,除了在獲得鎖的判斷條件上添加了一個hasQueuedPredecessors
,這個方法來自於父類AQS,如果當前執行緒是CLH佇列的隊頭則返回false,否則返回true。
為什麼要做這一層防護呢?因為在AQS的acquire方法中,執行緒仍然會先嚐試呼叫tryAcquire
方法插個隊,之後才進入acquireQueued
方法:
//AQS中的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
執行緒在進入acquireQueued
方法之後就徹底是FIFO的了,所以要在前面的tryAcquire
再進行一道防護,防止在這裡"插隊"。
上面的這段文字就回答了之前提出的第二個問題,“公平鎖的FIFO是怎麼實現的呢?”
hasQueuedPredecessors
方法在將AQS時候漏講了,這裡補充一下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s; //s代表等待佇列的第一個節點
//h != t 是為了判斷CLH佇列為空的情況
//(s = h.next) == null 說明此時有另一個執行緒正在嘗試成為頭節點,詳見AQS的acquireQueued方法
//s.thread != Thread.currentThread() 此執行緒不是等待的頭節點
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
判斷條件還是有點難理解的,h != t
比較顯然,是為了判斷CLH佇列為空的情況,接下來的條件是在佇列不為空的情況下進行判斷,我逐個分析:
-
(s = h.next) == null
首先複習一下AQS中的CLH佇列,它的頭結點代表當前獲得鎖的執行緒,而頭節點的下一個節點才代表等待佇列的第一個執行緒。
所以這裡先通過
s = h.next
取到等待佇列的第一個節點賦給s。這裡
h.next
有可能為null,這就要複習一下AQS的acquireQueued方法了,當等待佇列的第一個執行緒獲得鎖時,它會將頭節點的next置空,這個置空next的執行緒顯然是呼叫hasQueuedPredecessors
的前繼之一,所以返回true -
s.thread != Thread.currentThread()
當明白s節點代表的就是等待佇列的第一個的時候,這個也就很簡單了,如果第一個不是當前執行緒,則肯定是存在前繼的,返回true即可。
Semaphore
Semaphore用來在併發下管理數量有限的資源,是典型的共享模式下的AQS的實現。
和ReentrantLock一樣,也分為公平模式和非公平模式。
Semaphore的關鍵方法如下:
- acquire 獲得許可
- release 釋放許可
Semaphore並不支援使用CondionObject進行執行緒間的同步。
看看acquire方法:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
直接呼叫了AQS的acquireSharedInterruptibly
方法,表明以共享模式使用AQS
再看看release方法:
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
也是直接調了AQS的releaseShared
方法,共享模式釋放。
非公平鎖
NonfairSync的tryAcquire
方法:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
再去其父類看nonfairTryAcquireShared
方法:
final int nonfairTryAcquireShared(int acquires) {
//CAS迴圈
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
很好懂,只要明確一點就能夠看懂Semaphore的程式碼:
- Semaphore的AQS中的同步狀態state代表的是剩餘許可的數量
上面那段程式碼其實就是通過CAS迴圈不斷嘗試減少響應數量的許可。
tryRelease
方法也非常簡單,就是通過CAS迴圈不斷嘗試增加相應數量的許可:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
公平鎖
和ReentrantLock一樣,就是加了一個hasQueuedPredecessors
的判斷而已:
protected int tryAcquireShared(int acquires) {
for (;;) {
//和非公鎖的區別
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}