併發系列(八)-----AQS詳解獨佔模式資源的獲取與釋放
一 簡介
上一篇總結了AQS的整體架構,以及它的組成,和它們之間的關係。AQS主要的三部分分別是volatile修飾的變數、同步佇列和等待佇列其中,同步佇列在上篇總結中已經介紹過了,不知道的話可以點這裡AQS的框架組成以及同步佇列原始碼解析。這一片文章主要總結獨佔模式下資源的獲取。
二 資源的獲取原始碼解析
在上一篇總結中,最後過原始碼的時候看到addWater(),同時我們也提出兩個猜想獲取資源的兩種方式
猜想一:執行緒上來就直接獲取,如果獲取成功的話那就執行了,獲取失敗的話被封裝成節點新增到同步佇列中
猜想二:執行緒一上來看看佇列中有沒有要同步的節點,如果有的話那就不獲取資源了直接新增到同步佇列中,等待上一個節點喚醒。
現在看一下操作stste的方法有哪些
/** * 返回當前同步狀態 */ protected final int getState() { return state; } /** * 設定當前的同步狀態 */ protected final void setState(int newState) { state = newState; } /** * 使用CAS來更新state的值 * * @param except 期望值 * @param update 更新值 * @return 更新是否成功 */ protected final boolean compareAndSetState(int except, int update) { return unsafe.compareAndSwapInt(this, stateOffset, except, update); }
AQS下面這個方法就是來獲取資源state的
/** * 以獨佔模式獲取,忽略中斷。實現 至少呼叫一次{@link #tryAcquire}, * 成功迴歸。 否則,執行緒可能會排隊 * * @param arg 資源請求 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
看到上面的程式碼我們看到了比較熟悉的方法就是addWaier(),這個方法返回一個封裝好執行緒的節點,被當作引數傳遞到acquireQueued()這個方法中。但是acquireQueued執行不執行取決與前面的tryAcquire()這個方法。當tryAcquire()返回false的時候才會去執行acquireQueued()這個方法。再看一下tryAcquire()這個方法。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
盡然丟擲一個異常,在第一遍的文章說過AQS是基於模板方法的框架,既然使用的是模板方法那就需要子類去實現了,在ReentrantLock內部類Sync中是AQS的實現。下面是原始碼
@Override
protected final boolean tryAcquire(int acquire) {
return nonfairTryAcquire(acquire);
}
final boolean nonfairTryAcquire(int acquires) {
//獲取但當前執行緒
final Thread current = Thread.currentThread();
//獲取資源的狀態
int c = getState();
if (c == 0) {
//如果資源的狀態為0的話說明state是沒有執行緒持有當前資源的
if (compareAndSetState(0, acquires)) {
//使用CAS替換,如果成功那就將獨佔執行緒設為當前執行緒,也就意味著當前執行緒
//擁有執行時間了
setExclusiveOwnerThread(current);
//獲取的資源返回true
return true;
}
} else if (current == getExclusiveOwnerThread()) {
//如果state不為0的話獨佔執行緒是當前執行緒的話那麼給state加一,這裡是
//重入鎖的實現
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
//如果沒獲取到資源的話返回false
return false;
}
好了tryAcquire()方法幹什麼的已經知道了,如果獲取到資源的話返回true,沒有獲取到資源的話就返回false,現在再看acquire()這個方法,當執行緒獲取到資源的時候返回的是true ,!true也就是false,那就不必在在執行&&面的語句了,如果沒有獲取到資源,就要執行後面的語句了,首先將當前執行緒包裝成一個節點新增到對列尾部並返回這個節點。既然包裝了,那就要處理將這個節點了。acquireQueued方法就是幹這個的,下面是原始碼
/**
* 將競爭節點設定為頭節點,同時當前節點不是頭節點的話
*
* @param node 要獲取頭節點的節點
* @param arg state狀態引數
* @return 返回true表示執行緒發生了中斷
*/
final boolean acquireQueued(final Node node, int arg) {
//這個變數來看是否要取消節點的競爭
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
//獲取node的前驅節點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//當前的前驅節點是頭節點,那麼當前節點就獲取資源
//將node設定為頭節點
setHead(node);
//消除引用有利於垃圾回收
p.next = null;
failed = false;
return interrupted;
}
//這裡是for迴圈的移動條件跳過前驅接節點未取消狀態的節點,
//當前執行緒中斷的話只能返回去等待了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
//當執行緒中斷直接取消當前節點競爭
if (failed) {
cancelAcquire(node);
}
}
}
對於上面一段原始碼來說,獲取資源是不難理解的,但是沒有獲取到資源時候執行了一個if語句,看一下if語句中兩個方法中分別做了什麼。下面是原始碼
/**
* 檢查是否可以在節點後新增競爭節點,同時檢查node前驅節點是否取消,如果取消了就要將這個節點
* 移除掉,如果在前驅節點等待中返回true
*
* @param pred 前驅節點
* @param node 當前節點
* @return 返回boolean
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前驅節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
return true;
}
if (ws > 0) {
do {
//如果前驅節點的的狀態為取消狀態那麼跳過直到找到可以
//獲取競爭的node
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
當node中的前驅節點是等待中的時候就會執行下一個方法,下一個方法的原始碼如下
/**
* 將執行緒至於waiting中 並返回執行緒是否中斷
*
* @return 中斷執行緒的boolean
*/
private final boolean parkAndCheckInterrupt() {
//將執行緒至於waiting中
LockSupport.park(this);
return Thread.interrupted();
}
看到這裡基本上已經清楚了,acquireQueued()方法就是如果可以獲取到資源的時候直接獲取,不能獲取到資源時檢查父節點是否在等待狀態中,如果在等待中,就呼叫LockSupport.park(),將當前執行緒至於等待狀態,等待中斷或著喚醒。AQS獲取資源也就完了。
AQS獲取資源的總結:
1.首先先使用CAS獲取支援state。如果獲取成功的話就不在新增節點了,如果獲取失敗的話將當前執行緒封裝到node中。
2.在新增節點的時候判斷前驅節點是否是頭節點,如果前驅節點是頭節點的話,繼續for迴圈獲取資源。如果不是頭節點的話檢查當前的前驅節點是否為空,將當前節點的所有前面的節點為取消狀態的全都去掉。去掉之後當前節點還不是頭節點時,將執行緒至於waiting狀態等待中斷或喚醒。
上面的總結也就驗證了的猜想一,其實就是非公平鎖的實現,猜想二是公平的鎖的實現。可以在ReentrantLock中的內部類FairSync看到公平鎖的實現。
三 資源的釋放
關於支援的釋放我認為是比較簡單的,可以大體的猜想一下,找到持有資源的node節點,將state的值設定為0,再將當前node節點從同步佇列中移除掉。然後喚醒下一個節點的執行緒。下面是原始碼
/**
* 釋放資源,並喚醒下一個節點
*
* @param arg 狀態
* @return 釋放成功
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
//喚醒下一個節點
unparkSuccessor(h);
}
return true;
}
return false;
}
上面原始碼幹什麼已經在註釋中說的很清楚了,但是沒有看到資源是如何操作的。其中資源的操作就在tryRelease()方法中下面是原始碼。
/**
* 執行緒呼叫釋放鎖
*
* @param arg 狀態
* @return 是否釋放成功
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
這個方法和tryAcquire()方法一樣都是拋了一個異常我們看其中子類的實現。下面是原始碼。
/**
* 釋放資源
*
* @param release 釋放鎖的數字
* @return 是否釋放成功
*/
@Override
protected final boolean tryRelease(int release) {
//獲取到資源並減去資源
int c = getState() - release;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
free = true;
//當state為0的時候表示資源釋放完成想獨佔的執行緒設定為null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
四 總結
從上面的原始碼中我們可以看出,鎖的實現無非就是資源的獲取,與佇列的操作,執行緒狀態的轉化。
獲取資源時:先操作state,操作成功就直接獲取到了,操作不成功新增到同步佇列中,呼叫LockSuppport.park(),將執行緒至於waiting狀態等待中斷或喚醒。
資源釋放:先操作state,操作成功的話將佇列中的當前節點移除,喚醒下一個節點。