1. 程式人生 > 其它 >Java生產者消費者模型

Java生產者消費者模型

簡單分析生產者消費者模型

問題描述

在生活中會遇到這樣一種問題,在經營一個商店時,既需要去進貨,又需要消費者來消費,這樣才能使這個商店經營下去。但在無人為干涉情況下,進貨的速度與消費者消費的速度不一定相等。這時,若進貨的速度大於消費者消費的速度,就會造成商品的堆積;反之,若消費者消費的速度大於進貨的速度,就會出現消費者買不到商品的情況。此時便需要商店根據消費者消費的速度來控制進貨的次數。

這就對應了程式中的生產執行緒和消費執行緒,還有中間的緩衝區。生產執行緒負責生產資料到緩衝區,消費執行緒負責處理緩衝區的資料。

涉及到此問題的三個方法wait()、notify()和notifyAll()

這三個方法都是Object中的方法,它們的使用需要用到關鍵字synchronized來建立同步方法或同步程式碼塊,此時需要獲取某個物件的監視器,以下稱為鎖。當執行緒的執行程式碼前加上這個關鍵字,它就會去跟其他執行緒競爭那個物件的鎖,競爭到了便執行同步程式碼塊中的程式碼,否則處在這一步,直到競爭到了鎖才會執行下面的程式碼。當同步程式碼塊中執行了wait()方法時,此執行緒暫停,進入阻塞佇列,同時釋放鎖,這時,其他執行緒便會來競爭這個鎖。當同步程式碼塊中執行了notify()方法時,它會喚醒處於wait()狀態下的某個執行緒,使該執行緒從阻塞佇列切換到競爭鎖的佇列中,但執行完notify()方法後,它不會立即釋放鎖,而是執行完同步程式碼塊中的剩餘程式碼後再釋放鎖。notifyAll()功能與notify()基本相同,但前者會將阻塞佇列中的所有執行緒都喚醒。

利用此三個方法實現生產者消費者模型

當商店的貨架滿了後生產者執行緒暫停,否則,向貨架中新增一個商品並通知消費者消費;當消費者消費一個商品時,通知生產者生產,當貨架為空時消費者執行緒暫停。具體程式碼實現如下:

package com.MyStore201031;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Store {
	public static void main(String[] agrs) {
		Store st=new Store();
		List<
Goods>
goods=new ArrayList<Goods>(); Thread producer1=new Thread(st.new Producer(goods,1)); Thread comsumer1=new Thread(st.new Comsumer(goods,1)); Thread producer2=new Thread(st.new Producer(goods,2)); Thread comsumer2=new Thread(st.new Comsumer(goods,2)); Thread producer3=new Thread(st.new Producer(goods,3)); Thread comsumer3=new Thread(st.new Comsumer(goods,3)); Thread producer4=new Thread(st.new Producer(goods,4)); Thread comsumer4=new Thread(st.new Comsumer(goods,4)); producer1.start(); comsumer1.start(); producer2.start(); comsumer2.start(); producer3.start(); comsumer3.start(); producer4.start(); comsumer4.start(); } class Goods{} class Producer implements Runnable{ private static final int MAX_COUNT=10; private int count; private List<Goods> goods; Producer(List<Goods> goods,int count){ this.goods=goods; this.count=count; } @Override public void run() { while (true) { synchronized (goods) { if(goods.size()==MAX_COUNT) { System.out.println("生產者-"+this.count+" 貨架已滿,無法生產"); try { goods.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { Goods one = new Goods(); goods.add(one); System.out.println("生產者-" + this.count + " 新增貨品,已有貨品:" + goods.size()); //通知消費者消費 goods.notify(); } } try { // 生產一次間隔一段時間 Thread.sleep(new Random().nextInt(1500)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class Comsumer implements Runnable{ private List<Goods> goods; private int count; Comsumer(List<Goods> goods,int count){ this.goods=goods; this.count=count; } @Override public void run() { while (true) { synchronized (goods) { if (goods.size() > 0) { goods.remove(0); System.out.println("消費者-" + this.count + " 消費貨品,剩餘貨品:" + goods.size()); //通知生產者生產 goods.notify(); } else if (goods.size() == 0) { System.out.println("消費者-" + this.count + " 貨架已空,無法消費"); try { goods.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } try { // 消費一次間隔一段時間,比生產的間隔時間更長,使二者速度不一致 Thread.sleep(new Random().nextInt(3000)); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } } }

執行結果