Java多執行緒之Condition實現原理和原始碼分析(四)
章節概覽、
1、概述
上面的幾個章節我們基於lock(),unlock()方法為入口,深入分析了獨佔鎖的獲取和釋放。這個章節我們在此基礎上,進一步分析AQS是如何實現await,signal功能。其功能上和synchronize的wait,notify一樣。但是本質是也是有很多區別。
1.1、 Condition和synchronize各自實現的等待喚醒區別
- Condition是基於AQS 佇列同步器實現的。而synchronize的等待喚醒是基於jvm的語義層次上實現的。
- Condition的使用不管是await,signal都有維護各自的一個阻塞佇列。而在synchronize所有的阻塞執行緒都被放到同一個阻塞佇列裡面,所以在多執行緒的情況下,可能存在早喚醒,喚醒失敗等情況。
2、入門案例
入門案例採用的是大家比較熟悉的生產者和消費者。
public class ProducerConsumerTest { private Lock lock = new ReentrantLock(); private Condition addCondition = lock.newCondition(); private Condition removeCondition = lock.newCondition(); private LinkedList<Integer> resources = new LinkedList<>(); private int maxSize; public ProducerConsumerTest(int maxSize) { this.maxSize = maxSize; } public class Producer implements Runnable { private int proSize; private Producer(int proSize) { this.proSize = proSize; } @Override public void run() { lock.lock(); try { for (int i = 1; i < proSize; i++) { while (resources.size() >= maxSize) { System.out.println("當前倉庫已滿,等待消費..."); try { addCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("已經生產產品數: " + i + "\t現倉儲量總量:" + resources.size()); resources.add(i); removeCondition.signal(); } } finally { lock.unlock(); } } } public class Consumer implements Runnable { @Override public void run() { String threadName = Thread.currentThread().getName(); while (true) { lock.lock(); try { while (resources.size() <= 0) { System.out.println(threadName + " 當前倉庫沒有產品,請稍等..."); try { // 進入阻塞狀態 removeCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 消費資料 int size = resources.size(); for (int i = 0; i < size; i++) { Integer remove = resources.remove(); System.out.println(threadName + " 當前消費產品編號為:" + remove); } // 喚醒生產者 addCondition.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) throws InterruptedException { ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10); Producer producer = producerConsumerTest.new Producer(100); Consumer consumer = producerConsumerTest.new Consumer(); final Thread producerThread = new Thread(producer, "producer"); final Thread consumerThread = new Thread(consumer, "consumer"); producerThread.start(); TimeUnit.SECONDS.sleep(2); consumerThread.start(); } }
其中維護了一個儲存倉庫。當生產者把生產的物品放入到倉庫中,直到倉庫填滿,進行阻塞等待,此時生產者會釋放當前的鎖。剛開始,消費者會檢查當前倉庫是否有物品,如果沒有物品進行阻塞,等待喚醒。當生產者喚醒消費者,消費者進行消費。我們以此案例的進行逐步分析。
3、 Condition原始碼分析
3.1、 Condition介面原始碼分析
public interface Condition { // 當前執行緒進入等待狀態直到被通知(signal)或者中斷 // 當前執行緒進入執行狀態並從await()方法返回的場景包括: //(1)其他執行緒呼叫相同Condition物件的signal/signalAll方法,並且當前執行緒被喚醒; //(2)其他執行緒呼叫interrupt方法中斷當前執行緒; void await() throws InterruptedException; // 當前執行緒進入等待狀態直到被通知,在此過程中對中斷訊號不敏感,不支援中斷當前執行緒 void awaitUninterruptibly(); // 當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果返回值小於等於0,可以認定就是超時了 boolean await(long time, TimeUnit unit) throws InterruptedException; // 當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果返回值小於等於0,可以認定就是超時了 long awaitNanos(long nanosTimeout) throws InterruptedException; //當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果沒到指定時間被通知,則返回true,否則返回false boolean awaitUntil(Date deadline) throws InterruptedException; // 喚醒一個等待在Condition上的執行緒,被喚醒的執行緒在方法返回前必須獲得與Condition物件關聯的鎖 void signal(); // 喚醒所有等待在Condition上的執行緒,能夠從await()等方法返回的執行緒必須先獲得與Condition物件關聯的鎖 void signalAll(); }
3.2、Condition 實現類成員變數 建構函式說明
Condition的實現類為:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject。從類的引用方式可以看出,其是AbstractQueuedSynchronizer的一個內部類。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** Condition Queue 裡面的頭節點 */
private transient Node firstWaiter;
/** Condition Queue 裡面的尾節點 */
private transient Node lastWaiter;
/** 建構函式 */
public ConditionObject() { }
// 下面兩個是用於追蹤 呼叫 awaitXXX 方法時執行緒有沒有被中斷過
// REINTERRUPT: 代表執行緒是在 signal 後被中斷的 (REINTERRUPT = re-interrupt再次中斷最後會呼叫 selfInterrupt)
// THROW_IE: 代表在接受 signal 前被中斷的, 則直接丟擲異常 (Throw_IE = throw inner exception)
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
}
從原始碼分析可以看出,每個Condition例項裡面都會維護著一個連結串列。通過 firstWaiter,lastWaiter進行儲存。
4、wait()方法核心原始碼分析
4.1、 await()
public final void await() throws InterruptedException {
// 判斷當前執行緒是否被阻塞,如果被阻塞,直接丟擲InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
// 將當前執行緒新增到Condition等待佇列中
// 詳情請看: 4.2 小節addConditionWaiter()原始碼分析
Node node = addConditionWaiter();
// 釋放當前擁有的鎖資源:4.3小節fullyRelease(Node node)原始碼分析
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判斷當前節點是否在佇列同步器中。如果在同步器中,阻塞當前執行緒
// 死迴圈,直到其他執行緒喚醒當前執行緒
// 具體判斷是否在同步佇列中,請參考4.4 小節:boolean isOnSyncQueue(Node node)
while (!isOnSyncQueue(node)) {
// 阻塞當前執行緒,直到其他執行緒對其喚醒
LockSupport.park(this);
// 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
// 中斷檢測原始碼請請4.5小節:int checkInterruptWhileWaiting(Node node)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 呼叫 acquireQueued在 Sync Queue 裡面進行 獨佔鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果當前lnode節點的nextWaiter != null,則清理當前佇列
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根據中斷模式返回異常情況
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
4.2 、 addConditionWaiter()
當前方法位置:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter+
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果lastWaiter不為空,且其狀態不等於 Node.CONDITION。將當前節點刪除
if (t != null && t.waitStatus != Node.CONDITION) {
// 刪除當前節點,詳情檢視:4.2.1小結unlinkCancelledWaiters()分析
unlinkCancelledWaiters();
// 重新複製當前node節點t節點
t = lastWaiter;
}
// 建立一個node節點,設定狀態為:Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果lastWaiter()為空,設定 firstWaiter = node
// 如果lastWaiter不為空,直接加入t.nextWaiter = node
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 設定當前節點為lastWaiter節點
lastWaiter = node;
return node;
}
4.2.1、unlinkCancelledWaiters()
// 清理Condition佇列中被中斷或者過期的節點 // trail 節點一直維護者一個符合Condition條件佇列的節點。如果當前符合要求,直接尾部追加到trail中。如果當前節點不合法,直接跳過
private void unlinkCancelledWaiters() {
// 賦值當前頭節點
Node t = firstWaiter;
// trail 是中間儲存變數
Node trail = null;
// 迴圈遍歷,直到 t == null 退出
while (t != null) {
Node next = t.nextWaiter;
// 遍歷當前不符合Condition佇列規則的,將其清理
if (t.waitStatus != Node.CONDITION) {
// 設定當前 t節點的nextWaiter = null,方便 gc
t.nextWaiter = null;
// 如果 trail 為空。說明當前不合法的節點是firstWaiter,重新對其進行賦值
if (trail == null)
firstWaiter = next;
else
// 如果當前節點不合法,則將當前節點的next 賦值給 trail.nextWaiter 節點
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 當前節點賦值給t
trail = t;
// t賦值為下一個節點
t = next;
}
}
4.3、 fullyRelease(Node node)
完全釋放當前節點的鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取系統中當前的state狀態(可衝入鎖,其狀態可能不為1)
int savedState = getState();
// 釋放當前節點的鎖,具體細節請參考上一章節
if (release(savedState)) {
// 釋放成功,重置failed的值
failed = false;
return savedState;
} else {
// 釋放失敗,丟擲異常
throw new IllegalMonitorStateException();
}
} finally {
// 如果釋放失敗,將當前的node節點設定為刪除狀態
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
4.4、boolean isOnSyncQueue(Node node)
final boolean isOnSyncQueue(Node node) {
// 判斷node節點的狀態是否為:Node.CONDITION 或者 其前驅節點為 null,直接返回為 false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果其後驅節點不為null,直接返回為true。退出當前迴圈
if (node.next != null) // If has successor, it must be on queue
return true;
/* node.prev 可以是非null,但尚未在佇列中。因為
*將其置於佇列中的CAS可能會失敗。 所以我們必須這樣做
*從尾部遍歷,以確保它實際上成功。 它
*在呼叫此方法時,它總是接近尾部,並且
*除非CAS失敗。
*/
// 詳情請檢視4.4.1 小結:findNodeFromTail(Node node)
return findNodeFromTail(node);
}
4.4.1、findNodeFromTail(Node node)
從佇列尾部進行遍歷,檢視node節點是否存在於佇列中
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
4.5、int checkInterruptWhileWaiting(Node node)
/**
* 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
* 若是中斷引起的, 則將 Node 從 Condition Queue 轉移到 Sync Queue 裡面
* 返回值的區別:
* 0: 當前執行緒沒有被中斷,此次喚醒是通過 signal
* THROW_IE: 此次的喚醒是通過 interrupt, 並且在接受 signal 之前
* REINTERRUPT: 執行緒的喚醒是 接受過 signal 而後又被中斷
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 將 Node 從 Condition Queue 轉移到 Sync Queue 裡面
// 詳細原始碼分析參考4.5.1 小節:transferAfterCancelledWait(Node node)
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
4.5.1、 transferAfterCancelledWait(Node node)
final boolean transferAfterCancelledWait(Node node) {
// 通過cas修改當前節點的狀態為0,獨佔鎖預設狀態
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
新增到 Syn的佇列同步器中
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
至此wait()對應的原始碼已經分析完成。主要存在以下幾個核心步驟:
- 將當前節點新增到Condition 所維護的佇列中,尾部進行追加;
- 釋放當前執行緒鎖持有的鎖;
- 迴圈判斷當前執行緒是否在Sync的佇列同步器中;
- 如果存在於佇列同步器中,加下來的執行緒交給AQS自己去處理了;
5、signal() 核心原始碼分析
5.1、signal() 入口方法
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
// 判斷當前執行緒是否擁有鎖資源
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 喚醒頭結點
doSignal(first);
}
5.2 、void doSignal(Node first)
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 將當前節點新增到Sync的佇列中,通過unlock()方法的呼叫,進行喚醒
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
5.3、 boolean transferForSignal(Node node)
final boolean transferForSignal(Node node) {
// 設定當前節點的狀態為0,通過cas,設定失敗直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 將當前的節點新增到Sync的佇列同步器中,同時返回當前節點的前一個節點
Node p = enq(node);
int ws = p.waitStatus;
// 如果當前節點的ws > 0 或者設定當前節點的狀態
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒當前執行緒
LockSupport.unpark(node.thread);
return true;
}
至此signal 原始碼已經分析完成,signal方法主要是將當前的Condition的first節點,轉移到Sync的FIFO中。等待喚醒操作。
6、總結
至此,AQS的核心知識點原始碼分析完成,本人能力有限,如果不妥的地方,歡迎指正。