1. 程式人生 > >java併發——wait()、notify()、notifyAll()、join()

java併發——wait()、notify()、notifyAll()、join()

鎖池和等待池

wait() ,notifyAll(),notify() 三個方法都是Object類中的方法。學習他們之前先要了解倆個概念。java中,每個物件都有兩個池,鎖(monitor)池和等待池。

鎖池:假設執行緒A已經擁有了某個物件(注意:不是類)的鎖,而其它的執行緒想要呼叫這個物件的某個synchronized方法(或者synchronized塊),由於這些執行緒在進入物件的synchronized方法之前必須先獲得該物件的鎖的擁有權,但是該物件的鎖目前正被執行緒A擁有,所以這些執行緒就進入了該物件的鎖池中。 等待池:假設一個執行緒A呼叫了某個物件的wait()方法,執行緒A就會釋放該物件的鎖(因為wait()方法必須出現在synchronized中,這樣自然在執行wait()方法之前執行緒A就已經擁有了該物件的鎖),同時執行緒A就進入到了該物件的等待池中。如果另外的一個執行緒呼叫了相同物件的notifyAll()方法,那麼處於該物件的等待池中的執行緒就會全部進入該物件的鎖池中,準備爭奪鎖的擁有權。如果另外的一個執行緒呼叫了相同物件的notify()方法,那麼僅僅有一個處於該物件的等待池中的執行緒(隨機)會進入該物件的鎖池。

notify()、notifyAll()、wait()

notify():通知一個在物件上等待的執行緒,由WAITING狀態變為BLOCKING狀態,從等待佇列移動到同步佇列,等待CPU排程獲取該物件的鎖,當該執行緒獲取到了物件的鎖後,該執行緒從wait()方法返回。 notifyAll():通知所有等待在該物件上的執行緒,由WAITING狀態變為BLOCKING狀態,等待CPU排程獲取該物件的鎖。 wait():呼叫該方法的執行緒進入WAITING狀態,並將當前執行緒放置到物件的等待佇列,只有等待另外執行緒的通知或被中斷才會返回,需要注意,呼叫wait()方法後,會釋放物件的鎖。 wait(long):

超時等待一段時間,這裡的引數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回。 wait(long,int):對於超時時間更細力度的控制,可以達到納秒。

簡單的例子:

等待通知機制,是指一個執行緒A呼叫了物件lock的wait()方法進入等待狀態,而另一個執行緒B呼叫了物件lock的notify()或者notifyAll()方法,執行緒A收到通知後從物件lock的wait()方法返回,進而執行後續操作。上述兩個執行緒通過物件lock來完成互動,而物件上的wait()和notify/notifyAll()的關係就如同開關訊號一樣,用來完成等待方和通知方之間的互動工作。

public
class WaitNotify{ //不需要為volatile,因為對於flag的操作均在synchronized鎖的保護下進行,可以保證flag的記憶體可見性 static boolean flag = true; static Object lock = new Object(); public static void main(String args[]) throws Exception{ Thread waitThread = new Thread(new Wait(),"WaitThread"); waitThread.start(); TimeUnit.SECONDS.sleep(1);//1 second -> package java.util.cocurrent Thread notifyThread = new Thread(new Notify(),"NotifyThread"); notifyThread.start(); } static class Wait implements Runnable{ public void run(){ //加鎖,擁有lock的monitor synchronized(lock){ //當條件不滿足時,繼續wait,同時釋放了lock的鎖 while(flag){ try{ System.out.println("flag is ture, Wait"); lock.wait(); }catch(InterruptedException e){ //除了notify通知,帶超時的wait()方法、執行緒中斷機制也能喚醒此執行緒 } } System.out.println("flag is false,complete"); } } } static class Notify implements Runnable{ public void run(){ synchronized(lock){ //獲取lock的鎖,然後進行通知,通知時不會釋放lock的鎖,直到當前執行緒釋放了lock,呼叫了notifyAll,並且WaitThread獲得了鎖之後,wait執行緒才能從wait()方法返回 System.out.println("Notify get lock ,begin notify"); lock.notifyAll(); flag = false; try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized(lock){ System.out.println("Notify get lock again"); } } } }

注意:

  1. wait()、notify/notifyAll() 方法是Object的本地final方法,無法被重寫。
  2. wait()使當前執行緒阻塞,前提是 必須先獲得鎖,一般配合synchronized 關鍵字使用,即,一般在synchronized 同步程式碼塊裡使用 wait()、notify/notifyAll() 方法。
  3. notify/notifyAll() 的執行只是喚醒沉睡的執行緒,而不會立即釋放鎖,鎖的釋放要看程式碼塊的具體執行情況。所以在程式設計中,儘量在使用了notify/notifyAll() 後立即退出臨界區,以喚醒其他執行緒 。
  4. wait() 需要被try catch包圍,中斷也可以使wait等待的執行緒喚醒。
  5. notify 和wait 的順序不能錯,如果A執行緒先執行notify方法,B執行緒在執行wait方法,那麼B執行緒是無法被喚醒的。
  6. notify方法只喚醒一個等待(物件的)執行緒並使該執行緒開始執行。所以如果有多個執行緒等待一個物件,這個方法只會喚醒其中一個執行緒,選擇哪個執行緒取決於作業系統對多執行緒管理的實現。notifyAll 會喚醒所有等待(物件的)執行緒,儘管哪一個執行緒將會第一個處理取決於作業系統的實現。如果當前情況下有多個執行緒需要被喚醒,推薦使用notifyAll 方法。
  7. 多執行緒中要測試某個條件的變化,要使用while。只有當前值滿足需要值的時候,執行緒才可以往下執行,所以,必須使用while 迴圈阻塞。注意,wait() 當被喚醒時候,只是讓while迴圈繼續往下走.如果此處用if的話,意味著if繼續往下走,會跳出if語句塊。但是,notifyAll 只是負責喚醒執行緒,並不保證條件,所以需要手動來保證程式的邏輯。

生產者和消費者

什麼是生產者-消費者問題呢?假設有一個公共的容量有限的池子,有兩種人,一種是生產者,另一種是消費者。需要滿足如下條件: 1、生產者產生資源往池子裡新增,前提是池子沒有滿,如果池子滿了,則生產者暫停生產,直到自己的生成能放下池子。 2、消費者消耗池子裡的資源,前提是池子的資源不為空,否則消費者暫停消耗,進入等待直到池子裡有資源數滿足自己的需求。

功能實現

  • 共享倉庫介面
public interface AbstractStorage {
    void consume(int num);
    void produce(int num);
}
  • 共享倉庫物件
public class Storage1 implements AbstractStorage {
    //倉庫最大容量
    private final int MAX_SIZE = 100;
    //倉庫儲存的載體
    private LinkedList list = new LinkedList();

    //生產產品
    public void produce(int num){
        //同步
        synchronized (list){
            //倉庫剩餘的容量不足以存放即將要生產的數量,暫停生產
            while(list.size()+num > MAX_SIZE){
                System.out.println("【要生產的產品數量】:" + num + "\t【庫存量】:"
                        + list.size() + "\t暫時不能執行生產任務!");

                try {
                    //條件不滿足,生產阻塞
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            for(int i=0;i<num;i++){
                list.add(new Object());
            }

            System.out.println("【已經生產產品數】:" + num + "\t【現倉儲量為】:" + list.size());

            list.notifyAll();
        }
    }

    //消費產品
    public void consume(int num){
        synchronized (list){
            //不滿足消費條件
            while(num > list.size()){
                System.out.println("【要消費的產品數量】:" + num + "\t【庫存量】:"
                        + list.size() + "\t暫時不能執行生產任務!");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //消費條件滿足,開始消費
            for(int i=0;i<num;i++){
                list.remove();
            }

            System.out.println("【已經消費產品數】:" + num + "\t【現倉儲量為】:" + list.size());
            list.notifyAll();
        }
    }
}
  • 生成者
public class Producer extends Thread{
    //每次生產的數量
    private int num ;

    //所屬的倉庫
    public AbstractStorage abstractStorage;

    public Producer(AbstractStorage abstractStorage){
        this.abstractStorage = abstractStorage;
    }

    public void setNum(int num){
        this.num = num;
    }

    // 執行緒run函式
    @Override
    public void run() {
        produce(num);
    }

    // 呼叫倉庫Storage的生產函式
    public void produce(int num) {
        abstractStorage.produce(num);
    }
}
  • 消費者
public class Consumer extends Thread{
    // 每次消費的產品數量
    private int num;

    // 所在放置的倉庫
    private AbstractStorage abstractStorage1;

    // 建構函式,設定倉庫
    public Consumer(AbstractStorage abstractStorage1) {
        this.abstractStorage1 = abstractStorage1;
    }

    // 執行緒run函式
    public void run() {
        consume(num);
    }

    // 呼叫倉庫Storage的生產函式
    public void consume(int num) {
        abstractStorage1.consume(num);
    }

    public void setNum(int num){
        this.num = num;
    }
}
  • 測試類
public class Test{
    public static void main(String[] args) {
        // 倉庫物件
        AbstractStorage abstractStorage = new Storage1();

        // 生產者物件
        Producer p1 = new Producer(abstractStorage);
        Producer p2 = new Producer(abstractStorage);
        Producer p3 = new Producer(abstractStorage);
        Producer p4 = new Producer(abstractStorage);
        Producer p5 = new Producer(abstractStorage);
        Producer p6 = new Producer(abstractStorage);
        Producer p7 = new Producer(abstractStorage);

        // 消費者物件
        Consumer c1 = new Consumer(abstractStorage);
        Consumer c2 = new Consumer(abstractStorage);
        Consumer c3 = new Consumer(abstractStorage);

        // 設定生產者產品生產數量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);

        // 設定消費者產品消費數量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);

        // 執行緒開始執行
        c1.start();
        c2.start();
        c3.start();

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
}
  • 測試結果
【要消費的產品數量】:30   【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10    【現倉儲量為】:10
【要消費的產品數量】:30   【庫存量】:10    暫時不能執行生產任務!
【已經生產產品數】:10    【現倉儲量為】:20
【要消費的產品數量】:30   【庫存量】:20    暫時不能執行生產任務!
【已經生產產品數】:10    【現倉儲量為】:30
【已經消費產品數】:30    【現倉儲量為】:0
【要消費的產品數量】:50   【庫存量】:0 暫時不能執行生產任務!
【要消費的產品數量】:20   【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10    【現倉儲量為】:10
【已經生產產品數】:10    【現倉儲量為】:20
【已經消費產品數】:20    【現倉儲量為】:0
【要消費的產品數量】:50   【庫存量】:0 暫時不能執行生產任務!
【已經生產產品數】:10    【現倉儲量為】:10
【已經生產產品數】:80    【現倉儲量為】:90
【已經消費產品數】:50    【現倉儲量為】:40

join()

Thread類中的join方法的主要作用就是同步,它可以使得執行緒之間的並行執行變為序列執行。

例子:

public class JoinTest {
    public static void main(String [] args) throws InterruptedException {
        ThreadJoinTest t1 = new ThreadJoinTest("小明");
        ThreadJoinTest t2 = new ThreadJoinTest("小東");
        t1.start();
        /**join的意思是使得放棄當前執行緒的執行,並返回對應的執行緒,例如下面程式碼的意思就是:
         程式在main執行緒中呼叫t1執行緒的join方法,則main執行緒放棄cpu控制權,並返回t1執行緒繼續執行直到執行緒t1執行完畢
         所以結果是t1執行緒執行完後,才到主執行緒執行,相當於在main執行緒中同步t1執行緒,t1執行完了,main執行緒才有執行的機會
         */
        t1.join();
        t2.start();
    }

}
class ThreadJoinTest extends Thread{
    public ThreadJoinTest(String name){
        super(name);
    }
    @Override
    public void run(){
        for(int i=0;i<1000;i++){
            System.out.println(this.getName() + ":" + i);
        }
    }
}

上面程式結果是先列印完小明執行緒,在列印小東執行緒;

上面註釋也大概說明了join方法的作用:在A執行緒中呼叫了B執行緒的join()方法時,表示只有當B執行緒執行完畢時,A執行緒才能繼續執行。注意,這裡呼叫的join方法是沒有傳參的,join方法其實也可以傳遞一個引數給它的。例如:t1.join(10);:如果A執行緒中掉用B執行緒的join(10),則表示A執行緒會等待B執行緒執行10毫秒,10毫秒過後,A、B執行緒並行執行。需要注意的是,jdk規定,join(0)的意思不是A執行緒等待B執行緒0秒,而是A執行緒等待B執行緒無限時間,直到B執行緒執行完畢,即join(0)等價於join()。

join方法必須線上程start方法呼叫之後呼叫才有意義。這個也很容易理解:如果一個執行緒都沒有start,那它也就無法同步了。

原始碼:

    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

我們可以看到原始碼使用了wait()方法。(注意這個join方法本身就是synchronized),這樣當前執行緒即處於等待狀態,必須執行notify()或notifyAll()才能喚醒,但實際工作上執行完run方法後,並不需要執行notify(),但後繼程式碼也會被喚醒並執行了,這是什麼原因呢?通過對Jvm natvie的原始碼分析,我們發現thread執行完成後,cpp的原始碼中會在thread執行完畢後,會呼叫exit方法,該方法中原來隱含有呼叫notify_all(thread)的動作。