1. 程式人生 > >等待&通知機制

等待&通知機制

等待/通知機制

什麼是等待/通知機制?

舉例說明,廚師和服務員之間的互動
1. 廚師做完一道菜的時間不確定,所以廚師將菜品放到”菜品傳遞臺”上的時間也不確定;
2. 服務員取到菜的時間取決於廚師,所以服務員就處於等待狀態
3. 服務員如何取到菜呢?又得取決於廚師,廚師將菜放在”菜品傳遞臺”上,其實就相當於一種通知,這時服務員才可以拿到菜並交給就餐者。

如何實現等待/通知機制?

一句話總結:wait 使執行緒停止執行,而 notify 使停止的執行緒繼續執行。

wait() 方法

  • Object 類的方法,作用:使當前執行程式碼的執行緒進行等待,直到接到通知被中斷為止。
  • 執行之後,當前執行緒釋放鎖
  • 只能在同步方法或同步塊中呼叫 wait()方法。原因:JDK 強制的,方法呼叫之前必須先獲得該物件的物件級別鎖,如果呼叫時沒有持有適當的鎖,則丟擲 IllegalMonitorStateException 異常。

notify()/notifyAll() 方法

  • Object 類的方法,用來通知那些可能等待該物件的物件鎖的其他執行緒
  • 在執行 notify() 方法後,當前執行緒不能馬上釋放該物件鎖,wait 狀態的執行緒也不能馬上獲取該物件鎖,需要等到執行 notify() 方法的執行緒執行完
    (退出 synchronized 程式碼塊後)。
  • 只能在同步方法或同步塊中呼叫。原因如上。

執行緒狀態切換

Runnable 狀態 & Running 狀態

Runnable:就緒狀態,隨時可能被 CPU 排程執行

Running:執行狀態,執行緒獲取 CPU 許可權進行執行

  1. 建立一個新的執行緒物件後,呼叫 start() 方法,系統為此執行緒分配 CPU 資源,執行緒進入 Runnable 狀態
  2. 如果執行緒搶佔到 CPU 資源,此執行緒進入 Running 狀態

Running 狀態 -> Blocked 狀態

blocked:阻塞狀態,執行緒放棄了 CPU 使用權,暫時停止執行,直到執行緒進入就緒狀態。分為三種:
1. 等待阻塞:呼叫 wait()方法,讓執行緒等到某工作的完成
2. 同步阻塞:synchronized 獲取物件鎖失敗(可能鎖被佔用)
3. 其他阻塞:呼叫 sleep() | join() | 發出 IO 請求時,執行緒進入阻塞。

  • 執行緒呼叫 sleep() 方法,主動放棄佔用的處理器資源
  • 執行緒呼叫了阻塞式 IO 方法,在該方法返回前,該執行緒被阻塞
  • 執行緒試圖獲得一個同步監視器,但該監視器正被其他執行緒所持有
  • 執行緒呼叫 wait() 方法,等待某個通知
  • 程式呼叫了 suspend 方法將該執行緒掛起。(此方法容易死鎖,避免使用)

Blocked 狀態 -> Runnable 狀態

  • 呼叫 sleep() 方法後,sleep()超時
  • 執行緒呼叫的阻塞 IO 已經返回,阻塞方法執行完畢
  • 執行緒成功獲得了試圖同步的監視器
  • 執行緒正在等到通知,其他執行緒發出了通知(notify() | notifyAll)
  • 處於掛起狀態的執行緒呼叫了 resume() 恢復方法

Dead 狀態

執行緒執行完了或者異常退出了 run() 方法,結束生命週期。

每個鎖物件都有兩個佇列:
- 就緒佇列:儲存將要獲得鎖的執行緒,一個執行緒被喚醒後,進入就緒佇列,等到 CPU 排程
- 阻塞佇列:儲存被阻塞的執行緒,一個執行緒被 wait() 後,進入阻塞佇列,等待下一次被喚醒

wait/notify模式的注意事項

  • wait 釋放鎖,notify 不釋放鎖
  • 當執行緒處於 wait 狀態時,使用 interrupt() 方法會出現 InterruptedException 異常
  • wait(long) 方法的作用:等待某一個時間內是否有執行緒對鎖進行喚醒,如果超過時間則自動喚醒
  • 如果通知過早,則會打亂程式正常的執行邏輯。(wait 狀態的執行緒不會被通知
  • wait 等待的條件發生變化,也容易造成程式邏輯的混亂

經典案例:生產者/消費者模式實現

實戰:等待/通知之交叉備份

建立 20 個執行緒,其中 10 個執行緒是將資料備份到 A 資料中,另外 10 個執行緒是將資料備份到 B 資料庫中,並且備份 A 資料庫和 B 資料庫是交叉進行的。

public class DBTools {

    // 確保備份 "★★★★★" 首先執行,然後與 "☆☆☆☆☆" 交替進行備份
    volatile private boolean prevIsA = false;

    synchronized public void backupA() {
        try {
            while (prevIsA) {
                wait();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println("★★★★★");
            }
            prevIsA = true;
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    synchronized public void backupB() {
        try {
            while (!prevIsA) {
                wait();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println("☆☆☆☆☆");
            }
            prevIsA = false;
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class BackupA extends Thread {
    private DBTools dbTools;

    public BackupA(DBTools dbTools) {
        super();
        this.dbTools = dbTools;
    }

    @Override
    public void run() {
        dbTools.backupA();
    }
}
public class BackupB extends Thread {
    private DBTools dbTools;

    public BackupB(DBTools dbTools) {
        super();
        this.dbTools = dbTools;
    }

    @Override
    public void run() {
        dbTools.backupB();
    }
}
public class Run {

    public static void main(String[] args) {

        DBTools dbTools = new DBTools();

        for (int i = 0; i < 20; i++) {
            BackupB output = new BackupB(dbTools);
            output.start();
            BackupA input = new BackupA(dbTools);
            input.start();
        }
    }
}