生產者消費者模型(wait/notify/notifyAll)
生產者和消費者之間的阻塞佇列
生產者消費者模型在實際生活中很多運用。對我們自己來說就是一個消費者,比如我們需要買奧利奧餅乾,我們會去超時買,並不是直接從廠商那裡買,而廠商把奧利奧生產結束後會送往各個超市進行售賣,超市就是我們消費者和生產者之間媒介。這個超市在作業系統定義為阻塞佇列。
生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從
阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
在瞭解生產者消費者模型之前需要先了解幾個方法~~
wait()方法
public final void wait() throws InterruptedException
wait()使執行緒停止執行,會釋放物件鎖。
- wait()方法必須用於同步程式碼塊和同步方法,而且必須是內建鎖(synchronized),因為需要鎖住的物件。如果呼叫wait()時沒有合適的鎖,會拋異常。
- wait()方法會使當前執行緒呼叫該方法後進行等待,並且將該執行緒置入鎖物件的等待佇列中,直到接到通知或被中斷而已。如果沒有接到通知或者中斷,會一直等。
- wait()方法執行後當前執行緒釋放鎖,其他執行緒可以競爭該鎖。
package CODE.多執行緒;
public class Wait1
{
public static void main(String[] args) throws InterruptedException {
Object ob=new Object();
synchronized (ob)
{
System.out.println("等待開始...");
ob.wait();
System.out.println("等待結束...");
}
System. out.println("main方法結束...");
}
}
只會打印出“等待開始…”,因為沒有喚醒等待,該程序就一直等待…,不會執行後面語句。
wait方法從執行態到阻塞態
wait (long time):如果到了預計時間還沒被喚醒,將繼續執行後續程式碼
wait( )之後的執行緒繼續執行有2種方法:
- 呼叫該物件的notify( )方法喚醒等待執行緒
- 執行緒等待時呼叫interrupt( )中斷該執行緒
1.首先用notify()喚醒執行緒:
notify()
public final native void notify();
- notify( )方法也必須在同步方法或者同步程式碼塊中呼叫,用來喚醒等待在該物件上的執行緒。如果有多個執行緒等待,則任意挑一個執行緒喚醒.
- notify( )方法執行後,喚醒執行緒不會立即釋放物件鎖,要等待喚醒執行緒全部執行才會釋放鎖。
import static java.lang.Thread.sleep;
//notify()
class waitnotifyT
{
synchronized public void waitMethod() {
System.out.println("等待開始...");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待結束...");
}
synchronized public void notifyMethod() {
System.out.println("喚醒開始...");
notify();
System.out.println("喚醒結束...");
}
}
class Mthread implements Runnable
{
private waitnotifyT waitnotifyT1;
private int flag=1;
public Mthread(waitnotifyT waitnotifyT1,int flag) {
this.waitnotifyT1 = waitnotifyT1;
this.flag=flag;
}
@Override
public void run()
{
if(flag==1)
{
waitnotifyT1.waitMethod();
}
else
waitnotifyT1.notifyMethod();
}
}
public class Wait1
{
public static void main(String[] args) throws InterruptedException {
waitnotifyT wn=new waitnotifyT();
Mthread waitThread=new Mthread(wn,1);
Mthread notifyThread=new Mthread(wn,0);
new Thread(waitThread).start();
sleep(1000);
new Thread(notifyThread).start();
}
}
從結果可以看見當執行notify後,喚醒執行緒不會立即釋放鎖,而是喚醒執行緒全部執行完,才會釋放鎖,執行wait()之後的程式碼。
2.當執行緒使用wait導致執行緒阻塞,呼叫interrupt會拋一個InterruptedException異常,將執行緒終止。
class WaitTh implements Runnable
{
synchronized private void waitM()
{
System.out.println("等待開始...");
try {
wait();
} catch (InterruptedException e) {
System.out.println("等待被中斷"); }
}
public void run()
{
waitM();
}
}
public class Wait1
{
public static void main(String[] args) {
Thread thread=new Thread(new WaitTh());
thread.start();
thread.interrupt();
}
}
notify和wait等待和喚醒的是同一個物件
notifyAll()
notifyAll方法可以一次喚醒所有的等待在該物件上的執行緒。
public final native void notifyAll();
////notifyAll()
class waitnotifyT
{
synchronized public void waitMethod() {
System.out.println(Thread.currentThread().getName()+":等待開始...");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":等待結束...");
}
synchronized public void notifyMethod() {
System.out.println(Thread.currentThread().getName()+":喚醒開始...");
notifyAll();
System.out.println(Thread.currentThread().getName()+":喚醒結束...");
}
}
class Mthread implements Runnable
{
private waitnotifyT waitnotifyT1;
private int flag=1;
public Mthread(waitnotifyT waitnotifyT1,int flag) {
this.waitnotifyT1 = waitnotifyT1;
this.flag=flag;
}
@Override
public void run()
{
if(flag==1)
{
waitnotifyT1.waitMethod();
}
else
waitnotifyT1.notifyMethod();
}
}
public class Wait1
{
public static void main(String[] args) throws InterruptedException {
waitnotifyT wn=new waitnotifyT();
Mthread waitThread=new Mthread(wn,1);
Mthread notifyThread=new Mthread(wn,0);
new Thread(waitThread,"等待執行緒1").start();
new Thread(waitThread,"等待執行緒2").start();
new Thread(waitThread,"等待執行緒3").start();
new Thread(waitThread,"等待執行緒4").start();
new Thread(waitThread,"等待執行緒5").start();
new Thread(notifyThread,"喚醒執行緒").start();
}
}
出現執行緒阻塞的幾種情況:
1.呼叫sleep( )方法,主動放棄佔有的cpu,不會釋放物件鎖
2.呼叫阻塞式IO方法(read( )、write( )),在該方法前,執行緒阻塞;
3.執行緒試圖獲取某個monitor,但該monitor被其他執行緒所持有導致阻塞;
4.執行緒等待某個通知,即呼叫wait( )方法(join方法是wait方法一層包裝,也會出現執行緒阻塞);釋放物件鎖;
5.呼叫執行緒suspend(),將執行緒掛起,容易導致死鎖,已被廢棄。
run()方法執行結束後,會進入銷燬階段,整個執行緒執行完畢。
同步佇列和等待佇列:
每一個物件監視器monitor都有2個佇列,一個稱同步佇列,一個稱等待佇列。
- 同步佇列:因為競爭monitor(物件鎖)失敗導致阻塞的執行緒,這些執行緒等待cpu再次排程。或者說同步佇列儲存了將要獲得鎖的執行緒。
- 等待佇列:因為呼叫wait()導致等待的執行緒,喚醒後進入同步佇列競爭鎖。喚醒後不會立即競爭鎖,而是先進入同步佇列,然後等待cpu排程再次競爭鎖。
單個生產者消費模型:
一個生產者,一個消費者。對生產者來說,當生產數量大於0時,即還有庫存,即先等消費者消費;對消費者來說,如果產品數量小於等於0,需要先den生產者生產。
程式碼如下:
package CODE.多執行緒;
////生成者消費者模型
class Goods
{
private String goodsName; //商品名稱
private int counts; //商品數量
synchronized public void pro(String goodsName)
{
//產品數量大於0,等待消費者消費
if(counts>0)
{
try {
System.out.println("等一會生產,還有庫存");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
counts++;
this.goodsName=goodsName;
System.out.println(Thread.currentThread().getName()+"生產"+this.goodsName+":"+counts);
notify(); //生產結束後,喚醒等待的消費者
}
synchronized public void Con()
{
//產品數量為0,等待生產者生產
if(counts==0)
{
try {
System.out.println("等一會來買,沒有庫存");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
counts--;
System.out.println(Thread.currentThread().getName()+"消費"+this.goodsName+":"+counts);
notify(); //喚醒等待的生產者
}
}
class Trade implements Runnable
{
private Goods goods;
//生產者生產產品
private int flag;
public Trade(Goods goods, int flag) {
this.goods = goods;
this.flag = flag;
}
@Override
public void run()
{
if(flag==1)
{
goods.pro("奧利奧餅乾");
}
else
{
goods.Con();
}
}
}
public class ProCon {
public static void main(String[] args)
{
Goods goods=new Goods();
new Thread(new Trade(goods,1),"生產者1").start();
new Thread(new Trade(goods,0),"消費者1").start();
}
}
以上針對的是生產一次,消費一次。但是在實際生活中更多的是生產多次,消費多次。即多生產多消費。
多生產多消費:
多生產:可以多生產的產品數量有一個上限;
多消費:消費者是隻要有產品就可以消費。
package CODE.多執行緒;
////生成者消費者模型
import java.util.ArrayList;
import java.util.List;
class MoreGoods
{
private String goodsName; //商品名稱
private int counts; //商品數量
synchronized public void pro(String goodsName) {
//20ms生產一個商品
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
counts++;
this.goodsName=goodsName;
System.out.println(Thread.currentThread().getName()+"生產"+this.goodsName+":"+counts);
notifyAll(); //喚醒等待的消費者
}
synchronized public void Con()
{
//用迴圈是因為如果一消費者等待一會後,另一個消費者將唯一的產品消費,那麼該消費者就需要再重新判斷是否有資源消費
while (counts <= 0) {
try {
System.out.println("等一會來買,沒有庫存");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//100ms消費一個
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
counts--;
System.out.println(Thread.currentThread().getName() + "消費" + this.goodsName + ":" + counts);
}
public int getCounts() {
return counts;
}
}
class MoreTrade implements Runnable
{
private MoreGoods goods;
//生產者生產產品
private int flag;
public MoreTrade(MoreGoods goods, int flag) {
this.goods = goods;
this.flag = flag;
}
@Override
public void run() {
if (flag == 1) {
while (goods.getCounts() < 200) {
goods.pro("奧利奧餅乾");
}
} else {
while (true)
goods.Con();
}
}
}
public class MorePro {
public static void main(String[] args) {
MoreGoods goods = new MoreGoods();
List<Thread> list=new ArrayList<>(); //陣列存的是生產消費執行緒,因為生產消費執行緒需要同時啟動
//5個生產者
for(int i=0;i<5;i++)
{
Thread thread=new Thread(new MoreTrade(goods,1),"生產者"+i);
list.add(thread);
}
//3個消費者
for(int i=0;i<3;i++)
{
Thread thread=new Thread(new MoreTrade(goods,0),"消費者"+i);
list.add(thread);
}
for(Thread thread:list) {
thread.start(); //將所有執行緒都啟動
}
}
}