Java-多執行緒-wait/notify
Java-多執行緒-wait/notify
摘要
wait
notify
還有個notifyAll
都是執行緒通訊的常用手段。本文會簡要介紹其底層實現原理,並和Condition
的await
和signal
方法作對比。
有一個先導概念就是物件鎖和類鎖,他們其實都是物件監視器Object Monitor
,只不過類鎖是類物件的監視器,可以看另一篇文章:
Java-併發-鎖-synchronized之物件鎖和類鎖
在呼叫wait和notify之前必須持有物件鎖,那麼就必須瞭解synchronized
,可以參閱文章:
Java-併發-鎖-synchronized
更多關於Java鎖的資訊,可參考文章:
0x01 wait
1.1 基本概念
- 作用
顧名思義,wait其實就是執行緒用來做阻塞等待的。 - 超時引數
在JDK的Object中,wait方法分為帶引數和無引數版本,這裡說的引數就是等待超時的引數。 - 中斷
其他執行緒在當前執行緒執行wait之前或正在wait時,對當前執行緒呼叫中斷interrupted
方法,會丟擲InterruptedException
,且中斷標記會被自動清理。
先看沒有引數的版本:
/**
* 讓當前執行緒等待到指定Object,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒
* 該方法等價於呼叫wait(0)
*
* 注意 呼叫該方法前提是擁有該物件的物件鎖。否則會報錯丟擲IllegalMonitorStateException
*
* 當擁有物件鎖並呼叫wait方法時,會釋放物件鎖,
* 然後等待,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒那些wait在該物件鎖上的執行緒。
* 喚醒之後,該執行緒會嘗試去獲取物件鎖,拿不到就等到直到拿到
* 拿到物件鎖後繼續執行程式。
*
* 在單引數的wait方法版本中,中斷和意料之外的喚醒是可能的所以應該這麼做:
* synchronized (obj) {
* while (condition does not hold)
* obj.wait();
* ... // Perform action appropriate to condition
* }
*
* @throws IllegalMonitorStateException 呼叫執行緒未持有該物件的物件鎖.
* @throws InterruptedException
* @see java.lang.Object#notify()
* @see java.lang.Object#notifyAll()
*/
public final void wait() throws InterruptedException {
wait(0);
}
再看看帶1個引數版本的wait方法:
/**
* 讓當前執行緒等待到指定Object,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒
* 或是指定wait超時時間耗盡
*
* 注意 呼叫該方法前提是擁有該物件的物件鎖。否則會報錯丟擲IllegalMonitorStateException
*
* 該方法的原理:
* 1.呼叫wait方法的執行緒將自己加入該物件的等待結合中
* 2.然後放棄所有和該物件相關的同步鎖宣告
* 3.該呼叫執行緒隨後就不能被排程器排程執行了,進入休眠狀態直到以下情況發生:
* 1.其他執行緒對目標物件呼叫notify方法,剛好選中該執行緒被喚醒
* 2.其他執行緒對目標物件呼叫notifyAll方法喚醒所有執行緒
* 3.其他執行緒對該執行緒呼叫interrupt方法發起中斷
* 4.指定的超時時間耗盡,前提是超時時間不是0
* 4.該執行緒被喚醒後,從等待該物件的集合中移除,又可以被排程執行了
* 5.此時會跟其他執行緒一起競爭該物件的同步鎖
* 6.一旦該執行緒拿到物件同步鎖,所有在wait方法執行前的同步說明都重新起效
* 7.然後該執行緒就從wait方法中返回了,該過程結束
*
* 除了上面提到的幾種喚醒場景,還有一種極少發生的情況會喚醒執行緒,稱為`偽喚醒`
* 為了預防,所以應該這麼做:
* synchronized (obj) {
* while (condition does not hold)
* obj.wait();
* ... // Perform action appropriate to condition
* }
*
* <p>If the current thread is {@linkplain java.lang.Thread#interrupt()
* interrupted} by any thread before or while it is waiting, then an
* {@code InterruptedException} is thrown. This exception is not
* thrown until the lock status of this object has been restored as
* described above.
* 這段話沒看的太明白?
*
* 注意這個wait方法只會讓該執行緒釋放當前Object的物件鎖,而不會放棄擁有的其他物件鎖!
*
*
* @param timeout the maximum time to wait in milliseconds.
* @throws IllegalArgumentException if the value of timeout is
* negative.
* @throws IllegalMonitorStateException if the current thread is not
* the owner of the object's monitor.
* @throws InterruptedException
* @see java.lang.Object#notify()
* @see java.lang.Object#notifyAll()
*/
public final native void wait(long timeout) throws InterruptedException;
1.2 實現原理
可以先點選這裡回顧下關於ObjectWatier
的知識。
然後我們繼續分析底層原始碼。
wait/notify/notifyAll
程式碼主要在jdk8/hotspot/src/share/vm/runtime/synchronizer.cpp
裡。
1.2.1 ObjectSynchronizer::wait
下面看看wait方法底層實現,摘錄部分核心程式碼如下:
// 注意,必須使用重量級monitor來處理wait方法
// 第一個引數是控制代碼指向了我們wait的目標Object
// 第二個引數是wait的毫秒數
// 第三個是呼叫wait的執行緒
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
if (UseBiasedLocking) {
// 如果開啟了偏向鎖
// 嘗試獲取該偏向鎖,注意偏向鎖是可重入的
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
if (millis < 0) {
// wait超時時間不可小於0
TEVENT (wait - throw IAX) ;
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
// 膨脹為重量級鎖,得到該ObjectMonitor
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
// 呼叫該monitor的wait方法
monitor->wait(millis, true, THREAD);
}
1.2.2 ObjectSynchronizer::wait
ObjectMonitor
相關程式碼在
/Users/chengc/cc/work/projects/jdk8/hotspot/src/share/vm/runtime/objectMonitor.hpp
/Users/chengc/cc/work/projects/jdk8/hotspot/src/share/vm/runtime/objectMonitor.cpp
下面看看wait方法,摘錄部分核心程式碼如下:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){
// 以本身的ObjectMonitor建立一個ObjectWaiter
ObjectWaiter node(Self);
// 將該ObjectWaiter狀態設為TS_WAIT
node.TState = ObjectWaiter::TS_WAIT ;
// 在這個AddWaiter時出現執行緒競爭的情況很少,所以採用了輕量級的自旋鎖
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 新增該ObjectWaiter node到雙向連結串列WaitSet中
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
// 累加waiter
_waiters++;
// 釋放ObjectMonitor
// 當呼叫返回後,其他執行緒就可以使用enter()方法競爭ObjectMonitor了
exit (true, Self) ;
// 執行緒現在可以用park()方法阻塞了
// 程式碼註釋說以後要 change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
}
1.2.3 ObjectMonitor::AddWaiter
使用了佇列的尾插法,到WaitSet
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
// 插入雙向連結串列組成的佇列的尾部
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
1.2.4 os::PlatformEvent::park()
該方法在jdk8/hotspot/src/os/linux/vm/os_linux.cpp
,主要是通過以下程式碼實現阻塞:
pthread_mutex_lock(_mutex)
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
pthread_mutex_unlock(_mutex);
0x02 notify
2.1 基本概念
- 該方法用來任意喚醒一個在物件鎖的等待集的執行緒(其實看了原始碼會發現不是任意的,而是一個WaitQueue,FIFO)。
- 但要注意,被喚醒的執行緒不會馬上開始執行,因為物件鎖還被呼叫
notify
的執行緒擁有,直到退出synchronized
塊。 - 喚醒後的執行緒跟其他執行緒一起競爭該同步物件鎖。
- 注意,該方法和wait方法一樣也必須是擁有該物件同步物件鎖的執行緒才能呼叫,否則丟擲
IllegalMonitorStateException
。
public final native void notify();
2.2 實現原理
2.2.1 ObjectSynchronizer::notify
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
// 也是先用偏向鎖
if (UseBiasedLocking) {
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 獲取物件頭的MarkWord
markOop mark = obj->mark();
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
// 如果擁有的是輕量級鎖就直接返回了
return;
}
// 否則膨脹為重量級鎖,呼叫得到的ObjectMonitor的notify方法
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
2.2.2 ObjectMonitor::notify(TRAPS)
摘錄部分核心程式碼如下:
void ObjectMonitor::notify(TRAPS) {
// 檢查當前執行緒是否擁有該ObjectMonitor
CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {
// 如果WaitSet為空就返回了
TEVENT (Empty-NotifyAll) ;
return ;
}
// 自旋鎖方式獲取_WaitSetLock
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
// 獲取WaitSet首節點,並從WaitSet移除該節點
ObjectWaiter * iterator = DequeueWaiter() ;
// 在此之後根據Knob_MoveNotifyee不同,對該節點做不同處理,如加入EntryList等
// 也就是說讓該執行緒能重新競爭ObjectMonitor
// 最後釋放_WaitSetLock
Thread::SpinRelease (&_WaitSetLock) ;
}
0x03 notifyAll
3.1 基本概念
- 該方法用來喚醒所有在物件鎖的等待集的執行緒。
- 但要注意,被喚醒的執行緒不會馬上開始執行,因為物件鎖還被呼叫
notifyAll
的執行緒擁有。 - 喚醒後的執行緒跟其他執行緒一起競爭該同步物件鎖。
- 注意,該方法和wait方法一樣也必須是擁有該物件同步物件鎖的執行緒才能呼叫,否則丟擲
IllegalMonitorStateException
。
public final native void notifyAll();
3.2 實現原理
跟notify差不多,其實就是迴圈的方式把WaitSet裡的執行緒節點全部取出,放入EntryList。
0x04 wait與sleep比較
經常面試會問這個問題,往往我們都是網上查資料死記硬背。現在我們都看完了原始碼(sleep原始碼點這裡),可以得出以下結論
- wait會釋放ObjectMonitor控制權;sleep不會
- wait邏輯複雜,需要首先呼叫synchronized獲取ObjectMonitor控制權,才能呼叫wait,且wait後還有放入WaitSet邏輯,喚醒時還有一系列複雜操作;而sleep實現簡單,不需要別的執行緒喚醒
- wait與sleep都能被中斷(除了sleep(0),當然對他中斷沒有意義)
0x05 Condition.await/signal對比wait/notify
關於Condition介紹可以參考這篇文章:Java-併發-Condition
5.1 Condition和Object關係
等待 | 喚醒 | 喚醒全部 | |
---|---|---|---|
Object | wait | notify | notifyAll |
Condition | await | signal | signalAll |
5.2 wait和await對比
中斷 | 超時精確 | Deadline | |
---|---|---|---|
wait | 可中斷 | 可為納秒 | 不支援 |
await | 支援可中斷/不可中斷 | 可為納秒 | 支援 |
5.3 notify和signal對比
全部喚醒 | 喚醒順序 | 執行前提 | 邏輯 | |
---|---|---|---|---|
notify | 支援,notifyAll | 隨機(jdk寫的,其實cpp原始碼是一個wait_queue,FIFO) | 擁有鎖 | 從wait_list取出,放入entry_list,重新競爭鎖 |
signal | 支援,signalAll | 順序喚醒 | 擁有鎖 | 從condition_queue取出,放入wait_queue,重新競爭鎖 |
5.4 底層原理對比
- Object的阻塞和喚醒,前基於synchronized的。底層實現是在cpp級別,呼叫synchronized的執行緒物件會放入entry_list,競爭到鎖的執行緒處於
active
狀態。呼叫wait方法後,執行緒物件被放入wait_queue。而notify會按FIFO方法從wait_queue中取得一個物件並放回entry_list,這樣該執行緒可以重新競爭synchronized同步鎖了。 - Condition的阻塞喚醒,是基於lock的。lock維護了一個wait_queue,用於存放等待鎖的執行緒。而Condition也維護了一個condition_queue。當擁有鎖的執行緒呼叫await方法,就會被放入condition_queue;當呼叫signal方法,會從condition_queue選頭一個滿足要求的節點移除然後放入wait_queue,重新競爭lock。
5.5 應用場景對比
- Object使用比較單一,只能針對一個條件。
- 一個ReentrantLock可以有多個Condition,對應不同條件。比如在生產者消費者可以這樣實現:
private static ReentrantLock lock = new ReentrantLock();
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();
// 生產者
public void produce(E item) {
lock.lock();
try {
while(isFull()) {
// 資料滿了,生產者就阻塞,等待消費者消費完後喚醒
notFull.await();
}
// ...生產資料程式碼
// 喚醒消費者執行緒,告知有資料了,可以消費
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 消費者
public E consume() {
lock.lock();
try {
while(isEmpty()) {
// 資料空了,消費者就阻塞,等待生產者生產資料後喚醒
notEmpty.await();
}
// ...消費資料程式碼
// 喚醒生產者者執行緒,告知有資料了,可以消費
notFull.signal();
return item;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
這樣好處就很明顯了。如果使用Object,那麼喚醒的時候也許就喚醒了同類的角色執行緒。而使用condition可以在只有一個鎖的情況下,實現我們想要的只喚醒對方角色執行緒的功能。
0x06 總結
Object的阻塞和喚醒,是基於synchronized的。底層實現是在cpp級別。整個流程串起來如下:
- 呼叫
synchronized
的執行緒物件會放入entry_list
- 成功競爭到鎖的那個執行緒處於
active
狀態 - 呼叫
wait
方法後,執行緒物件被放入wait_queue
- 而
notify
會按FIFO方法從wait_queue
中取得一個物件,並放回entry_list - 呼叫
wait
的執行緒釋放鎖 - 此後該執行緒可以重新競爭synchronized同步鎖了
- 競爭到鎖的程式,可以繼續同步塊中的執行程式碼了
更多關於Java鎖的資訊,可參考文章:Java-併發-關於鎖的一切