1. 程式人生 > >生產者-消費者模式(阻塞佇列實現)

生產者-消費者模式(阻塞佇列實現)

生產者消費者模式是併發、多執行緒程式設計中經典的設計模式,生產者和消費者通過分離的執行工作解耦,簡化了開發模式,生產者和消費者可以以不同的速度生產和消費資料。這篇文章我們來看看什麼是生產者消費者模式,這個問題也是多執行緒面試題中經常被提及的。如何使用阻塞佇列(Blocking Queue)解決生產者消費者模式,以及使用生產者消費者模式的好處。

真實世界中的生產者消費者模式

生產者和消費者模式在生活當中隨處可見,它描述的是協調與協作的關係。比如一個人正在準備食物(生產者),而另一個人正在吃(消費者),他們使用一個共用的桌子用於放置盤子和取走盤子,生產者準備食物,如果桌子上已經滿了就等待,如果桌子空了的話消費者(那個吃的)等待。這裡桌子就是一個共享的物件。在Java Executor框架自身實現了生產者消費者模式它們分別負責新增和執行任務。

生產者消費者模式的好處

它的確是一種實用的設計模式,常用於編寫多執行緒或併發程式碼。下面是它的一些優點:

  1.  它簡化的開發,你可以獨立地或併發的編寫消費者和生產者,它僅僅只需知道共享物件是誰
  2. 生產者不需要知道誰是消費者或者有多少消費者,對消費者來說也是一樣
  3.  生產者和消費者可以以不同的速度執行
  4.  分離的消費者和生產者在功能上能寫出更簡潔、可讀、易維護的程式碼

多執行緒中的生產者消費者問題

生產者消費者問題是一個流行的面試題,面試官會要求你實現生產者消費者設計模式,以至於能讓生產者應等待如果佇列或籃子滿了的話,消費者等待如果佇列或者籃子是空的。這個問題可以用不同的方式來現實,經典的方法是使用wait和notify方法在生產者和消費者執行緒中合作,在佇列滿了或者佇列是空的條件下阻塞,Java5的阻塞佇列(BlockingQueue)資料結構更簡單,因為它隱含的提供了這些控制,現在你不需要使用wait和nofity在生產者和消費者之間通訊了,阻塞佇列的put()方法將阻塞如果佇列滿了,佇列take()方法將阻塞如果佇列是空的。在下部分我們可以看到程式碼例子。

使用阻塞佇列實現生產者消費者模式

阻塞佇列實現生產者消費者模式超級簡單,它提供開箱即用支援阻塞的方法put()和take(),開發者不需要寫困惑的wait、nofity程式碼去實現通訊。BlockingQueue 一個介面,Java5提供了不同的現實,如ArrayBlockingQueue和LinkedBlockingQueue,兩者都是先進先出(FIFO)順序。而ArrayLinkedQueue是自然有界的,LinkedBlockingQueue可選的邊界。下面這是一個完整的生產者消費者程式碼例子,對比傳統的wait、nofity程式碼,它更易於理解。

put()方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。

take()方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。

/**
 * 倉庫類Storage實現緩衝區
 * 
 */
public class Storage {

	// 倉庫儲存的載體 (容量為6)
	private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(6);

	// 生產num個產品
	public void produce() {

		int number = 0;
		
		while (number < 10) {
			try {
				// 放入產品,容量滿的時候自動阻塞
				list.put(number);
				System.out.println("生產1個\t【庫存量】" + list.size());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			number++;
		}
	}

	// 消費num個產品
	public void consume() {

		while (true) {
			try {
				// 消費產品,容量為空的時候自動阻塞
				System.out.println("消費1個:" + list.take() + "\t【庫存量】"
						+ list.size());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
public class Producer extends Thread {

	// 所在放置的倉庫
	private Storage storage;

	// 建構函式,設定倉庫
	public Producer(Storage storage) {
		this.storage = storage;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		storage.produce();
	}
}
public class Consumer extends Thread {

	// 所在放置的倉庫
	private Storage storage;

	// 建構函式,設定倉庫
	public Consumer(Storage storage) {
		this.storage = storage;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		storage.consume();
	}
}
public class Test {

	public static void main(String[] args) {

		// 倉庫物件
		Storage storage = new Storage();

		// 生產者物件
		Producer p1 = new Producer(storage);

		// 消費者物件
		Consumer c1 = new Consumer(storage);

		// 執行緒開始執行
		c1.start();
		p1.start();
	}
}
OutPut:
生產1個	【庫存量】1
消費1個:0	【庫存量】0
生產1個	【庫存量】1
生產1個	【庫存量】1
消費1個:1	【庫存量】0
生產1個	【庫存量】2
消費1個:2	【庫存量】1
生產1個	【庫存量】2
消費1個:3	【庫存量】1
生產1個	【庫存量】2
消費1個:4	【庫存量】1
消費1個:5	【庫存量】1
消費1個:6	【庫存量】0
生產1個	【庫存量】2
生產1個	【庫存量】1
生產1個	【庫存量】2
生產1個	【庫存量】3
消費1個:7	【庫存量】2
消費1個:8	【庫存量】1
消費1個:9	【庫存量】0
有時使用BlockingQueue可能會出現put()和System.out.println()輸出不匹配的情況,這是由於它們之間沒有同步造成的。當緩衝區已滿,生產者在put()操作時,put()內部呼叫了await()方法,放棄了執行緒的執行,然後消費者執行緒執行,呼叫take()方法,take()內部呼叫了signal()方法,通知生產者執行緒可以執行,致使在消費者的println()還沒執行的情況下生產者的println()先被執行,所以有了輸出不匹配的情況