JUC原始碼分析(三)--Condition原始碼分析
背景介紹
JUC中關於鎖的大部分同步控制類都基於AQS(JUC-AbstractQueuedSynchronizer(AQS)原始碼分析),ReentrantLock也不例外,lock()和unlock()都是基於ReentrantLock中繼承AQS的子類Sync
實現(JUC-ReentrantLock原始碼分析),但是在上篇部落格中只是簡單介紹了lock()和unlock(),這篇部落格就通過一段示例程式碼介紹下ReentrantLock的Condition功能.此外,還將分析下Node.waitStatue
的值是如何變化的.
Condition實現思路
Condition是由AQS的內部類ConditionObject
ConditionObject
是AQS中的非靜態內部類
,所以AQS與ConditionObject
是1:n的關係,1個AQS可以有多個ConditionObject
,但ConditionObject
只屬於一個AQS. 由於Condition同樣有同步器的語義,在
ConditionObject
中同樣維護著一個雙端雙向佇列,不過佇列中的每個節點封裝了一個等待signal訊號的執行緒(即呼叫await()之後的執行緒) 實現思路:
- condition.await()–>將當前執行緒加入Condition佇列,釋放當前執行緒擁有的鎖資源,掛起當前執行緒
- condition.signal()–>將Condition佇列的一個執行緒轉移至AQS佇列,喚醒該執行緒
程式碼
/**
* @author pfjia
* @since 2018/3/5 10:42
*/
public class ConditionDemo {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
//1.waitThread執行lock.lock()
lock.lock();
System.out.println(Thread.currentThread().getName() + "正在執行");
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "停止執行,等待一個signal");
//3.waitThread執行condition.await().釋放鎖,喚醒signalThread
condition.await();
//7.執行condition.await()被阻塞之後的程式碼
Thread.sleep(100000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "獲得一個signal,繼續執行");
lock.unlock();
}, "waitThread");
thread1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread2 = new Thread(() -> {
//2.signalThread執行lock.lock()(因為main執行緒中睡眠1s,而waitThread中睡眠2s,所以此語句先執行),signalThread被阻塞
lock.lock();
//4.lock.lock()中被阻塞的後半部分程式碼
System.out.println(Thread.currentThread().getName() + "正在執行");
//5.signalThread執行condition.signal(),將Condition佇列中的waitThread轉移至AQS佇列中
condition.signal();
System.out.println(Thread.currentThread().getName() + "傳送一個signal");
System.out.println(Thread.currentThread().getName() + "傳送一個signal後,結束");
//6.釋放鎖
lock.unlock();
}, "signalThread");
thread2.start();
}
}
程式碼分析
程式碼中共有三個執行緒
- main執行緒:負責生成waitThread和signalThread
- waitThread:首先獲取鎖,而後呼叫condition.await()
- signalThread:獲取鎖時被佔用,等待waitThread呼叫condition.await()釋放鎖後獲取鎖,而後呼叫condition.signal()
程式碼分析
我們根據程式碼的執行順序來分析下程式碼
waitThread:lock.lock()
使用CAS將AQS.state由0修改為1,成功獲取鎖
signalThread:lock.lock()
由於waitThread已經獲取鎖資源,因此signalThread加入AQS佇列後掛起
- 呼叫
addWaiter()
將signalThread加入AQS佇列,此時AQS佇列如下(注意:head.waitStatus
為0)
- 呼叫
acquireQueued(node,1)
掛起當前執行緒後AQS佇列如下(注意:head.waitStatus
為Node.SIGNAL)
- 呼叫
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//由於pred(即頭結點)的waitStatus=0,會進入此段程式碼,將head.waitStatus修改為Node.SIGNAL(即-1)
//在acquireQueued()的死迴圈第二次時才會將signalThread掛起
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
waitThread:condition.await()
await()
共做了如下幾件事情:
- 將waitThread新增至Condition佇列–>
addConditionWaiter()
- 將waitThread獲取的鎖資源全部釋放,喚醒AQS佇列的第一個執行緒–>
fullyRelease()
- 迴圈判斷waitThread是否在AQS佇列,若不在,則將waitThread掛起
前兩點容易理解,第三點是什麼含義呢?waitThread不是在Condition佇列嗎?為什麼要判斷waitThread是否在AQS對列?
答案在signalThread:condition.signal()中,讓我們帶著疑問繼續看下去.
呼叫
addConditionWaiter()
後,Condition佇列如下:(注意:waitStatue
為Node.CONDITION,即-2)
AQS.fullRelease(node)–>AQS.release(1)–>Sync.tryRelease(1),AQS.unparkSuccessor(node)
釋放資源後呼叫AQS.unparkSuccessor(node)喚醒AQS佇列中的執行緒(即signalThread)AQS.isOnSyncQueue(node)–>LockSupport.park(this)
判斷waitThread的node是否在SyncQueue中,由於waitThread不在SyncQueue中,掛起waitThread
signalThread:lock.lock()執行緒被阻塞的後半部分
其實就是執行acquireQueued()
掛起執行緒後的程式碼,即將signalThread移出AQS佇列,操作完成後佇列如下:
signalThread:condition.signal()
上面說到的第三件事情是迴圈判斷waitThread是否在AQS佇列,若不在,則將waitThread掛起,waitThread就是在condition.signal()中從Condition佇列移至AQS佇列的.
- ConditionObject.doSignal(firstWaiter)–>AQS.transferForSignal(firstWaiter)
將Condition佇列中的waitThread從轉移至AQS佇列
AQS佇列如下:
Condition佇列如下:
signalThread:lock.unlock()
- AQS.release(1)–>Sync.tryRelease(1),AQS.unparkSuccessor(node)
釋放鎖,並喚醒在AQS佇列中的waitThread
waitThread:condition.await()被阻塞的後半部分
- AQS.isOnSyncQueue(node)
此時node已經在AQS佇列中,跳出while迴圈 - AQS.acquireQueued(node,1)–>Sync.tryAcquire(1)
嘗試獲取鎖成功,將waitThread移出AQS佇列
總結
- Condition維護一個等待signal訊號的執行緒佇列
- await()將當前執行緒加入Condition佇列,釋放當前執行緒擁有的鎖資源,掛起當前執行緒
- signal()將Condition佇列的一個執行緒轉移至AQS佇列,喚醒該執行緒
不足
- 未考慮多執行緒的情況
- 未考慮
Node.waitStatue
的變化