深入理解Object提供的阻塞和喚醒API
深入理解Object提供的阻塞和喚醒API
前提
前段時間花了大量時間去研讀JUC中同步器AbstractQueuedSynchronizer
的原始碼實現,再結合很久之前看過的一篇關於Object
提供的等待和喚醒機制的JVM實現,發現兩者有不少的關聯,於是決定重新研讀一下Object
中提供的阻塞和喚醒方法。本文閱讀JDK類庫原始碼使用的JDK版本是JDK11,因為本文內容可能不適合於其他版本。
Object提供的阻塞和喚醒API
java.lang.Object
作為所有非基本型別的基類,也就是說所有java.lang.Object
的子類都具備阻塞和喚醒的功能。下面詳細分析Object
提供的阻塞和喚醒API。
阻塞等待-wait
等待-wait()
方法提供了阻塞的功能,分超時和永久阻塞的版本,實際上,底層只提供了一個JNI方法:
// 這個是底層提供的JNI方法,帶超時的阻塞等待,響應中斷,其他兩個只是變體 public final native void wait(long timeoutMillis) throws InterruptedException; // 變體方法1,永久阻塞,響應中斷 public final void wait() throws InterruptedException { wait(0L); } // 變體方法2,帶超時的阻塞,超時時間分兩段:毫秒和納秒,實際上納秒大於0直接毫秒加1(這麼暴力...),響應中斷 public final void wait(long timeoutMillis, int nanos) throws InterruptedException { if (timeoutMillis < 0) { throw new IllegalArgumentException("timeoutMillis value is negative"); } if (nanos < 0 || nanos > 999999) { throw new IllegalArgumentException("nanosecond timeout value out of range"); } if (nanos > 0) { timeoutMillis++; } wait(timeoutMillis); }
也就是隻有一個wait(long timeoutMillis)
方法是JNI介面,其他兩個方法相當於:
wait()
等價於wait(0L)
。wait(long timeoutMillis, int nanos)
在引數合法的情況下等價於wait(timeoutMillis + 1L)
。
由於wait(long timeoutMillis, int nanos)
是引數最完整的方法,它的API註釋特別長,這裡直接翻譯和摘取它註釋中的核心要素:
- 當前執行緒阻塞等待直到被喚醒,喚醒的情況一般有三種:notify(All)被呼叫、執行緒被中斷或者在指定了超時阻塞的情況下超過了指定的阻塞時間。
- 當前執行緒必須獲取此物件的監視器鎖(monitor lock),也就是呼叫阻塞等待方法之前一個執行緒必須成為此物件的監視器鎖的擁有者。
- 呼叫了
wait()
方法之後,當前執行緒會把自身放到當前物件的等待集合(wait-set),然後釋放所有在此物件上的同步宣告(then to relinquish any nd all synchronization claims on this object),謹記只有當前物件上的同步宣告會被釋放,當前執行緒在其他物件上的同步鎖只有在呼叫其wait()
方法之後才會釋放。 - Warning:執行緒被喚醒之後(
notify()
或者中斷)就會從等待集合(wait-set)中移除並且重新允許被執行緒排程器排程。通常情況下,這個被喚醒的執行緒會與其他執行緒競爭物件上的同步權(鎖),一旦執行緒重新控制了物件(regained control of the object),它對物件的所有同步宣告都恢復到以前的狀態,即恢復到呼叫wait()
方法時(筆者認為,其實準確來說,是呼叫wait()
方法前)的狀態。 - 如果任意執行緒在它呼叫了
wait()
之前,或者呼叫過wait()
方法之後處於阻塞等待狀態,一旦執行緒呼叫了Thread#interrupt()
,執行緒就會中斷並且丟擲InterruptedException
異常,執行緒的中斷狀態會被清除。InterruptedException
異常會延遲到在第4點提到"它對物件的所有同步宣告都恢復到以前的狀態"的時候丟擲。
值得注意的還有:
一個執行緒必須成為此物件的監視器鎖的擁有者才能正常呼叫wait()
系列方法,也就是wait()
系列方法必須在同步程式碼塊(synchronized
程式碼塊)中呼叫,否則會丟擲IllegalMonitorStateException
異常,這一點是初學者或者不瞭解wait()
的機制的開發者經常會犯的問題。
上面的五點描述可以寫個簡單的同步程式碼塊虛擬碼時序總結一下:
final Object lock = new Object();
synchronized(lock){
1、執行緒進入同步程式碼塊,意味著獲取物件監視器鎖成功
while(!condition){
lock.wait(); 2.執行緒呼叫wait()進行阻塞等待
break;
}
3.執行緒從wait()的阻塞等待中被喚醒,恢復到第1步之後的同步狀態
4.繼續執行後面的程式碼,直到離開同步程式碼塊
}
喚醒-notify
notify()
方法的方法簽名如下:
@HotSpotIntrinsicCandidate
public final native void notify();
下面按照慣例翻譯一下其API註釋:
- 喚醒一個阻塞等待在此物件監視器上的執行緒,(如果存在多個阻塞執行緒)至於選擇哪一個執行緒進行喚醒是任意的,取決於具體的現實,一個執行緒通過呼叫
wait()
方法才能阻塞在物件監視器上。 - 被喚醒的執行緒並不會馬上繼續執行,直到當前執行緒(也就是當前呼叫了
notify()
方法的執行緒)釋放物件上的鎖。被喚醒的執行緒會與其他執行緒競爭在物件上進行同步(換言之只有獲得物件的同步控制權才能繼續執行),在成為下一個鎖定此物件的執行緒時,被喚醒的執行緒沒有可靠的特權或劣勢。 - 此方法只有在一個執行緒獲取了此物件監視器的所有權(the owner)的時候才能呼叫,具體就是:同步方法中、同步程式碼塊中或者靜態同步方法中。否則,會丟擲
IllegalMonitorStateException
異常。
喚醒所有-notifyAll
notifyAll()
方法的方法簽名如下:
@HotSpotIntrinsicCandidate
public final native void notifyAll();
1.喚醒所有阻塞等待在此物件監視器上的執行緒,一個執行緒通過呼叫wait()
方法才能阻塞在物件監視器上。
其他註釋的描述和notify()
方法類似。
小結
我們經常看到的資料中提到synchronized
關鍵字的用法:
- 普通同步方法,同步或者說鎖定的是當前例項物件。
- 靜態同步方法,同步或者說鎖定的是當前例項物件的
Class
物件。 - 同步程式碼塊,同步或者說鎖定的是括號裡面的例項物件。
對於同步程式碼塊而言,synchronized
關鍵字抽象到位元組碼層面就是同步程式碼塊中的位元組碼執行在monitorenter
和monitorexit
指令之間:
synchronized(xxxx){
...coding block
}
↓↓↓↓↓↓↓↓↓↓
monitorenter;
...coding block - bytecode
monitorexit;
JVM需要保證每一個monitorenter都有一個monitorexit與之相對應。任何物件都有一個monitor(實際上是ObjectMonitor
)與之相關聯,當且一個monitor被持有之後,它將處於鎖定狀態。執行緒執行到monitorenter指令時,將會嘗試獲取物件所對應的monitor所有權,即嘗試獲取物件的鎖。
對於同步(靜態)方法而言,synchronized
方法則會被翻譯成普通的方法呼叫和返回指令,如:invokevirtual
等等,在JVM位元組碼層面並沒有任何特別的指令來實現被synchronized
修飾的方法,而是在Class
檔案的方法表中將該方法的access_flags
欄位中的synchronized
標誌位置1,表示該方法是同步方法並使用呼叫該方法的物件或該方法所屬的Class
在JVM的內部物件表示Klass
做為鎖物件。
其實從開發者角度簡單理解,這兩種方式只是在獲取鎖的時機有所不同。
下面重複闡述幾個第一眼看起來不合理卻是事實的問題(其實前文已經提及過):
- 線上程進入
synchronized
方法或者程式碼塊,相當於獲取監視器鎖成功,如果此時成功呼叫wait()
系列方法,那麼它會立即釋放監視器鎖,並且新增到等待集合(Wait Set)中進行阻塞等待。 - 由於已經有執行緒釋放了監視器鎖,那麼在另一個執行緒進入
synchronized
方法或者程式碼塊之後,它可以呼叫notify(All)
方法喚醒等待集合中正在阻塞的執行緒,但是這個喚醒操作並不是呼叫notify(All)
方法後立即生效,而是在該執行緒退出synchronized
方法或者程式碼塊之後才生效。 - 從
wait()
方法阻塞過程中被喚醒的執行緒會競爭監視器目標物件的控制權,一旦重新控制了物件,那麼執行緒的同步狀態就會恢復到步入synchronized
方法或者程式碼塊時候的狀態(也就是成功獲取到物件監視器鎖時候的狀態),這個時候執行緒才能夠繼續執行。
為了驗證這三點,可以寫個簡單的Demo:
public class Lock {
@Getter
private final Object lock = new Object();
}
public class WaitMain {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
final Lock lock = new Lock();
new Thread(new WaitRunnable(lock), "WaitThread-1").start();
new Thread(new WaitRunnable(lock), "WaitThread-2").start();
Thread.sleep(50);
new Thread(new NotifyRunnable(lock), "NotifyThread").start();
Thread.sleep(Integer.MAX_VALUE);
}
@RequiredArgsConstructor
private static class WaitRunnable implements Runnable {
private final Lock lock;
@Override
public void run() {
synchronized (lock) {
System.out.println(String.format("[%s]-執行緒[%s]獲取鎖成功,準備執行wait方法", F.format(LocalDateTime.now()),
Thread.currentThread().getName()));
while (true) {
try {
lock.wait();
} catch (InterruptedException e) {
//ignore
}
System.out.println(String.format("[%s]-執行緒[%s]從wait中喚醒,準備exit", F.format(LocalDateTime.now()),
Thread.currentThread().getName()));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
//ignore
}
break;
}
}
}
}
@RequiredArgsConstructor
private static class NotifyRunnable implements Runnable {
private final Lock lock;
@Override
public void run() {
synchronized (lock) {
System.out.println(String.format("[%s]-執行緒[%s]獲取鎖成功,準備執行notifyAll方法", F.format(LocalDateTime.now()),
Thread.currentThread().getName()));
lock.notifyAll();
System.out.println(String.format("[%s]-執行緒[%s]先休眠3000ms", F.format(LocalDateTime.now()),
Thread.currentThread().getName()));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
//ignore
}
System.out.println(String.format("[%s]-執行緒[%s]準備exit", F.format(LocalDateTime.now()),
Thread.currentThread().getName()));
}
}
}
}
某個時刻的執行結果如下:
[2019-04-27 23:28:17.617]-執行緒[WaitThread-1]獲取鎖成功,準備執行wait方法
[2019-04-27 23:28:17.631]-執行緒[WaitThread-2]獲取鎖成功,準備執行wait方法
[2019-04-27 23:28:17.657]-執行緒[NotifyThread]獲取鎖成功,準備執行notifyAll方法 <-------- 這一步執行完說明WaitThread已經釋放了鎖
[2019-04-27 23:28:17.657]-執行緒[NotifyThread]先休眠3000ms
[2019-04-27 23:28:20.658]-執行緒[NotifyThread]準備exit <------- 這一步後NotifyThread離開同步程式碼塊
[2019-04-27 23:28:20.658]-執行緒[WaitThread-1]從wait中喚醒,準備exit <------- 這一步WaitThread-1解除阻塞
[2019-04-27 23:28:21.160]-執行緒[WaitThread-2]從wait中喚醒,準備exit <------- 這一步WaitThread-2解除阻塞,注意發生時間在WaitThread-1解除阻塞500ms之後,符合我們前面提到的第3點
如果結合wait()
和notify()
可以簡單總結出一個同步程式碼塊的虛擬碼如下:
final Object lock = new Object();
// 等待
synchronized(lock){
1、執行緒進入同步程式碼塊,意味著獲取物件監視器鎖成功
while(!condition){
lock.wait(); 2.執行緒呼叫wait()進行阻塞等待
break;
}
3.執行緒從wait()的阻塞等待中被喚醒,嘗試恢復第1步之後的同步狀態,並不會馬上生效,直到notify被呼叫並且呼叫notify方法的執行緒已經釋放鎖,同時當前執行緒需要競爭成功
4.繼續執行後面的程式碼,直到離開同步程式碼塊
}
// 喚醒
synchronized(lock){
1、執行緒進入同步程式碼塊,意味著獲取物件監視器鎖成功
lock.notify(); 2.喚醒其中一個在物件監視器上等待的執行緒
3.準備推出同步程式碼塊釋放鎖,只有釋放鎖之後第2步才會生效
}
圖解Object提供的阻塞和喚醒機制
結合前面分析過的知識點以及參考資料中的文章,重新畫一個圖理解一下物件監視器以及相應阻塞和喚醒API的工作示意過程:
- Entry Set(實際上是
ObjectMonitor
中的_EntryList屬性):存放等待鎖並且處於阻塞狀態的執行緒。 - Wait Set(實際上是
ObjectMonitor
中的_WaitSet屬性):存放處於等待阻塞狀態的執行緒。 - The Owner(實際上是
ObjectMonitor
中的_owner屬性):指向獲得物件監視器的執行緒,在同一個時刻只能有一個執行緒被The Owner持有,通俗來看,它就是監視器的控制權。
使用例子
通過Object
提供的阻塞和喚醒機制舉幾個簡單的使用例子。
維修廁所的例子
假設有以下場景:廁所的只有一個卡位,廁所維修工修廁所的時候,任何人不能上廁所。當廁所維修工修完廁所的時候,上廁所的人需要"得到廁所的控制權"才能上廁所。
// 廁所類
public class Toilet {
// 廁所的鎖
private final Object lock = new Object();
private boolean available;
public Object getLock() {
return lock;
}
public void setAvailable(boolean available) {
this.available = available;
}
public boolean getAvailable() {
return available;
}
}
// 廁所維修工
@RequiredArgsConstructor
public class ToiletRepairer implements Runnable {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private final Toilet toilet;
@Override
public void run() {
synchronized (toilet.getLock()) {
System.out.println(String.format("[%s]-廁所維修員得到了廁所的鎖,維修廁所要用5000ms...", LocalDateTime.now().format(F)));
try {
Thread.sleep(5000);
} catch (Exception e) {
// ignore
}
toilet.setAvailable(true);
toilet.getLock().notifyAll();
System.out.println(String.format("[%s]-廁所維修員維修完畢...", LocalDateTime.now().format(F)));
}
}
}
//上廁所的任務
@RequiredArgsConstructor
public class ToiletTask implements Runnable {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private final Toilet toilet;
private final String name;
private final Random random;
@Override
public void run() {
synchronized (toilet.getLock()) {
System.out.println(String.format("[%s]-%s得到了廁所的鎖...", LocalDateTime.now().format(F), name));
while (!toilet.getAvailable()) {
try {
toilet.getLock().wait();
} catch (InterruptedException e) {
//ignore
}
int time = random.nextInt(3) + 1;
try {
// 模擬上廁所用時
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
//ignore
}
System.out.println(String.format("[%s]-%s上廁所用了%s秒...", LocalDateTime.now().format(F), name, time));
}
}
}
}
// 場景入口
public class Main {
public static void main(String[] args) throws Exception {
Toilet toilet = new Toilet();
Random random = new Random();
Thread toiletRepairer = new Thread(new ToiletRepairer(toilet), "ToiletRepairer");
Thread thread1 = new Thread(new ToiletTask(toilet, "張三", random), "thread-1");
Thread thread2 = new Thread(new ToiletTask(toilet, "李四", random), "thread-2");
Thread thread3 = new Thread(new ToiletTask(toilet, "王五", random), "thread-3");
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(50);
toiletRepairer.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
某次執行的結果如下:
[2019-04-29 01:07:25.914]-張三得到了廁所的鎖...
[2019-04-29 01:07:25.931]-李四得到了廁所的鎖...
[2019-04-29 01:07:25.931]-王五得到了廁所的鎖...
[2019-04-29 01:07:25.951]-廁所維修員得到了廁所的鎖,維修廁所要用5000ms...
[2019-04-29 01:07:30.951]-廁所維修員維修完畢...
[2019-04-29 01:07:32.952]-張三上廁所用了2秒...
[2019-04-29 01:07:35.952]-王五上廁所用了3秒...
[2019-04-29 01:07:37.953]-李四上廁所用了2秒...
阻塞佇列實現
實現一個簡單固定容量的阻塞佇列,介面如下:
public interface BlockingQueue<T> {
void put(T value) throws InterruptedException;
T take() throws InterruptedException;
}
其中put(T value)
會阻塞直到佇列中有可用的容量,而take()
方法會阻塞直到有元素投放到佇列中。實現如下:
public class DefaultBlockingQueue<T> implements BlockingQueue<T> {
private Object[] elements;
private final Object notEmpty = new Object();
private final Object notFull = new Object();
private int count;
private int takeIndex;
private int putIndex;
public DefaultBlockingQueue(int capacity) {
this.elements = new Object[capacity];
}
@Override
public void put(T value) throws InterruptedException {
synchronized (notFull) {
while (count == elements.length) {
notFull.wait();
}
}
final Object[] items = this.elements;
items[putIndex] = value;
if (++putIndex == items.length) {
putIndex = 0;
}
count++;
synchronized (notEmpty) {
notEmpty.notify();
}
}
@SuppressWarnings("unchecked")
@Override
public T take() throws InterruptedException {
synchronized (notEmpty) {
while (count == 0) {
notEmpty.wait();
}
}
final Object[] items = this.elements;
T value = (T) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
takeIndex = 0;
}
count--;
synchronized (notFull) {
notFull.notify();
}
return value;
}
}
場景入口類:
public class Main {
public static void main(String[] args) throws Exception {
BlockingQueue<String> queue = new DefaultBlockingQueue<>(5);
Runnable r = () -> {
while (true) {
try {
String take = queue.take();
System.out.println(String.format("執行緒%s消費訊息-%s", Thread.currentThread().getName(), take));
} catch (Exception e) {
e.printStackTrace();
}
}
};
new Thread(r, "thread-1").start();
new Thread(r, "thread-2").start();
IntStream.range(0, 10).forEach(i -> {
try {
queue.put(String.valueOf(i));
} catch (InterruptedException e) {
//ignore
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
某次執行結果如下:
執行緒thread-1消費訊息-0
執行緒thread-2消費訊息-1
執行緒thread-1消費訊息-2
執行緒thread-2消費訊息-3
執行緒thread-1消費訊息-4
執行緒thread-2消費訊息-5
執行緒thread-1消費訊息-6
執行緒thread-2消費訊息-7
執行緒thread-1消費訊息-8
執行緒thread-2消費訊息-9
上面這個例子就是簡單的單生產者-多消費者的模型。
執行緒池實現
這裡實現一個極度簡陋的固定容量的執行緒池,功能是:初始化固定數量的活躍執行緒,阻塞直到有可用的執行緒用於提交任務。它只有一個介面方法,介面定義如下:
public interface ThreadPool {
void execute(Runnable runnable);
}
具體實現如下:
public class DefaultThreadPool implements ThreadPool {
private final int capacity;
private List<Worker> initWorkers;
private Deque<Worker> availableWorkers;
private Deque<Worker> busyWorkers;
private final Object nextLock = new Object();
public DefaultThreadPool(int capacity) {
this.capacity = capacity;
init(capacity);
}
private void init(int capacity) {
initWorkers = new ArrayList<>(capacity);
availableWorkers = new LinkedList<>();
busyWorkers = new LinkedList<>();
for (int i = 0; i < capacity; i++) {
Worker worker = new Worker();
worker.setName("Worker-" + (i + 1));
worker.setDaemon(true);
initWorkers.add(worker);
}
for (Worker w : initWorkers) {
w.start();
availableWorkers.add(w);
}
}
@Override
public void execute(Runnable runnable) {
if (null == runnable) {
return;
}
synchronized (nextLock) {
while (availableWorkers.size() < 1) {
try {
nextLock.wait(500);
} catch (InterruptedException e) {
//ignore
}
}
Worker worker = availableWorkers.removeFirst();
busyWorkers.add(worker);
worker.run(runnable);
nextLock.notifyAll();
}
}
private void makeAvailable(Worker worker) {
synchronized (nextLock) {
availableWorkers.add(worker);
busyWorkers.remove(worker);
nextLock.notifyAll();
}
}
private class Worker extends Thread {
private final Object lock = new Object();
private Runnable runnable;
private AtomicBoolean run = new AtomicBoolean(true);
private void run(Runnable runnable) {
synchronized (lock) {
if (null != this.runnable) {
throw new IllegalStateException("Already running a Runnable!");
}
this.runnable = runnable;
lock.notifyAll();
}
}
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized (lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
synchronized (lock) {
runnable = null;
}
if (ran) {
ran = false;
makeAvailable(this);
}
}
}
}
}
}
場景類入口:
public class Main {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception{
ThreadPool threadPool = new DefaultThreadPool(2);
threadPool.execute(() -> {
try {
System.out.println(String.format("[%s]-任務一開始執行持續3秒...", LocalDateTime.now().format(F)));
Thread.sleep(3000);
System.out.println(String.format("[%s]-任務一執行結束...", LocalDateTime.now().format(F)));
}catch (Exception e){
//ignore
}
});
threadPool.execute(() -> {
try {
System.out.println(String.format("[%s]-任務二開始執行持續4秒...", LocalDateTime.now().format(F)));
Thread.sleep(4000);
System.out.println(String.format("[%s]-任務二執行結束...", LocalDateTime.now().format(F)));
}catch (Exception e){
//ignore
}
});
threadPool.execute(() -> {
try {
System.out.println(String.format("[%s]-任務三開始執行持續5秒...", LocalDateTime.now().format(F)));
Thread.sleep(5000);
System.out.println(String.format("[%s]-任務三執行結束...", LocalDateTime.now().format(F)));
}catch (Exception e){
//ignore
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
某次執行結果如下:
[2019-04-29 02:07:25.465]-任務二開始執行持續4秒...
[2019-04-29 02:07:25.465]-任務一開始執行持續3秒...
[2019-04-29 02:07:28.486]-任務一執行結束...
[2019-04-29 02:07:28.486]-任務三開始執行持續5秒...
[2019-04-29 02:07:29.486]-任務二執行結束...
[2019-04-29 02:07:33.487]-任務三執行結束...
小結
鑑於筆者C語言學得不好,這裡就無法深入分析JVM原始碼的實現,只能結合一些現有的資料和自己的理解重新梳理一下Object
提供的阻塞和喚醒機制這些知識點。結合之前看過JUC同步器的原始碼,一時醒悟過來,JUC同步器只是在資料結構和演算法層面使用Java語言對原來JVM中C語言的阻塞和喚醒機制即Object
提供的那幾個JNI方法進行了一次實現而已。
最後,Object
提供的阻塞等待喚醒機制是JVM實現的(如果特別熟悉C語言可以通過JVM原始碼研究其實現,對於大部分開發者來說是黑箱),除非是特別熟練或者是JDK版本太低尚未引入JUC包,一般情況下不應該優先選擇Object
,而應該考慮專門為併發設計的JUC包中的類庫。
參考資料:
- JVM原始碼分析之Object.wait/notify實現-By佔小狼
- 死磕Java併發-深入分析synchronized的實現原理
- JDK11相關原始碼
原文連結
Github Page:http://www.throwable.club/2019/04/30/java-object-wait-notify/
Coding Page:http://throwable.coding.me/2019/04/30/java-object-wait-notify/
(本文完 c-7-d e-a-201904