1. 程式人生 > >生產者消費者模型(wait/notify/notifyAll)

生產者消費者模型(wait/notify/notifyAll)

生產者和消費者之間的阻塞佇列

生產者消費者模型在實際生活中很多運用。對我們自己來說就是一個消費者,比如我們需要買奧利奧餅乾,我們會去超時買,並不是直接從廠商那裡買,而廠商把奧利奧生產結束後會送往各個超市進行售賣,超市就是我們消費者和生產者之間媒介。這個超市在作業系統定義為阻塞佇列。
生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從
阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

在瞭解生產者消費者模型之前需要先了解幾個方法~~

wait()方法

    public final void wait() throws InterruptedException 

wait()使執行緒停止執行,會釋放物件鎖。

  • wait()方法必須用於同步程式碼塊和同步方法,而且必須是內建鎖(synchronized),因為需要鎖住的物件。如果呼叫wait()時沒有合適的鎖,會拋異常。
  • wait()方法會使當前執行緒呼叫該方法後進行等待,並且將該執行緒置入鎖物件的等待佇列中,直到接到通知或被中斷而已。如果沒有接到通知或者中斷,會一直等。
  • wait()方法執行後當前執行緒釋放鎖,其他執行緒可以競爭該鎖。
package
CODE.多執行緒; public class Wait1 { public static void main(String[] args) throws InterruptedException { Object ob=new Object(); synchronized (ob) { System.out.println("等待開始..."); ob.wait(); System.out.println("等待結束..."); } System.
out.println("main方法結束..."); } }

只會打印出“等待開始…”,因為沒有喚醒等待,該程序就一直等待…,不會執行後面語句。

wait方法從執行態到阻塞態

wait (long time):如果到了預計時間還沒被喚醒,將繼續執行後續程式碼

wait( )之後的執行緒繼續執行有2種方法:

  • 呼叫該物件的notify( )方法喚醒等待執行緒
  • 執行緒等待時呼叫interrupt( )中斷該執行緒

1.首先用notify()喚醒執行緒:

notify()

    public final native void notify();

  • notify( )方法也必須在同步方法或者同步程式碼塊中呼叫,用來喚醒等待在該物件上的執行緒。如果有多個執行緒等待,則任意挑一個執行緒喚醒.
  • notify( )方法執行後,喚醒執行緒不會立即釋放物件鎖,要等待喚醒執行緒全部執行才會釋放鎖。
import static java.lang.Thread.sleep;

//notify()
class waitnotifyT
{
    synchronized public void  waitMethod() {
        System.out.println("等待開始...");
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("等待結束...");
    }
    synchronized public void  notifyMethod() {
        System.out.println("喚醒開始...");
       notify();
        System.out.println("喚醒結束...");
    }
}


class Mthread implements  Runnable
{
    private waitnotifyT waitnotifyT1;
    private int flag=1;
    public Mthread(waitnotifyT waitnotifyT1,int flag) {
        this.waitnotifyT1 = waitnotifyT1;
        this.flag=flag;
    }

    @Override
    public void run()
    {
        if(flag==1)
        {
            waitnotifyT1.waitMethod();
        }
        else
            waitnotifyT1.notifyMethod();
    }
}
public class Wait1
{
    public static void main(String[] args) throws InterruptedException {
        waitnotifyT wn=new waitnotifyT();
        Mthread  waitThread=new Mthread(wn,1);
        Mthread notifyThread=new Mthread(wn,0);
        new Thread(waitThread).start();
        sleep(1000);
        new Thread(notifyThread).start();
    }
}

在這裡插入圖片描述
從結果可以看見當執行notify後,喚醒執行緒不會立即釋放鎖,而是喚醒執行緒全部執行完,才會釋放鎖,執行wait()之後的程式碼。

2.當執行緒使用wait導致執行緒阻塞,呼叫interrupt會拋一個InterruptedException異常,將執行緒終止。

class WaitTh implements Runnable
{
    synchronized private void waitM()
    {
        System.out.println("等待開始...");
        try {
            wait();
        } catch (InterruptedException e) {
            System.out.println("等待被中斷");        }
    }
    public void run()
    {
        waitM();
    }
}
public class Wait1
{
    public static void main(String[] args)  {
        Thread thread=new Thread(new WaitTh());
        thread.start();
        thread.interrupt();
    }
}

在這裡插入圖片描述

notify和wait等待和喚醒的是同一個物件

notifyAll()
notifyAll方法可以一次喚醒所有的等待在該物件上的執行緒。

    public final native void notifyAll();

////notifyAll()

class waitnotifyT
{
    synchronized public void  waitMethod() {
        System.out.println(Thread.currentThread().getName()+":等待開始...");
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+":等待結束...");
    }
    synchronized public void  notifyMethod() {
        System.out.println(Thread.currentThread().getName()+":喚醒開始...");
        notifyAll();
        System.out.println(Thread.currentThread().getName()+":喚醒結束...");
    }
}


class Mthread implements  Runnable
{
    private waitnotifyT waitnotifyT1;
    private int flag=1;
    public Mthread(waitnotifyT waitnotifyT1,int flag) {
        this.waitnotifyT1 = waitnotifyT1;
        this.flag=flag;
    }

    @Override
    public void run()
    {
        if(flag==1)
        {
            waitnotifyT1.waitMethod();
        }
        else
            waitnotifyT1.notifyMethod();
    }
}
public class Wait1
{
    public static void main(String[] args) throws InterruptedException {
        waitnotifyT wn=new waitnotifyT();
        Mthread  waitThread=new Mthread(wn,1);
        Mthread notifyThread=new Mthread(wn,0);
        new Thread(waitThread,"等待執行緒1").start();
        new Thread(waitThread,"等待執行緒2").start();
        new Thread(waitThread,"等待執行緒3").start();
        new Thread(waitThread,"等待執行緒4").start();
        new Thread(waitThread,"等待執行緒5").start();
        new Thread(notifyThread,"喚醒執行緒").start();

    }
}

在這裡插入圖片描述

出現執行緒阻塞的幾種情況:
1.呼叫sleep( )方法,主動放棄佔有的cpu,不會釋放物件鎖
2.呼叫阻塞式IO方法(read( )、write( )),在該方法前,執行緒阻塞;
3.執行緒試圖獲取某個monitor,但該monitor被其他執行緒所持有導致阻塞;
4.執行緒等待某個通知,即呼叫wait( )方法(join方法是wait方法一層包裝,也會出現執行緒阻塞);釋放物件鎖;
5.呼叫執行緒suspend(),將執行緒掛起,容易導致死鎖,已被廢棄。


run()方法執行結束後,會進入銷燬階段,整個執行緒執行完畢。

同步佇列和等待佇列:

每一個物件監視器monitor都有2個佇列,一個稱同步佇列,一個稱等待佇列。

  • 同步佇列:因為競爭monitor(物件鎖)失敗導致阻塞的執行緒,這些執行緒等待cpu再次排程。或者說同步佇列儲存了將要獲得鎖的執行緒。
  • 等待佇列:因為呼叫wait()導致等待的執行緒,喚醒後進入同步佇列競爭鎖。喚醒後不會立即競爭鎖,而是先進入同步佇列,然後等待cpu排程再次競爭鎖。

單個生產者消費模型:

一個生產者,一個消費者。對生產者來說,當生產數量大於0時,即還有庫存,即先等消費者消費;對消費者來說,如果產品數量小於等於0,需要先den生產者生產。
程式碼如下:

package CODE.多執行緒;

////生成者消費者模型

class Goods
{
    private String goodsName;  //商品名稱
    private int counts;       //商品數量

    synchronized public void pro(String goodsName)
    {
        //產品數量大於0,等待消費者消費
        if(counts>0)
        {
            try {
                System.out.println("等一會生產,還有庫存");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        counts++;
        this.goodsName=goodsName;
        System.out.println(Thread.currentThread().getName()+"生產"+this.goodsName+":"+counts);
        notify(); //生產結束後,喚醒等待的消費者
    }
    synchronized public void Con()
    {
        //產品數量為0,等待生產者生產
        if(counts==0)
        {
            try {
                System.out.println("等一會來買,沒有庫存");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        counts--;
        System.out.println(Thread.currentThread().getName()+"消費"+this.goodsName+":"+counts);
        notify(); //喚醒等待的生產者
    }

}
class Trade implements Runnable
{
    private Goods goods;
    //生產者生產產品
    private int flag;

    public Trade(Goods goods, int flag) {
        this.goods = goods;
        this.flag = flag;
    }

    @Override
    public void run()
    {
        if(flag==1)
        {
            goods.pro("奧利奧餅乾");
        }
        else
        {
            goods.Con();
        }
    }
}
public class ProCon {
    public static void main(String[] args)
    {
        Goods goods=new Goods();
        new Thread(new Trade(goods,1),"生產者1").start();
        new Thread(new Trade(goods,0),"消費者1").start();
    }
}

在這裡插入圖片描述
以上針對的是生產一次,消費一次。但是在實際生活中更多的是生產多次,消費多次。即多生產多消費。

多生產多消費:
多生產:可以多生產的產品數量有一個上限;
多消費:消費者是隻要有產品就可以消費。

package CODE.多執行緒;


////生成者消費者模型

import java.util.ArrayList;
import java.util.List;

class MoreGoods
{
    private String goodsName;  //商品名稱
    private int counts;       //商品數量

    synchronized public void pro(String goodsName)  {
         //20ms生產一個商品
         try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            counts++;
            this.goodsName=goodsName;
            System.out.println(Thread.currentThread().getName()+"生產"+this.goodsName+":"+counts);
            notifyAll(); //喚醒等待的消費者

    }
    synchronized public void Con()
    {
        //用迴圈是因為如果一消費者等待一會後,另一個消費者將唯一的產品消費,那麼該消費者就需要再重新判斷是否有資源消費
            while (counts <= 0) {
                try {
                    System.out.println("等一會來買,沒有庫存");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //100ms消費一個
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        counts--;
        System.out.println(Thread.currentThread().getName() + "消費" + this.goodsName + ":" + counts);

    }
    public int getCounts() {
        return counts;
    }
}
class MoreTrade implements Runnable
{
    private MoreGoods goods;
    //生產者生產產品
    private int flag;

    public MoreTrade(MoreGoods goods, int flag) {
        this.goods = goods;
        this.flag = flag;
    }

    @Override
    public void run() {

        if (flag == 1) {
            while (goods.getCounts() < 200) {
                goods.pro("奧利奧餅乾");
            }
        } else {
            while (true)
                goods.Con();

        }
    }
}
public class MorePro {
    public static void main(String[] args) {
        MoreGoods goods = new MoreGoods();
        List<Thread> list=new ArrayList<>();  //陣列存的是生產消費執行緒,因為生產消費執行緒需要同時啟動
        //5個生產者
        for(int i=0;i<5;i++)
        {
            Thread thread=new Thread(new MoreTrade(goods,1),"生產者"+i);
            list.add(thread);
        }
        //3個消費者 
        for(int i=0;i<3;i++)
        {
            Thread thread=new Thread(new MoreTrade(goods,0),"消費者"+i);
            list.add(thread); 
        }
        for(Thread thread:list) {
           thread.start();  //將所有執行緒都啟動
        }
    }
}

在這裡插入圖片描述