Java併發之Condition的實現分析
一、Condition的概念
介紹
回憶 synchronized 關鍵字,它配合 Object 的 wait()、notify() 系列方法可以實現等待/通知模式。
對於 Lock,通過 Condition 也可以實現等待/通知模式。
Condition 是一個介面。
Condition 介面的實現類是 Lock(AQS)中的 ConditionObject。
Lock 介面中有個 newCondition() 方法,通過這個方法可以獲得 Condition 物件(其實就是 ConditionObject)。
因此,通過 Lock 物件可以獲得 Condition 物件。
Lock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
二、Condition的實現分析
實現
ConditionObject 類是 AQS 的內部類,實現了 Condition 介面。
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
...
可以看到,等待佇列和同步佇列一樣,使用的都是同步器 AQS 中的節點類 Node。
同樣擁有首節點和尾節點,
每個 Condition 物件都包含著一個 FIFO 佇列。
結構圖:
圖片描述(最多50字)
等待
呼叫 Condition 的 await() 方法會使執行緒進入等待佇列,並釋放鎖,執行緒狀態變為等待狀態。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
//釋放同步狀態(鎖)
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷節點是否放入同步對列
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//如果已經中斷了,則退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
分析上述方法的大概過程:
將當前執行緒建立為節點,加入等待佇列;
釋放鎖,喚醒同步佇列中的後繼節點;
while迴圈判斷節點是否放入同步佇列:
沒有放入,則阻塞,繼續 while 迴圈(如果已經中斷了,則退出)
放入,則退出 while 迴圈,執行後面的判斷
退出 while 說明節點已經在同步佇列中,呼叫 acquireQueued() 方法加入同步狀態競爭。
競爭到鎖後從 await() 方法返回,即退出該方法。
addConditionWaiter() 方法:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//清除條件佇列中所有狀態不為Condition的節點
unlinkCancelledWaiters();
t = lastWaiter;
}
//將該執行緒建立節點,放入等待佇列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
過程分析:同步佇列的首節點移動到等待佇列。加入尾節點之前會清除所有狀態不為 Condition 的節點。
通知
呼叫 Condition 的 signal() 方法,可以喚醒等待佇列的首節點(等待時間最長),喚醒之前會將該節點移動到同步佇列中。
public final void signal() {
//判斷是否獲取了鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
過程:
先判斷當前執行緒是否獲取了鎖;
然後對首節點呼叫 doSignal() 方法。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
過程:
修改首節點;
呼叫 transferForSignal() 方法將節點移動到同步佇列。
final boolean transferForSignal(Node node) {
//將節點狀態變為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將該節點加入同步佇列
Node p = enq(node);
int ws = p.waitStatus;
//如果結點p的狀態為cancel 或者修改waitStatus失敗,則直接喚醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
呼叫同步器的 enq 方法,將節點移動到同步佇列,
滿足條件後使用 LockSupport 喚醒該執行緒。
當 Condition 呼叫 signalAll() 方法:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
可以看到 doSignalAll() 方法使用了 do-while 迴圈來喚醒每一個等待佇列中的節點,直到 first 為 null 時,停止迴圈。
一句話總結 signalAll() 的作用:將等待佇列中的全部節點移動到同步佇列中,並喚醒每個節點的執行緒。
總結
整個過程可以分為三步:
第一步:一個執行緒獲取鎖後,通過呼叫 Condition 的 await() 方法,會將當前執行緒先加入到等待佇列中,並釋放鎖。然後就在 await() 中的一個 while 迴圈中判斷節點是否已經在同步佇列,是則嘗試獲取鎖,否則一直阻塞。
第二步:當執行緒呼叫 signal() 方法後,程式首先檢查當前執行緒是否獲取了鎖,然後通過 doSignal(Node first) 方法將節點移動到同步佇列,並喚醒節點中的執行緒。
第三步:被喚醒的執行緒,將從 await() 中的 while 迴圈中退出來,然後呼叫 acquireQueued() 方法競爭同步狀態。競爭成功則退出 await() 方法,繼續執行。